From f31fc0d25393702589a9040cc97bc5580e5f7d9e Mon Sep 17 00:00:00 2001 From: hhhhsc <1710496817@qq.com> Date: Thu, 22 Jan 2026 20:34:09 +0800 Subject: [PATCH 1/2] =?UTF-8?q?feat:=20=E6=B8=85=E6=B4=97=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E5=A2=9E=E5=8A=A0=E9=87=8D=E8=AF=95=E6=AC=A1=E6=95=B0?= =?UTF-8?q?=E8=AE=B0=E5=BD=95=E5=92=8C=E6=97=A5=E5=BF=97=E5=B1=95=E7=A4=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../application/CleaningTaskService.java | 9 ++++--- .../scheduler/CleaningTaskScheduler.java | 7 ++--- .../domain/model/entity/CleaningTask.java | 2 ++ .../interfaces/dto/CleaningTaskDto.java | 2 ++ .../rest/CleaningTaskController.java | 14 +++++++--- .../pages/DataCleansing/Detail/TaskDetail.tsx | 8 +++--- .../Detail/components/BasicInfo.tsx | 2 ++ .../Detail/components/LogsTable.tsx | 27 ++++++++++++++++--- .../src/pages/DataCleansing/cleansing.api.ts | 4 +-- .../pages/DataCleansing/cleansing.model.ts | 1 + .../Detail/OperatorPluginDetail.tsx | 2 +- .../Detail/components/Requirement.tsx | 4 +-- .../datamate/scheduler/cmd_task_scheduler.py | 11 +++++++- scripts/db/data-cleaning-init.sql | 2 ++ scripts/images/backend/Dockerfile | 3 ++- scripts/images/runtime/Dockerfile | 1 + 16 files changed, 74 insertions(+), 25 deletions(-) diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTaskService.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTaskService.java index 7e2903a70..8cc2a42cf 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTaskService.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/CleaningTaskService.java @@ -142,7 +142,7 @@ public CleaningTaskDto createTask(CreateCleaningTaskRequest request) { prepareTask(task, request.getInstance(), executorType); scanDataset(taskId, request.getSrcDatasetId()); - taskScheduler.executeTask(taskId); + return task; } @@ -157,9 +157,12 @@ public List getTaskResults(String taskId) { return cleaningResultRepo.findByInstanceId(taskId); } - public List getTaskLog(String taskId) { + public List getTaskLog(String taskId, int retryCount) { cleanTaskValidator.checkTaskId(taskId); String logPath = FLOW_PATH + "/" + taskId + "/output.log"; + if (retryCount > 0) { + logPath += "." + retryCount; + } try (Stream lines = Files.lines(Paths.get(logPath))) { List logs = new ArrayList<>(); AtomicReference lastLevel = new AtomicReference<>("INFO"); @@ -215,7 +218,7 @@ public void executeTask(String taskId) { CleaningTaskDto task = cleaningTaskRepo.findTaskById(taskId); scanDataset(taskId, task.getSrcDatasetId(), succeedSet); cleaningResultRepo.deleteByInstanceId(taskId, "FAILED"); - taskScheduler.executeTask(taskId); + taskScheduler.executeTask(taskId, task.getRetryCount() + 1); } private void prepareTask(CleaningTaskDto task, List instances, ExecutorType executorType) { diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/scheduler/CleaningTaskScheduler.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/scheduler/CleaningTaskScheduler.java index 6692ffac2..92df84571 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/scheduler/CleaningTaskScheduler.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/application/scheduler/CleaningTaskScheduler.java @@ -20,15 +20,16 @@ public class CleaningTaskScheduler { private final ExecutorService taskExecutor = Executors.newFixedThreadPool(5); - public void executeTask(String taskId) { - taskExecutor.submit(() -> submitTask(taskId)); + public void executeTask(String taskId, int retryCount) { + taskExecutor.submit(() -> submitTask(taskId, retryCount)); } - private void submitTask(String taskId) { + private void submitTask(String taskId, int retryCount) { CleaningTaskDto task = new CleaningTaskDto(); task.setId(taskId); task.setStatus(CleaningTaskStatusEnum.RUNNING); task.setStartedAt(LocalDateTime.now()); + task.setRetryCount(retryCount); cleaningTaskRepo.updateTask(task); runtimeClient.submitTask(taskId); } diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/model/entity/CleaningTask.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/model/entity/CleaningTask.java index f1f75a9ef..1f2705587 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/model/entity/CleaningTask.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/domain/model/entity/CleaningTask.java @@ -37,6 +37,8 @@ public class CleaningTask { private Integer fileCount; + private Integer retryCount; + private LocalDateTime createdAt; private LocalDateTime startedAt; diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/dto/CleaningTaskDto.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/dto/CleaningTaskDto.java index f2cc96516..bc1274f17 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/dto/CleaningTaskDto.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/dto/CleaningTaskDto.java @@ -38,6 +38,8 @@ public class CleaningTaskDto { private Integer fileCount; + private Integer retryCount; + private CleaningTaskStatusEnum status; private String templateId; diff --git a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/rest/CleaningTaskController.java b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/rest/CleaningTaskController.java index d1f250809..fc7d1b513 100644 --- a/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/rest/CleaningTaskController.java +++ b/backend/services/data-cleaning-service/src/main/java/com/datamate/cleaning/interfaces/rest/CleaningTaskController.java @@ -1,6 +1,7 @@ package com.datamate.cleaning.interfaces.rest; import com.datamate.cleaning.application.CleaningTaskService; +import com.datamate.cleaning.application.scheduler.CleaningTaskScheduler; import com.datamate.cleaning.interfaces.dto.*; import com.datamate.common.interfaces.PagedResponse; import lombok.RequiredArgsConstructor; @@ -18,6 +19,8 @@ public class CleaningTaskController { private final CleaningTaskService cleaningTaskService; + private final CleaningTaskScheduler taskScheduler; + @GetMapping public PagedResponse cleaningTasksGet( @RequestParam("page") Integer page, @@ -36,7 +39,9 @@ public CleaningTaskDto cleaningTasksPost(@McpToolParam(description = "创建任 if (request.getInstance().isEmpty() && StringUtils.isNotBlank(request.getTemplateId())) { request.setInstance(cleaningTaskService.getInstanceByTemplateId(request.getTemplateId())); } - return cleaningTaskService.createTask(request); + CleaningTaskDto task = cleaningTaskService.createTask(request); + taskScheduler.executeTask(task.getId(), 0); + return task; } @PostMapping("/{taskId}/stop") @@ -74,8 +79,9 @@ public List cleaningTasksTaskIdGetResult(@PathVariable("taskI return cleaningTaskService.getTaskResults(taskId); } - @GetMapping("/{taskId}/log") - public List cleaningTasksTaskIdGetLog(@PathVariable("taskId") String taskId) { - return cleaningTaskService.getTaskLog(taskId); + @GetMapping("/{taskId}/log/{retryCount}") + public List cleaningTasksTaskIdGetLog(@PathVariable("taskId") String taskId, + @PathVariable("retryCount") int retryCount) { + return cleaningTaskService.getTaskLog(taskId, retryCount); } } diff --git a/frontend/src/pages/DataCleansing/Detail/TaskDetail.tsx b/frontend/src/pages/DataCleansing/Detail/TaskDetail.tsx index 745b701ef..d22276815 100644 --- a/frontend/src/pages/DataCleansing/Detail/TaskDetail.tsx +++ b/frontend/src/pages/DataCleansing/Detail/TaskDetail.tsx @@ -76,10 +76,10 @@ export default function CleansingTaskDetail() { const [taskLog, setTaskLog] = useState(); - const fetchTaskLog = async () => { + const fetchTaskLog = async (retryCount: number) => { if (!id) return; try { - const { data } = await queryCleaningTaskLogByIdUsingGet(id); + const { data } = await queryCleaningTaskLogByIdUsingGet(id, retryCount); setTaskLog(data); } catch (error) { message.error("获取清洗日志失败"); @@ -90,7 +90,7 @@ export default function CleansingTaskDetail() { const handleRefresh = async () => { fetchTaskDetail(); {activeTab === "files" && await fetchTaskResult()} - {activeTab === "logs" && await fetchTaskLog()} + {activeTab === "logs" && await fetchTaskLog(task.retryCount)} }; useEffect(() => { @@ -215,7 +215,7 @@ export default function CleansingTaskDetail() { )} {activeTab === "operators" && } {activeTab === "files" && } - {activeTab === "logs" && } + {activeTab === "logs" && } diff --git a/frontend/src/pages/DataCleansing/Detail/components/BasicInfo.tsx b/frontend/src/pages/DataCleansing/Detail/components/BasicInfo.tsx index f1ff44fc9..464897a27 100644 --- a/frontend/src/pages/DataCleansing/Detail/components/BasicInfo.tsx +++ b/frontend/src/pages/DataCleansing/Detail/components/BasicInfo.tsx @@ -56,6 +56,8 @@ export default function BasicInfo({ task }: { task: CleansingTask }) { ), span: 2, }, + { key: "finishedTime", label: "结束时间", children: task?.finishedAt }, + { key: "name", label: "重试次数", children: task?.retryCount }, ]; return ( diff --git a/frontend/src/pages/DataCleansing/Detail/components/LogsTable.tsx b/frontend/src/pages/DataCleansing/Detail/components/LogsTable.tsx index 4e9376af6..3ca9f4482 100644 --- a/frontend/src/pages/DataCleansing/Detail/components/LogsTable.tsx +++ b/frontend/src/pages/DataCleansing/Detail/components/LogsTable.tsx @@ -1,16 +1,35 @@ -import {useEffect} from "react"; +import {useEffect, useState} from "react"; import {useParams} from "react-router"; import {FileClock} from "lucide-react"; -export default function LogsTable({taskLog, fetchTaskLog} : {taskLog: any[], fetchTaskLog: () => Promise}) { +export default function LogsTable({taskLog, fetchTaskLog, retryCount} : {taskLog: any[], fetchTaskLog: () => Promise, retryCount: number}) { const { id = "" } = useParams(); + const [selectedLog, setSelectedLog] = useState(retryCount + 1); useEffect(() => { - fetchTaskLog(); - }, [id]); + fetchTaskLog(selectedLog - 1); + }, [id, selectedLog]); return taskLog?.length > 0 ? ( <> + {/* --- 新增区域:左上角 Select 组件 --- */} +
+
+ + +
+ 当前展示: 第 {selectedLog} 次 +
{taskLog?.map?.((log, index) => ( diff --git a/frontend/src/pages/DataCleansing/cleansing.api.ts b/frontend/src/pages/DataCleansing/cleansing.api.ts index d96bed0a0..edf1d8e8b 100644 --- a/frontend/src/pages/DataCleansing/cleansing.api.ts +++ b/frontend/src/pages/DataCleansing/cleansing.api.ts @@ -17,8 +17,8 @@ export function queryCleaningTaskResultByIdUsingGet(taskId: string | number) { return get(`/api/cleaning/tasks/${taskId}/result`); } -export function queryCleaningTaskLogByIdUsingGet(taskId: string | number) { - return get(`/api/cleaning/tasks/${taskId}/log`); +export function queryCleaningTaskLogByIdUsingGet(taskId: string | number, retryCount: number) { + return get(`/api/cleaning/tasks/${taskId}/log/${retryCount}`); } export function updateCleaningTaskByIdUsingPut(taskId: string | number, data: any) { diff --git a/frontend/src/pages/DataCleansing/cleansing.model.ts b/frontend/src/pages/DataCleansing/cleansing.model.ts index a69525255..5a109b6ca 100644 --- a/frontend/src/pages/DataCleansing/cleansing.model.ts +++ b/frontend/src/pages/DataCleansing/cleansing.model.ts @@ -30,6 +30,7 @@ export interface CleansingTask { finishedAt: string; beforeSize?: number; afterSize?: number; + retryCount: number; } export interface CleansingTemplate { diff --git a/frontend/src/pages/OperatorMarket/Detail/OperatorPluginDetail.tsx b/frontend/src/pages/OperatorMarket/Detail/OperatorPluginDetail.tsx index b9a55ff06..03f425489 100644 --- a/frontend/src/pages/OperatorMarket/Detail/OperatorPluginDetail.tsx +++ b/frontend/src/pages/OperatorMarket/Detail/OperatorPluginDetail.tsx @@ -142,7 +142,7 @@ export default function OperatorPluginDetail() { }, { key: "requirement", - label: "环境依赖", + label: "系统规格", }, { key: "documentation", diff --git a/frontend/src/pages/OperatorMarket/Detail/components/Requirement.tsx b/frontend/src/pages/OperatorMarket/Detail/components/Requirement.tsx index 9c23b4fa6..03d416218 100644 --- a/frontend/src/pages/OperatorMarket/Detail/components/Requirement.tsx +++ b/frontend/src/pages/OperatorMarket/Detail/components/Requirement.tsx @@ -21,13 +21,13 @@ export default function Requirement({ operator }) {

系统要求

- CPU要求 + CPU规格 {requirement?.cpu || '无限制'}
- 内存要求 + 内存规格 {requirement?.memory || "无限制"} diff --git a/runtime/python-executor/datamate/scheduler/cmd_task_scheduler.py b/runtime/python-executor/datamate/scheduler/cmd_task_scheduler.py index 011f05aba..87701d1b7 100644 --- a/runtime/python-executor/datamate/scheduler/cmd_task_scheduler.py +++ b/runtime/python-executor/datamate/scheduler/cmd_task_scheduler.py @@ -1,4 +1,5 @@ import asyncio +import os from datetime import datetime from typing import Optional, List @@ -13,6 +14,7 @@ class CommandTask(Task): def __init__(self, task_id: str, command: str, log_path = None, shell: bool = True, timeout: Optional[int] = None, *args, **kwargs): super().__init__(task_id, *args, **kwargs) + self.max_backups = 9 self.log_path = log_path self.command = command self.shell = shell @@ -34,7 +36,14 @@ async def _execute(self): self.status = TaskStatus.RUNNING self.started_at = datetime.now() - with open(self.log_path, 'a') as f: + current_log_path = self.log_path + if os.path.exists(current_log_path): + counter = 1 + while os.path.exists(f"{self.log_path}.{counter}"): + counter += 1 + current_log_path = f"{self.log_path}.{counter}" + + with open(current_log_path, 'a') as f: # 使用 asyncio.create_subprocess_shell 或 create_subprocess_exec if self.shell: process = await asyncio.create_subprocess_shell( diff --git a/scripts/db/data-cleaning-init.sql b/scripts/db/data-cleaning-init.sql index c8ffd8d3b..6c7b7b66f 100644 --- a/scripts/db/data-cleaning-init.sql +++ b/scripts/db/data-cleaning-init.sql @@ -34,6 +34,7 @@ CREATE TABLE IF NOT EXISTS t_clean_task before_size BIGINT, after_size BIGINT, file_count INTEGER, + retry_count INTEGER, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, started_at TIMESTAMP, finished_at TIMESTAMP, @@ -52,6 +53,7 @@ COMMENT ON COLUMN t_clean_task.dest_dataset_name IS '目标数据集名称'; COMMENT ON COLUMN t_clean_task.before_size IS '清洗前大小'; COMMENT ON COLUMN t_clean_task.after_size IS '清洗后大小'; COMMENT ON COLUMN t_clean_task.file_count IS '文件数量'; +COMMENT ON COLUMN t_clean_task.retry_count IS '重试次数'; COMMENT ON COLUMN t_clean_task.created_at IS '创建时间'; COMMENT ON COLUMN t_clean_task.started_at IS '开始时间'; COMMENT ON COLUMN t_clean_task.finished_at IS '完成时间'; diff --git a/scripts/images/backend/Dockerfile b/scripts/images/backend/Dockerfile index f75e9200b..3e9de71b5 100644 --- a/scripts/images/backend/Dockerfile +++ b/scripts/images/backend/Dockerfile @@ -7,7 +7,8 @@ RUN cd /opt/backend/ && \ FROM eclipse-temurin:21-jdk -RUN apt-get update && \ +RUN sed -i "s@http://.*.ubuntu.com@http://mirrors.huaweicloud.com@g" /etc/apt/sources.list.d/ubuntu.sources && \ + apt-get update && \ apt-get install -y vim wget curl rsync python3 python3-pip python-is-python3 dos2unix && \ apt-get clean && \ rm -rf /var/lib/apt/lists/* diff --git a/scripts/images/runtime/Dockerfile b/scripts/images/runtime/Dockerfile index 5481ec391..45856e96c 100644 --- a/scripts/images/runtime/Dockerfile +++ b/scripts/images/runtime/Dockerfile @@ -2,6 +2,7 @@ FROM ghcr.io/astral-sh/uv:python3.11-bookworm RUN --mount=type=cache,target=/var/cache/apt \ --mount=type=cache,target=/var/lib/apt \ + sed -i 's/deb.debian.org/mirrors.huaweicloud.com/g' /etc/apt/sources.list.d/debian.sources && \ apt update \ && apt install -y libgl1 libglib2.0-0 vim libmagic1 libreoffice dos2unix swig poppler-utils tesseract-ocr From 52d2f3e695df07eb386b033ee02e4f8f3654f4be Mon Sep 17 00:00:00 2001 From: hhhhsc <1710496817@qq.com> Date: Fri, 23 Jan 2026 10:12:48 +0800 Subject: [PATCH 2/2] =?UTF-8?q?feat:=20=E6=95=B0=E6=8D=AE=E6=B8=85?= =?UTF-8?q?=E6=B4=97=E6=94=B9=E4=B8=BA=E6=95=B0=E6=8D=AE=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- frontend/src/pages/DataCleansing/Detail/TaskDetail.tsx | 2 +- frontend/src/pages/DataCleansing/Detail/TemplateDetail.tsx | 2 +- frontend/src/pages/DataCleansing/Home/DataCleansing.tsx | 2 +- frontend/src/pages/Home/Home.tsx | 2 +- frontend/src/pages/Layout/Menu.tsx | 2 +- scripts/images/backend/Dockerfile | 3 +-- scripts/images/runtime/Dockerfile | 1 - 7 files changed, 6 insertions(+), 8 deletions(-) diff --git a/frontend/src/pages/DataCleansing/Detail/TaskDetail.tsx b/frontend/src/pages/DataCleansing/Detail/TaskDetail.tsx index d22276815..7508aef1d 100644 --- a/frontend/src/pages/DataCleansing/Detail/TaskDetail.tsx +++ b/frontend/src/pages/DataCleansing/Detail/TaskDetail.tsx @@ -190,7 +190,7 @@ export default function CleansingTaskDetail() { const breadItems = [ { - title: 数据清洗, + title: 数据处理, }, { title: "清洗任务详情", diff --git a/frontend/src/pages/DataCleansing/Detail/TemplateDetail.tsx b/frontend/src/pages/DataCleansing/Detail/TemplateDetail.tsx index 30c07dd48..e3b7d8e8c 100644 --- a/frontend/src/pages/DataCleansing/Detail/TemplateDetail.tsx +++ b/frontend/src/pages/DataCleansing/Detail/TemplateDetail.tsx @@ -94,7 +94,7 @@ export default function CleansingTemplateDetail() { const breadItems = [ { - title: 数据清洗, + title: 数据处理, }, { title: "模板详情", diff --git a/frontend/src/pages/DataCleansing/Home/DataCleansing.tsx b/frontend/src/pages/DataCleansing/Home/DataCleansing.tsx index af28c5385..1a21de994 100644 --- a/frontend/src/pages/DataCleansing/Home/DataCleansing.tsx +++ b/frontend/src/pages/DataCleansing/Home/DataCleansing.tsx @@ -22,7 +22,7 @@ export default function DataProcessingPage() {
{/* Header */}
-

数据清洗

+

数据处理

diff --git a/frontend/src/pages/Layout/Menu.tsx b/frontend/src/pages/Layout/Menu.tsx index f38297fb9..8c67a79c2 100644 --- a/frontend/src/pages/Layout/Menu.tsx +++ b/frontend/src/pages/Layout/Menu.tsx @@ -31,7 +31,7 @@ export const menuItems = [ }, { id: "cleansing", - title: "数据清洗", + title: "数据处理", icon: GitBranch, description: "数据清洗和预处理", color: "bg-purple-500", diff --git a/scripts/images/backend/Dockerfile b/scripts/images/backend/Dockerfile index 3e9de71b5..f75e9200b 100644 --- a/scripts/images/backend/Dockerfile +++ b/scripts/images/backend/Dockerfile @@ -7,8 +7,7 @@ RUN cd /opt/backend/ && \ FROM eclipse-temurin:21-jdk -RUN sed -i "s@http://.*.ubuntu.com@http://mirrors.huaweicloud.com@g" /etc/apt/sources.list.d/ubuntu.sources && \ - apt-get update && \ +RUN apt-get update && \ apt-get install -y vim wget curl rsync python3 python3-pip python-is-python3 dos2unix && \ apt-get clean && \ rm -rf /var/lib/apt/lists/* diff --git a/scripts/images/runtime/Dockerfile b/scripts/images/runtime/Dockerfile index 45856e96c..5481ec391 100644 --- a/scripts/images/runtime/Dockerfile +++ b/scripts/images/runtime/Dockerfile @@ -2,7 +2,6 @@ FROM ghcr.io/astral-sh/uv:python3.11-bookworm RUN --mount=type=cache,target=/var/cache/apt \ --mount=type=cache,target=/var/lib/apt \ - sed -i 's/deb.debian.org/mirrors.huaweicloud.com/g' /etc/apt/sources.list.d/debian.sources && \ apt update \ && apt install -y libgl1 libglib2.0-0 vim libmagic1 libreoffice dos2unix swig poppler-utils tesseract-ocr