Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public CleaningTaskDto createTask(CreateCleaningTaskRequest request) {

prepareTask(task, request.getInstance(), executorType);
scanDataset(taskId, request.getSrcDatasetId());
taskScheduler.executeTask(taskId);

return task;
}

Expand All @@ -157,9 +157,12 @@ public List<CleaningResultDto> getTaskResults(String taskId) {
return cleaningResultRepo.findByInstanceId(taskId);
}

public List<CleaningTaskLog> getTaskLog(String taskId) {
public List<CleaningTaskLog> getTaskLog(String taskId, int retryCount) {
cleanTaskValidator.checkTaskId(taskId);
String logPath = FLOW_PATH + "/" + taskId + "/output.log";
if (retryCount > 0) {
logPath += "." + retryCount;
}
try (Stream<String> lines = Files.lines(Paths.get(logPath))) {
List<CleaningTaskLog> logs = new ArrayList<>();
AtomicReference<String> lastLevel = new AtomicReference<>("INFO");
Expand Down Expand Up @@ -215,7 +218,7 @@ public void executeTask(String taskId) {
CleaningTaskDto task = cleaningTaskRepo.findTaskById(taskId);
scanDataset(taskId, task.getSrcDatasetId(), succeedSet);
cleaningResultRepo.deleteByInstanceId(taskId, "FAILED");
taskScheduler.executeTask(taskId);
taskScheduler.executeTask(taskId, task.getRetryCount() + 1);
}

private void prepareTask(CleaningTaskDto task, List<OperatorInstanceDto> instances, ExecutorType executorType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,16 @@ public class CleaningTaskScheduler {

private final ExecutorService taskExecutor = Executors.newFixedThreadPool(5);

public void executeTask(String taskId) {
taskExecutor.submit(() -> submitTask(taskId));
public void executeTask(String taskId, int retryCount) {
taskExecutor.submit(() -> submitTask(taskId, retryCount));
}

private void submitTask(String taskId) {
private void submitTask(String taskId, int retryCount) {
CleaningTaskDto task = new CleaningTaskDto();
task.setId(taskId);
task.setStatus(CleaningTaskStatusEnum.RUNNING);
task.setStartedAt(LocalDateTime.now());
task.setRetryCount(retryCount);
cleaningTaskRepo.updateTask(task);
runtimeClient.submitTask(taskId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public class CleaningTask extends BaseEntity<String> {

private Integer fileCount;

private Integer retryCount;

private LocalDateTime startedAt;

private LocalDateTime finishedAt;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public class CleaningTaskDto {

private Integer fileCount;

private Integer retryCount;

private CleaningTaskStatusEnum status;

private String templateId;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.datamate.cleaning.interfaces.rest;

import com.datamate.cleaning.application.CleaningTaskService;
import com.datamate.cleaning.application.scheduler.CleaningTaskScheduler;
import com.datamate.cleaning.interfaces.dto.*;
import com.datamate.common.interfaces.PagedResponse;
import lombok.RequiredArgsConstructor;
Expand All @@ -18,6 +19,8 @@
public class CleaningTaskController {
private final CleaningTaskService cleaningTaskService;

private final CleaningTaskScheduler taskScheduler;

@GetMapping
public PagedResponse<CleaningTaskDto> cleaningTasksGet(
@RequestParam("page") Integer page,
Expand All @@ -36,7 +39,9 @@ public CleaningTaskDto cleaningTasksPost(@McpToolParam(description = "创建任
if (request.getInstance().isEmpty() && StringUtils.isNotBlank(request.getTemplateId())) {
request.setInstance(cleaningTaskService.getInstanceByTemplateId(request.getTemplateId()));
}
return cleaningTaskService.createTask(request);
CleaningTaskDto task = cleaningTaskService.createTask(request);
taskScheduler.executeTask(task.getId(), 0);
return task;
}

@PostMapping("/{taskId}/stop")
Expand Down Expand Up @@ -74,8 +79,9 @@ public List<CleaningResultDto> cleaningTasksTaskIdGetResult(@PathVariable("taskI
return cleaningTaskService.getTaskResults(taskId);
}

@GetMapping("/{taskId}/log")
public List<CleaningTaskLog> cleaningTasksTaskIdGetLog(@PathVariable("taskId") String taskId) {
return cleaningTaskService.getTaskLog(taskId);
@GetMapping("/{taskId}/log/{retryCount}")
public List<CleaningTaskLog> cleaningTasksTaskIdGetLog(@PathVariable("taskId") String taskId,
@PathVariable("retryCount") int retryCount) {
return cleaningTaskService.getTaskLog(taskId, retryCount);
}
}
10 changes: 5 additions & 5 deletions frontend/src/pages/DataCleansing/Detail/TaskDetail.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ export default function CleansingTaskDetail() {

const [taskLog, setTaskLog] = useState();

const fetchTaskLog = async () => {
const fetchTaskLog = async (retryCount: number) => {
if (!id) return;
try {
const { data } = await queryCleaningTaskLogByIdUsingGet(id);
const { data } = await queryCleaningTaskLogByIdUsingGet(id, retryCount);
setTaskLog(data);
} catch (error) {
message.error("获取清洗日志失败");
Expand All @@ -90,7 +90,7 @@ export default function CleansingTaskDetail() {
const handleRefresh = async () => {
fetchTaskDetail();
{activeTab === "files" && await fetchTaskResult()}
{activeTab === "logs" && await fetchTaskLog()}
{activeTab === "logs" && await fetchTaskLog(task.retryCount)}
};

useEffect(() => {
Expand Down Expand Up @@ -190,7 +190,7 @@ export default function CleansingTaskDetail() {

const breadItems = [
{
title: <Link to="/data/cleansing">数据清洗</Link>,
title: <Link to="/data/cleansing">数据处理</Link>,
},
{
title: "清洗任务详情",
Expand All @@ -215,7 +215,7 @@ export default function CleansingTaskDetail() {
)}
{activeTab === "operators" && <OperatorTable task={task} />}
{activeTab === "files" && <FileTable result={result} fetchTaskResult={fetchTaskResult} />}
{activeTab === "logs" && <LogsTable taskLog={taskLog} fetchTaskLog={fetchTaskLog} />}
{activeTab === "logs" && <LogsTable taskLog={taskLog} fetchTaskLog={fetchTaskLog} retryCount={task.retryCount} />}
</div>
</div>
</>
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/pages/DataCleansing/Detail/TemplateDetail.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ export default function CleansingTemplateDetail() {

const breadItems = [
{
title: <Link to="/data/cleansing">数据清洗</Link>,
title: <Link to="/data/cleansing">数据处理</Link>,
},
{
title: "模板详情",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ export default function BasicInfo({ task }: { task: CleansingTask }) {
),
span: 2,
},
{ key: "finishedTime", label: "结束时间", children: task?.finishedAt },
{ key: "name", label: "重试次数", children: task?.retryCount },
];

return (
Expand Down
27 changes: 23 additions & 4 deletions frontend/src/pages/DataCleansing/Detail/components/LogsTable.tsx
Original file line number Diff line number Diff line change
@@ -1,16 +1,35 @@
import {useEffect} from "react";
import {useEffect, useState} from "react";
import {useParams} from "react-router";
import {FileClock} from "lucide-react";

export default function LogsTable({taskLog, fetchTaskLog} : {taskLog: any[], fetchTaskLog: () => Promise<any>}) {
export default function LogsTable({taskLog, fetchTaskLog, retryCount} : {taskLog: any[], fetchTaskLog: () => Promise<any>, retryCount: number}) {
const { id = "" } = useParams();
const [selectedLog, setSelectedLog] = useState(retryCount + 1);

useEffect(() => {
fetchTaskLog();
}, [id]);
fetchTaskLog(selectedLog - 1);
}, [id, selectedLog]);

return taskLog?.length > 0 ? (
<>
{/* --- 新增区域:左上角 Select 组件 --- */}
<div className="flex items-center justify-between pb-3">
<div className="flex items-center gap-3">
<label className="text-sm font-medium text-gray-500">选择运行轮次:</label>
<select
value={selectedLog}
onChange={(e) => setSelectedLog(Number(e.target.value))}
className="bg-gray-700 border border-gray-600 !text-white text-sm rounded-md focus:ring-blue-500 focus:border-blue-500 block px-2.5 py-1.5 min-w-[120px]"
>
{Array.from({ length: retryCount + 1 }, (_, i) => retryCount + 1 - i).map((num) => (
<option key={num} value={num}>
第 {num} 次
</option>
))}
</select>
</div>
<span className="text-s text-gray-500 px-2">当前展示: 第 {selectedLog} 次</span>
</div>
<div className="text-gray-300 p-4 border border-gray-700 bg-gray-800 rounded-lg">
<div className="font-mono text-sm">
{taskLog?.map?.((log, index) => (
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/pages/DataCleansing/Home/DataCleansing.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export default function DataProcessingPage() {
<div className="h-full flex flex-col gap-4">
{/* Header */}
<div className="flex justify-between items-center">
<h1 className="text-xl font-bold">数据清洗</h1>
<h1 className="text-xl font-bold">数据处理</h1>
<div className="flex gap-2">
<Button
icon={<PlusOutlined />}
Expand Down
4 changes: 2 additions & 2 deletions frontend/src/pages/DataCleansing/cleansing.api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ export function queryCleaningTaskResultByIdUsingGet(taskId: string | number) {
return get(`/api/cleaning/tasks/${taskId}/result`);
}

export function queryCleaningTaskLogByIdUsingGet(taskId: string | number) {
return get(`/api/cleaning/tasks/${taskId}/log`);
export function queryCleaningTaskLogByIdUsingGet(taskId: string | number, retryCount: number) {
return get(`/api/cleaning/tasks/${taskId}/log/${retryCount}`);
}

export function updateCleaningTaskByIdUsingPut(taskId: string | number, data: any) {
Expand Down
1 change: 1 addition & 0 deletions frontend/src/pages/DataCleansing/cleansing.model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export interface CleansingTask {
finishedAt: string;
beforeSize?: number;
afterSize?: number;
retryCount: number;
}

export interface CleansingTemplate {
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/pages/Home/Home.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ export default function WelcomePage() {
数据智能编排 - 可视化流程设计
</h3>
<p className="text-orange-700">
拖拽式设计复杂数据清洗管道,让数据流转更加直观高效
拖拽式设计复杂数据处理管道,让数据流转更加直观高效
</p>
</div>

Expand Down
2 changes: 1 addition & 1 deletion frontend/src/pages/Layout/Menu.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export const menuItems = [
},
{
id: "cleansing",
title: "数据清洗",
title: "数据处理",
icon: GitBranch,
description: "数据清洗和预处理",
color: "bg-purple-500",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ export default function OperatorPluginDetail() {
},
{
key: "requirement",
label: "环境依赖",
label: "系统规格",
},
{
key: "documentation",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ export default function Requirement({ operator }) {
<h3 className="text-lg font-semibold text-gray-900 mb-4">系统要求</h3>
<div className="space-y-3">
<div className="flex items-center justify-between py-2 border-b border-gray-100">
<span className="font-medium text-gray-700">CPU要求</span>
<span className="font-medium text-gray-700">CPU规格</span>
<span className="text-gray-900">
{requirement?.cpu || '无限制'}
</span>
</div>
<div className="flex items-center justify-between py-2 border-b border-gray-100">
<span className="font-medium text-gray-700">内存要求</span>
<span className="font-medium text-gray-700">内存规格</span>
<span className="text-gray-900">
{requirement?.memory || "无限制"}
</span>
Expand Down
11 changes: 10 additions & 1 deletion runtime/python-executor/datamate/scheduler/cmd_task_scheduler.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import os
from datetime import datetime
from typing import Optional, List

Expand All @@ -13,6 +14,7 @@ class CommandTask(Task):
def __init__(self, task_id: str, command: str, log_path = None, shell: bool = True,
timeout: Optional[int] = None, *args, **kwargs):
super().__init__(task_id, *args, **kwargs)
self.max_backups = 9
self.log_path = log_path
self.command = command
self.shell = shell
Expand All @@ -34,7 +36,14 @@ async def _execute(self):
self.status = TaskStatus.RUNNING
self.started_at = datetime.now()

with open(self.log_path, 'a') as f:
current_log_path = self.log_path
if os.path.exists(current_log_path):
counter = 1
while os.path.exists(f"{self.log_path}.{counter}"):
counter += 1
current_log_path = f"{self.log_path}.{counter}"

with open(current_log_path, 'a') as f:
# 使用 asyncio.create_subprocess_shell 或 create_subprocess_exec
if self.shell:
process = await asyncio.create_subprocess_shell(
Expand Down
2 changes: 2 additions & 0 deletions scripts/db/data-cleaning-init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ CREATE TABLE IF NOT EXISTS t_clean_task
before_size BIGINT,
after_size BIGINT,
file_count INTEGER,
retry_count INTEGER,
started_at TIMESTAMP,
finished_at TIMESTAMP,
created_by VARCHAR(256),
Expand All @@ -54,6 +55,7 @@ COMMENT ON COLUMN t_clean_task.dest_dataset_name IS '目标数据集名称';
COMMENT ON COLUMN t_clean_task.before_size IS '清洗前大小';
COMMENT ON COLUMN t_clean_task.after_size IS '清洗后大小';
COMMENT ON COLUMN t_clean_task.file_count IS '文件数量';
COMMENT ON COLUMN t_clean_task.retry_count IS '重试次数';
COMMENT ON COLUMN t_clean_task.started_at IS '开始时间';
COMMENT ON COLUMN t_clean_task.finished_at IS '完成时间';
COMMENT ON COLUMN t_clean_task.created_at IS '创建时间';
Expand Down
Loading