diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTaskService.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTaskService.java index 7e2903a7..8cc2a42c 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTaskService.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTaskService.java @@ -142,7 +142,7 @@ public CleaningTaskDto createTask(CreateCleaningTaskRequest request) { prepareTask(task, request.getInstance(), executorType); scanDataset(taskId, request.getSrcDatasetId()); - taskScheduler.executeTask(taskId); + return task; } @@ -157,9 +157,12 @@ public List getTaskResults(String taskId) { return cleaningResultRepo.findByInstanceId(taskId); } - public List getTaskLog(String taskId) { + public List getTaskLog(String taskId, int retryCount) { cleanTaskValidator.checkTaskId(taskId); String logPath = FLOW_PATH + "/" + taskId + "/output.log"; + if (retryCount > 0) { + logPath += "." + retryCount; + } try (Stream lines = Files.lines(Paths.get(logPath))) { List logs = new ArrayList<>(); AtomicReference lastLevel = new AtomicReference<>("INFO"); @@ -215,7 +218,7 @@ public void executeTask(String taskId) { CleaningTaskDto task = cleaningTaskRepo.findTaskById(taskId); scanDataset(taskId, task.getSrcDatasetId(), succeedSet); cleaningResultRepo.deleteByInstanceId(taskId, "FAILED"); - taskScheduler.executeTask(taskId); + taskScheduler.executeTask(taskId, task.getRetryCount() + 1); } private void prepareTask(CleaningTaskDto task, List instances, ExecutorType executorType) { diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/scheduler/CleaningTaskScheduler.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/scheduler/CleaningTaskScheduler.java index 6692ffac..92df8457 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/scheduler/CleaningTaskScheduler.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/scheduler/CleaningTaskScheduler.java @@ -20,15 +20,16 @@ public class CleaningTaskScheduler { private final ExecutorService taskExecutor = Executors.newFixedThreadPool(5); - public void executeTask(String taskId) { - taskExecutor.submit(() -> submitTask(taskId)); + public void executeTask(String taskId, int retryCount) { + taskExecutor.submit(() -> submitTask(taskId, retryCount)); } - private void submitTask(String taskId) { + private void submitTask(String taskId, int retryCount) { CleaningTaskDto task = new CleaningTaskDto(); task.setId(taskId); task.setStatus(CleaningTaskStatusEnum.RUNNING); task.setStartedAt(LocalDateTime.now()); + task.setRetryCount(retryCount); cleaningTaskRepo.updateTask(task); runtimeClient.submitTask(taskId); } diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/model/entity/CleaningTask.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/model/entity/CleaningTask.java index 298e83c8..a612cbe8 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/model/entity/CleaningTask.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/model/entity/CleaningTask.java @@ -36,6 +36,8 @@ public class CleaningTask extends BaseEntity { private Integer fileCount; + private Integer retryCount; + private LocalDateTime startedAt; private LocalDateTime finishedAt; diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/dto/CleaningTaskDto.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/dto/CleaningTaskDto.java index f2cc9651..bc1274f1 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/dto/CleaningTaskDto.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/dto/CleaningTaskDto.java @@ -38,6 +38,8 @@ public class CleaningTaskDto { private Integer fileCount; + private Integer retryCount; + private CleaningTaskStatusEnum status; private String templateId; diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/rest/CleaningTaskController.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/rest/CleaningTaskController.java index d1f25080..fc7d1b51 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/rest/CleaningTaskController.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/rest/CleaningTaskController.java @@ -1,6 +1,7 @@ package com.datamate.cleaning.interfaces.rest; import com.datamate.cleaning.application.CleaningTaskService; +import com.datamate.cleaning.application.scheduler.CleaningTaskScheduler; import com.datamate.cleaning.interfaces.dto.*; import com.datamate.common.interfaces.PagedResponse; import lombok.RequiredArgsConstructor; @@ -18,6 +19,8 @@ public class CleaningTaskController { private final CleaningTaskService cleaningTaskService; + private final CleaningTaskScheduler taskScheduler; + @GetMapping public PagedResponse cleaningTasksGet( @RequestParam("page") Integer page, @@ -36,7 +39,9 @@ public CleaningTaskDto cleaningTasksPost(@McpToolParam(description = "创建任 if (request.getInstance().isEmpty() && StringUtils.isNotBlank(request.getTemplateId())) { request.setInstance(cleaningTaskService.getInstanceByTemplateId(request.getTemplateId())); } - return cleaningTaskService.createTask(request); + CleaningTaskDto task = cleaningTaskService.createTask(request); + taskScheduler.executeTask(task.getId(), 0); + return task; } @PostMapping("/{taskId}/stop") @@ -74,8 +79,9 @@ public List cleaningTasksTaskIdGetResult(@PathVariable("taskI return cleaningTaskService.getTaskResults(taskId); } - @GetMapping("/{taskId}/log") - public List cleaningTasksTaskIdGetLog(@PathVariable("taskId") String taskId) { - return cleaningTaskService.getTaskLog(taskId); + @GetMapping("/{taskId}/log/{retryCount}") + public List cleaningTasksTaskIdGetLog(@PathVariable("taskId") String taskId, + @PathVariable("retryCount") int retryCount) { + return cleaningTaskService.getTaskLog(taskId, retryCount); } } diff --git a/frontend/src/pages/DataCleansing/Detail/TaskDetail.tsx b/frontend/src/pages/DataCleansing/Detail/TaskDetail.tsx index 745b701e..7508aef1 100644 --- a/frontend/src/pages/DataCleansing/Detail/TaskDetail.tsx +++ b/frontend/src/pages/DataCleansing/Detail/TaskDetail.tsx @@ -76,10 +76,10 @@ export default function CleansingTaskDetail() { const [taskLog, setTaskLog] = useState(); - const fetchTaskLog = async () => { + const fetchTaskLog = async (retryCount: number) => { if (!id) return; try { - const { data } = await queryCleaningTaskLogByIdUsingGet(id); + const { data } = await queryCleaningTaskLogByIdUsingGet(id, retryCount); setTaskLog(data); } catch (error) { message.error("获取清洗日志失败"); @@ -90,7 +90,7 @@ export default function CleansingTaskDetail() { const handleRefresh = async () => { fetchTaskDetail(); {activeTab === "files" && await fetchTaskResult()} - {activeTab === "logs" && await fetchTaskLog()} + {activeTab === "logs" && await fetchTaskLog(task.retryCount)} }; useEffect(() => { @@ -190,7 +190,7 @@ export default function CleansingTaskDetail() { const breadItems = [ { - title: 数据清洗, + title: 数据处理, }, { title: "清洗任务详情", @@ -215,7 +215,7 @@ export default function CleansingTaskDetail() { )} {activeTab === "operators" && } {activeTab === "files" && } - {activeTab === "logs" && } + {activeTab === "logs" && } diff --git a/frontend/src/pages/DataCleansing/Detail/TemplateDetail.tsx b/frontend/src/pages/DataCleansing/Detail/TemplateDetail.tsx index 30c07dd4..e3b7d8e8 100644 --- a/frontend/src/pages/DataCleansing/Detail/TemplateDetail.tsx +++ b/frontend/src/pages/DataCleansing/Detail/TemplateDetail.tsx @@ -94,7 +94,7 @@ export default function CleansingTemplateDetail() { const breadItems = [ { - title: 数据清洗, + title: 数据处理, }, { title: "模板详情", diff --git a/frontend/src/pages/DataCleansing/Detail/components/BasicInfo.tsx b/frontend/src/pages/DataCleansing/Detail/components/BasicInfo.tsx index f1ff44fc..464897a2 100644 --- a/frontend/src/pages/DataCleansing/Detail/components/BasicInfo.tsx +++ b/frontend/src/pages/DataCleansing/Detail/components/BasicInfo.tsx @@ -56,6 +56,8 @@ export default function BasicInfo({ task }: { task: CleansingTask }) { ), span: 2, }, + { key: "finishedTime", label: "结束时间", children: task?.finishedAt }, + { key: "name", label: "重试次数", children: task?.retryCount }, ]; return ( diff --git a/frontend/src/pages/DataCleansing/Detail/components/LogsTable.tsx b/frontend/src/pages/DataCleansing/Detail/components/LogsTable.tsx index 4e9376af..3ca9f448 100644 --- a/frontend/src/pages/DataCleansing/Detail/components/LogsTable.tsx +++ b/frontend/src/pages/DataCleansing/Detail/components/LogsTable.tsx @@ -1,16 +1,35 @@ -import {useEffect} from "react"; +import {useEffect, useState} from "react"; import {useParams} from "react-router"; import {FileClock} from "lucide-react"; -export default function LogsTable({taskLog, fetchTaskLog} : {taskLog: any[], fetchTaskLog: () => Promise}) { +export default function LogsTable({taskLog, fetchTaskLog, retryCount} : {taskLog: any[], fetchTaskLog: () => Promise, retryCount: number}) { const { id = "" } = useParams(); + const [selectedLog, setSelectedLog] = useState(retryCount + 1); useEffect(() => { - fetchTaskLog(); - }, [id]); + fetchTaskLog(selectedLog - 1); + }, [id, selectedLog]); return taskLog?.length > 0 ? ( <> + {/* --- 新增区域:左上角 Select 组件 --- */} +
+
+ + +
+ 当前展示: 第 {selectedLog} 次 +
{taskLog?.map?.((log, index) => ( diff --git a/frontend/src/pages/DataCleansing/Home/DataCleansing.tsx b/frontend/src/pages/DataCleansing/Home/DataCleansing.tsx index af28c538..1a21de99 100644 --- a/frontend/src/pages/DataCleansing/Home/DataCleansing.tsx +++ b/frontend/src/pages/DataCleansing/Home/DataCleansing.tsx @@ -22,7 +22,7 @@ export default function DataProcessingPage() {
{/* Header */}
-

数据清洗

+

数据处理

diff --git a/frontend/src/pages/Layout/Menu.tsx b/frontend/src/pages/Layout/Menu.tsx index f38297fb..8c67a79c 100644 --- a/frontend/src/pages/Layout/Menu.tsx +++ b/frontend/src/pages/Layout/Menu.tsx @@ -31,7 +31,7 @@ export const menuItems = [ }, { id: "cleansing", - title: "数据清洗", + title: "数据处理", icon: GitBranch, description: "数据清洗和预处理", color: "bg-purple-500", diff --git a/frontend/src/pages/OperatorMarket/Detail/OperatorPluginDetail.tsx b/frontend/src/pages/OperatorMarket/Detail/OperatorPluginDetail.tsx index b9a55ff0..03f42548 100644 --- a/frontend/src/pages/OperatorMarket/Detail/OperatorPluginDetail.tsx +++ b/frontend/src/pages/OperatorMarket/Detail/OperatorPluginDetail.tsx @@ -142,7 +142,7 @@ export default function OperatorPluginDetail() { }, { key: "requirement", - label: "环境依赖", + label: "系统规格", }, { key: "documentation", diff --git a/frontend/src/pages/OperatorMarket/Detail/components/Requirement.tsx b/frontend/src/pages/OperatorMarket/Detail/components/Requirement.tsx index 9c23b4fa..03d41621 100644 --- a/frontend/src/pages/OperatorMarket/Detail/components/Requirement.tsx +++ b/frontend/src/pages/OperatorMarket/Detail/components/Requirement.tsx @@ -21,13 +21,13 @@ export default function Requirement({ operator }) {

系统要求

- CPU要求 + CPU规格 {requirement?.cpu || '无限制'}
- 内存要求 + 内存规格 {requirement?.memory || "无限制"} diff --git a/runtime/python-executor/datamate/scheduler/cmd_task_scheduler.py b/runtime/python-executor/datamate/scheduler/cmd_task_scheduler.py index 011f05ab..87701d1b 100644 --- a/runtime/python-executor/datamate/scheduler/cmd_task_scheduler.py +++ b/runtime/python-executor/datamate/scheduler/cmd_task_scheduler.py @@ -1,4 +1,5 @@ import asyncio +import os from datetime import datetime from typing import Optional, List @@ -13,6 +14,7 @@ class CommandTask(Task): def __init__(self, task_id: str, command: str, log_path = None, shell: bool = True, timeout: Optional[int] = None, *args, **kwargs): super().__init__(task_id, *args, **kwargs) + self.max_backups = 9 self.log_path = log_path self.command = command self.shell = shell @@ -34,7 +36,14 @@ async def _execute(self): self.status = TaskStatus.RUNNING self.started_at = datetime.now() - with open(self.log_path, 'a') as f: + current_log_path = self.log_path + if os.path.exists(current_log_path): + counter = 1 + while os.path.exists(f"{self.log_path}.{counter}"): + counter += 1 + current_log_path = f"{self.log_path}.{counter}" + + with open(current_log_path, 'a') as f: # 使用 asyncio.create_subprocess_shell 或 create_subprocess_exec if self.shell: process = await asyncio.create_subprocess_shell( diff --git a/scripts/db/data-cleaning-init.sql b/scripts/db/data-cleaning-init.sql index 8181c4ec..93322f44 100644 --- a/scripts/db/data-cleaning-init.sql +++ b/scripts/db/data-cleaning-init.sql @@ -34,6 +34,7 @@ CREATE TABLE IF NOT EXISTS t_clean_task before_size BIGINT, after_size BIGINT, file_count INTEGER, + retry_count INTEGER, started_at TIMESTAMP, finished_at TIMESTAMP, created_by VARCHAR(256), @@ -54,6 +55,7 @@ COMMENT ON COLUMN t_clean_task.dest_dataset_name IS '目标数据集名称'; COMMENT ON COLUMN t_clean_task.before_size IS '清洗前大小'; COMMENT ON COLUMN t_clean_task.after_size IS '清洗后大小'; COMMENT ON COLUMN t_clean_task.file_count IS '文件数量'; +COMMENT ON COLUMN t_clean_task.retry_count IS '重试次数'; COMMENT ON COLUMN t_clean_task.started_at IS '开始时间'; COMMENT ON COLUMN t_clean_task.finished_at IS '完成时间'; COMMENT ON COLUMN t_clean_task.created_at IS '创建时间';