Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: file-worker任务状态更新请求无序到达导致第三方源文件偶现分发失败 #2434 #2443

Merged
merged 2 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,13 @@

import com.tencent.bk.job.file_gateway.model.dto.FileSourceBatchTaskDTO;

import java.util.List;

public interface FileSourceBatchTaskDAO {
String insertFileSourceBatchTask(FileSourceBatchTaskDTO fileSourceBatchTaskDTO);

int updateFileSourceBatchTask(FileSourceBatchTaskDTO fileSourceBatchTaskDTO);

int updateFileClearStatus(List<String> taskIdList, boolean fileCleared);

int deleteFileSourceBatchTaskById(String id);

FileSourceBatchTaskDTO getFileSourceBatchTaskById(String id);
FileSourceBatchTaskDTO getBatchTaskById(String id);

Long countFileSourceBatchTasks(Long appId);
FileSourceBatchTaskDTO getBatchTaskByIdForUpdate(String id);

List<FileSourceBatchTaskDTO> listFileSourceBatchTasks(Long appId, Integer start,
Integer pageSize);
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import com.tencent.bk.job.file_gateway.model.dto.FileSourceTaskDTO;

import java.util.Collection;
import java.util.List;

public interface FileSourceTaskDAO {
Expand All @@ -40,16 +39,9 @@ public interface FileSourceTaskDAO {

FileSourceTaskDTO getFileSourceTaskById(String id);

Long countFileSourceTasks(Long appId);
FileSourceTaskDTO getFileSourceTaskByIdForUpdate(String id);

Long countFileSourceTasksByBatchTaskId(String batchTaskId, Byte status);

List<FileSourceTaskDTO> listFileSourceTasks(Long appId, Integer start, Integer pageSize);

List<FileSourceTaskDTO> listTimeoutTasks(Long expireTimeMills, Collection<Byte> statusSet,
Integer start, Integer pageSize);

List<FileSourceTaskDTO> listByBatchTaskId(String batchTaskId);

int deleteByBatchTaskId(String batchTaskId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,14 @@ public interface FileTaskDAO {

int resetFileTasks(String fileSourceTaskId);

int deleteFileTaskById(Long id);

int deleteFileTaskByFileSourceTaskId(String fileSourceTaskId);

FileTaskDTO getFileTaskById(Long id);

FileTaskDTO getOneFileTask(String fileSourceTaskId, String filePath);

Long countFileTasks(String fileSourceTaskId);
FileTaskDTO getOneFileTaskForUpdate(String fileSourceTaskId, String filePath);

List<FileTaskDTO> listFileTasks(String fileSourceTaskId, Integer start, Integer pageSize);

List<String> listTimeoutFileSourceTaskIds(Long expireTimeMills, Collection<Byte> statusSet
, Integer start, Integer pageSize);
List<String> listTimeoutFileSourceTaskIds(Long expireTimeMills, Collection<Byte> statusSet, Integer start,
Integer pageSize);

List<FileTaskDTO> listFileTasks(String fileSourceTaskId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,26 +36,21 @@
import com.tencent.bk.job.file_gateway.model.tables.records.FileSourceBatchTaskRecord;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.jooq.Condition;
import org.jooq.DSLContext;
import org.jooq.conf.ParamType;
import org.jooq.impl.DSL;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Repository;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

@Slf4j
@Repository
public class FileSourceBatchTaskDAOImpl extends BaseDAOImpl implements FileSourceBatchTaskDAO {

private static final FileSourceBatchTask defaultTable = FileSourceBatchTask.FILE_SOURCE_BATCH_TASK;
private final FileSourceTaskDAO fileSourceTaskDAO;

private final DSLContext dslContext;
private final FileSourceTaskDAO fileSourceTaskDAO;

@Autowired
public FileSourceBatchTaskDAOImpl(@Qualifier("job-file-gateway-dsl-context") DSLContext dslContext,
Expand Down Expand Up @@ -135,29 +130,7 @@ public int updateFileSourceBatchTask(FileSourceBatchTaskDTO fileSourceBatchTaskD
}

@Override
public int updateFileClearStatus(List<String> taskIdList, boolean fileCleared) {
val query = dslContext.update(defaultTable)
.set(defaultTable.FILE_CLEARED, fileCleared)
.set(defaultTable.LAST_MODIFY_TIME, System.currentTimeMillis())
.where(defaultTable.ID.in(taskIdList));
val sql = query.getSQL(ParamType.INLINED);
try {
return query.execute();
} catch (Exception e) {
log.error(sql);
throw e;
}
}

@Override
public int deleteFileSourceBatchTaskById(String id) {
return dslContext.deleteFrom(defaultTable).where(
defaultTable.ID.eq(id)
).execute();
}

@Override
public FileSourceBatchTaskDTO getFileSourceBatchTaskById(String id) {
public FileSourceBatchTaskDTO getBatchTaskById(String id) {
val record = dslContext.selectFrom(defaultTable).where(
defaultTable.ID.eq(id)
).fetchOne();
Expand All @@ -169,34 +142,15 @@ val record = dslContext.selectFrom(defaultTable).where(
}

@Override
public Long countFileSourceBatchTasks(Long appId) {
List<Condition> conditions = new ArrayList<>();
if (appId != null) {
conditions.add(defaultTable.APP_ID.eq(appId));
}
return countFileSourceBatchTasksByConditions(conditions);
}

public Long countFileSourceBatchTasksByConditions(Collection<Condition> conditions) {
val query = dslContext.select(
DSL.countDistinct(defaultTable.ID)
).from(defaultTable)
.where(conditions);
return query.fetchOne(0, Long.class);
}

@Override
public List<FileSourceBatchTaskDTO> listFileSourceBatchTasks(Long appId,
Integer start,
Integer pageSize) {
List<Condition> conditions = new ArrayList<>();
if (appId != null) {
conditions.add(defaultTable.APP_ID.eq(appId));
public FileSourceBatchTaskDTO getBatchTaskByIdForUpdate(String id) {
val record = dslContext.selectFrom(defaultTable).where(
defaultTable.ID.eq(id)
).forUpdate().fetchOne();
if (record == null) {
return null;
} else {
return convertRecordToDto(record);
}
val query = dslContext.selectFrom(defaultTable)
.where(conditions)
.orderBy(defaultTable.LAST_MODIFY_TIME.desc());
return listPage(query, start, pageSize, this::convertRecordToDto);
}

private FileSourceBatchTaskDTO convertRecordToDto(FileSourceBatchTaskRecord record) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ public String insertFileSourceTask(FileSourceTaskDTO fileSourceTaskDTO) {
for (FileTaskDTO fileTaskDTO : fileTaskDTOList) {
fileTaskDTO.setFileSourceTaskId(id);
// 插入FileTask
fileTaskDAO.insertFileTask(fileTaskDTO);
Long fileTaskId = fileTaskDAO.insertFileTask(fileTaskDTO);
log.debug("{} inserted, id={}", fileTaskDTO, fileTaskId);
}
fileSourceTaskDTO.setFileTaskList(fileTaskDTOList);
return id;
Expand Down Expand Up @@ -177,12 +178,15 @@ val record = dslContext.selectFrom(defaultTable).where(
}

@Override
public Long countFileSourceTasks(Long appId) {
List<Condition> conditions = new ArrayList<>();
if (appId != null) {
conditions.add(defaultTable.APP_ID.eq(appId));
public FileSourceTaskDTO getFileSourceTaskByIdForUpdate(String id) {
val record = dslContext.selectFrom(defaultTable).where(
defaultTable.ID.eq(id)
).forUpdate().fetchOne();
if (record == null) {
return null;
} else {
return convertRecordToDto(record);
}
return countFileSourceTasksByConditions(conditions);
}

@Override
Expand Down Expand Up @@ -213,29 +217,6 @@ public List<FileSourceTaskDTO> listByConditions(Collection<Condition> conditions
return listPage(query, start, pageSize, this::convertRecordToDto);
}

@Override
public List<FileSourceTaskDTO> listFileSourceTasks(Long appId, Integer start,
Integer pageSize) {
List<Condition> conditions = new ArrayList<>();
if (appId != null) {
conditions.add(defaultTable.APP_ID.eq(appId));
}
return listByConditions(conditions, start, pageSize);
}

@Override
public List<FileSourceTaskDTO> listTimeoutTasks(Long expireTimeMills,
Collection<Byte> statusSet, Integer start, Integer pageSize) {
List<Condition> conditions = new ArrayList<>();
if (expireTimeMills != null && expireTimeMills > 0) {
conditions.add(defaultTable.LAST_MODIFY_TIME.le(System.currentTimeMillis() - expireTimeMills));
}
if (statusSet != null && !statusSet.isEmpty()) {
conditions.add(defaultTable.STATUS.in(statusSet));
}
return listByConditions(conditions, start, pageSize);
}

@Override
public List<FileSourceTaskDTO> listByBatchTaskId(String batchTaskId) {
List<Condition> conditions = new ArrayList<>();
Expand All @@ -245,13 +226,6 @@ public List<FileSourceTaskDTO> listByBatchTaskId(String batchTaskId) {
return listByConditions(conditions, null, null);
}

@Override
public int deleteByBatchTaskId(String batchTaskId) {
return dslContext.deleteFrom(defaultTable).where(
defaultTable.BATCH_TASK_ID.eq(batchTaskId)
).execute();
}

private FileSourceTaskDTO convertRecordToDto(FileSourceTaskRecord record) {
String id = record.getId();
List<FileTaskDTO> fileTaskDTOList = fileTaskDAO.listFileTasks(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,6 @@ public int resetFileTasks(String fileSourceTaskId) {
}
}


@Override
public int deleteFileTaskById(Long id) {
return dslContext.deleteFrom(defaultTable).where(
defaultTable.ID.eq(id)
).execute();
}

@Override
public int deleteFileTaskByFileSourceTaskId(String fileSourceTaskId) {
return dslContext.deleteFrom(defaultTable).where(
Expand All @@ -143,45 +135,20 @@ public int deleteFileTaskByFileSourceTaskId(String fileSourceTaskId) {
}

@Override
public FileTaskDTO getFileTaskById(Long id) {
val record = dslContext.selectFrom(defaultTable).where(
defaultTable.ID.eq(id)
).fetchOne();
if (record == null) {
return null;
} else {
return convertRecordToDto(record);
}
}

@Override
public FileTaskDTO getOneFileTask(String fileSourceTaskId, String filePath) {
public FileTaskDTO getOneFileTaskForUpdate(String fileSourceTaskId, String filePath) {
List<Condition> conditions = new ArrayList<>();
if (fileSourceTaskId != null) {
conditions.add(defaultTable.FILE_SOURCE_TASK_ID.eq(fileSourceTaskId));
}
if (filePath != null) {
conditions.add(defaultTable.FILE_PATH.eq(filePath));
}
conditions.add(defaultTable.FILE_SOURCE_TASK_ID.eq(fileSourceTaskId));
conditions.add(defaultTable.FILE_PATH.eq(filePath));
val record = dslContext.selectFrom(defaultTable).where(
conditions
).fetchOne();
).forUpdate().fetchOne();
if (record == null) {
return null;
} else {
return convertRecordToDto(record);
}
}

@Override
public Long countFileTasks(String fileSourceTaskId) {
List<Condition> conditions = new ArrayList<>();
if (fileSourceTaskId != null) {
conditions.add(defaultTable.FILE_SOURCE_TASK_ID.eq(fileSourceTaskId));
}
return countFileTasksByConditions(conditions);
}

public Long countFileTasksByConditions(Collection<Condition> conditions) {
val query = dslContext.select(
DSL.countDistinct(defaultTable.ID)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,7 @@ public Integer stopBatchTasks(List<String> batchTaskIdList) {
public BatchTaskStatusDTO getBatchTaskStatusAndLogs(String batchTaskId, Long logStart, Long logLength) {
BatchTaskStatusDTO batchTaskStatusDTO = new BatchTaskStatusDTO();
batchTaskStatusDTO.setBatchTaskId(batchTaskId);
FileSourceBatchTaskDTO fileSourceBatchTaskDTO = fileSourceBatchTaskDAO.getFileSourceBatchTaskById(
batchTaskId);
FileSourceBatchTaskDTO fileSourceBatchTaskDTO = fileSourceBatchTaskDAO.getBatchTaskById(batchTaskId);
if (fileSourceBatchTaskDTO == null) {
throw new InternalException(ErrorCode.INTERNAL_ERROR);
}
Expand Down
Loading
Loading