diff --git a/runtime/python-executor/datamate/core/base_op.py b/runtime/python-executor/datamate/core/base_op.py
index f87457d7..4ec80c50 100644
--- a/runtime/python-executor/datamate/core/base_op.py
+++ b/runtime/python-executor/datamate/core/base_op.py
@@ -18,6 +18,7 @@ from datamate.common.utils.registry import Registry
 from datamate.common.utils import check_valid_path
 from datamate.core.constant import Fields
+from datamate.core.process_log_decorator import update_process_log
 from datamate.sql_manager.persistence_atction import TaskInfoPersistence
 
 
 OPERATORS = Registry('Operators')
@@ -181,6 +182,7 @@ class Mapper(BaseOp):
     def __init__(self, *args, **kwargs):
         super(Mapper, self).__init__(*args, **kwargs)
 
+    @update_process_log
     def __call__(self, sample: Dict[str, Any], **kwargs):
         # 该算子前已有算子执行该文件失败
         if sample.get(Fields.result) is False:
@@ -309,6 +311,7 @@ class Filter(BaseOp):
     def __init__(self, *args, **kwargs):
         super(Filter, self).__init__(*args, **kwargs)
 
+    @update_process_log
     def __call__(self, sample: Dict[str, Any], **kwargs):
         # 该算子前已有算子执行该文件失败
         if sample.get(Fields.result) is False:
diff --git a/runtime/python-executor/datamate/core/process_log_decorator.py b/runtime/python-executor/datamate/core/process_log_decorator.py
new file mode 100644
index 00000000..4c59782c
--- /dev/null
+++ b/runtime/python-executor/datamate/core/process_log_decorator.py
@@ -0,0 +1,213 @@
+# -*- coding: utf-8 -*-
+
+import uuid
+import traceback
+from datetime import datetime
+from functools import wraps
+from typing import Dict, Any, Callable
+
+from loguru import logger
+from sqlalchemy import Table, Column, String, Integer, Text, TIMESTAMP, select, update, insert, func, MetaData
+
+from datamate.sql_manager.sql_manager import SQLManager
+
+# Table definition (standalone MetaData, since we operate over raw connections)
+_metadata = MetaData()
+_process_log_table = Table(
+    't_data_process_log',
+    _metadata,
+    Column('id', String(64), primary_key=True),
+    Column('instance_id', String(256)),
+    Column('operator_id', String(256)),
+    Column('start_time', TIMESTAMP),
+    Column('end_time', TIMESTAMP),
+    Column('succeed_file_count', Integer, default=0),
+    Column('failed_file_count', Integer, default=0),
+    Column('error_message', Text, default=''),
+    Column('created_at', TIMESTAMP, server_default=func.current_timestamp()),
+    Column('updated_at', TIMESTAMP, server_default=func.current_timestamp()),
+)
+
+
+def update_process_log(func: Callable) -> Callable:
+    """
+    Decorator that updates t_data_process_log around the __call__ of a Mapper or Filter.
+
+    Before the call:
+    - look the record up by instance_id and operator_id
+    - if it does not exist, create it (UUID primary key) and set start_time
+    - if it exists, leave start_time untouched (keep the time of first creation)
+
+    After the call:
+    - success: set end_time, increment succeed_file_count
+    - failure: set end_time, increment failed_file_count, append to error_message
+    """
+    @wraps(func)
+    def wrapper(self, sample: Dict[str, Any], **kwargs):
+        # Resolve instance_id and operator_id
+        instance_id = str(sample.get("instance_id", ""))
+        operator_id = getattr(self, 'name', 'UnknownOp')
+
+        if not instance_id:
+            logger.warning("instance_id is missing in sample, skipping process log update")
+            return func(self, sample, **kwargs)
+
+        start_time = datetime.now()
+        log_id = None
+
+        try:
+            # Before the call: create the record if it does not exist yet (only sets start_time)
+            log_id = _ensure_log_record_exists(instance_id, operator_id, start_time)
+
+            # Run the wrapped method
+            result = func(self, sample, **kwargs)
+
+            # After the call: record the success
+            end_time = datetime.now()
+            if log_id:
+                _update_log_record_success(log_id, end_time)
+
+            return result
+
+        except Exception as e:
+            # The wrapped method failed: record the failure
+            end_time = datetime.now()
+            error_message = _get_error_message(e)
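+            # The nested try below is deliberate: a failure while writing the
+            # log row must never mask the operator's original exception.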
+
+            try:
+                # Make sure we have a log_id (if none was created earlier, look it up or create it now)
+                if log_id is None:
+                    log_id = _get_or_create_log_id(instance_id, operator_id, start_time)
+                if log_id:
+                    _update_log_record_failure(log_id, end_time, error_message)
+            except Exception as update_error:
+                logger.error(f"Failed to update process log after error: {update_error}")
+
+            # Re-raise the original exception (a bare raise preserves the traceback)
+            raise
+
+    return wrapper
+
+
+def _ensure_log_record_exists(instance_id: str, operator_id: str, start_time: datetime) -> str:
+    """Ensure the log record exists, creating it if necessary. Runs inside a transaction
+    with SELECT ... FOR UPDATE; since FOR UPDATE cannot lock a row that does not exist yet,
+    the UNIQUE (instance_id, operator_id) constraint in data-cleaning-init.sql backstops
+    the create race. Returns the record id (UUID), or '' if lookup and insert both failed."""
+    try:
+        with SQLManager.create_connect() as conn:
+            with conn.begin():  # transaction is managed automatically
+                # Lock the record with SELECT ... FOR UPDATE
+                stmt = select(_process_log_table.c.id).where(
+                    _process_log_table.c.instance_id == instance_id,
+                    _process_log_table.c.operator_id == operator_id
+                ).with_for_update()
+
+                result = conn.execute(stmt)
+                existing = result.fetchone()
+
+                if existing is None:
+                    # No record yet: create one
+                    log_id = str(uuid.uuid4())
+                    conn.execute(
+                        insert(_process_log_table).values(
+                            id=log_id,
+                            instance_id=instance_id,
+                            operator_id=operator_id,
+                            start_time=start_time,
+                            succeed_file_count=0,
+                            failed_file_count=0,
+                            error_message=''
+                        )
+                    )
+                    logger.debug(f"Created new process log record: id={log_id}, instance_id={instance_id}, operator_id={operator_id}")
+                    return log_id
+                else:
+                    log_id = existing[0]
+                    logger.debug(f"Process log record already exists: id={log_id}, instance_id={instance_id}, operator_id={operator_id}")
+                    return log_id
+    except Exception as e:
+        # E.g. losing an insert race against the unique constraint. Return a falsy
+        # id so callers skip the follow-up updates instead of targeting a row that
+        # does not exist.
+        logger.error(f"Failed to ensure process log record exists: {e}")
+        return ''
+
+
+def _get_or_create_log_id(instance_id: str, operator_id: str, start_time: datetime) -> str:
+    """Fetch or create the log record id; used by the error path to guarantee a log_id."""
+    return _ensure_log_record_exists(instance_id, operator_id, start_time)
+
+
+def _update_log_record_success(log_id: str, end_time: datetime):
+    """On success: set end_time and increment succeed_file_count, inside a locked transaction."""
+    try:
+        with SQLManager.create_connect() as conn:
+            with conn.begin():  # transaction is managed automatically
+                # Lock the record
+                conn.execute(
+                    select(_process_log_table.c.id)
+                    .where(_process_log_table.c.id == log_id)
+                    .with_for_update()
+                )
+
+                # Atomic update
+                conn.execute(
+                    update(_process_log_table)
+                    .where(_process_log_table.c.id == log_id)
+                    .values(
+                        end_time=end_time,
+                        succeed_file_count=_process_log_table.c.succeed_file_count + 1,
+                        updated_at=func.current_timestamp()
+                    )
+                )
+                logger.debug(f"Updated process log record (success): id={log_id}")
+    except Exception as e:
+        logger.error(f"Failed to update process log record (success): {e}")
+
+
+def _update_log_record_failure(log_id: str, end_time: datetime, error_message: str):
+    """On failure: set end_time, increment failed_file_count and append error_message,
+    inside a locked transaction."""
+    try:
+        with SQLManager.create_connect() as conn:
+            with conn.begin():  # transaction is managed automatically
+                # Lock the record and read the current error message
+                result = conn.execute(
+                    select(_process_log_table.c.error_message)
+                    .where(_process_log_table.c.id == log_id)
+                    .with_for_update()
+                )
+                current_error = result.scalar() or ''
+
+                # Append to any earlier error message
+                new_error = error_message if not current_error else f"{current_error}\n{error_message}"
+
+                # Atomic update
+                conn.execute(
+                    update(_process_log_table)
+                    .where(_process_log_table.c.id == log_id)
+                    .values(
+                        end_time=end_time,
+                        failed_file_count=_process_log_table.c.failed_file_count + 1,
+                        error_message=new_error,
+                        updated_at=func.current_timestamp()
+                    )
+                )
+                logger.debug(f"Updated process log record (failure): id={log_id}")
+    except Exception as e:
+        logger.error(f"Failed to update process log record (failure): {e}")
+
+
+def _get_error_message(exception: Exception) -> str:
+    """
+    Build the error message for an exception: type, message and stack trace.
+    """
+    try:
+        exc_type = type(exception).__name__
+        exc_msg = str(exception)
+        # format_exc() captures the traceback of the exception currently being handled
+        exc_traceback = traceback.format_exc()
+
+        error_message = f"[{exc_type}] {exc_msg}\n{exc_traceback}"
+        # Cap the length so the database field cannot grow without bound
+        max_length = 10000  # TEXT can usually hold more; this is a safety margin
+        if len(error_message) > max_length:
+            error_message = error_message[:max_length] + "\n...(truncated)"
+
+        return error_message
+    except Exception as e:
+        logger.error(f"Failed to get error message: {e}")
+        return f"Error occurred but failed to extract message: {str(exception)}"
diff --git a/scripts/db/data-cleaning-init.sql b/scripts/db/data-cleaning-init.sql
index 93322f44..c0bcbce6 100644
--- a/scripts/db/data-cleaning-init.sql
+++ b/scripts/db/data-cleaning-init.sql
@@ -73,9 +73,9 @@ CREATE TABLE IF NOT EXISTS t_operator_instance
     PRIMARY KEY (instance_id, operator_id, op_index)
 );
 
-COMMENT ON TABLE t_operator_instance IS '操作员实例表';
+COMMENT ON TABLE t_operator_instance IS '算子实例表';
 COMMENT ON COLUMN t_operator_instance.instance_id IS '实例ID';
-COMMENT ON COLUMN t_operator_instance.operator_id IS '操作员ID';
+COMMENT ON COLUMN t_operator_instance.operator_id IS '算子ID';
 COMMENT ON COLUMN t_operator_instance.op_index IS '操作序号';
 COMMENT ON COLUMN t_operator_instance.settings_override IS '设置覆盖';
 
@@ -109,6 +109,34 @@ COMMENT ON COLUMN t_clean_result.dest_size IS '目标文件大小';
 COMMENT ON COLUMN t_clean_result.status IS '状态';
 COMMENT ON COLUMN t_clean_result.result IS '结果';
 
+
+CREATE TABLE IF NOT EXISTS t_data_process_log
+(
+    id                 VARCHAR(64) PRIMARY KEY,
+    instance_id        VARCHAR(256),
+    operator_id        VARCHAR(256),
+    start_time         TIMESTAMP,
+    end_time           TIMESTAMP,
+    succeed_file_count INTEGER   DEFAULT 0,
+    failed_file_count  INTEGER   DEFAULT 0,
+    error_message      TEXT      DEFAULT '',
+    created_at         TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+    updated_at         TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+    UNIQUE (instance_id, operator_id)
+);
+
+COMMENT ON TABLE t_data_process_log IS '数据处理日志表';
+COMMENT ON COLUMN t_data_process_log.id IS '主键ID';
+COMMENT ON COLUMN t_data_process_log.instance_id IS '实例ID';
+COMMENT ON COLUMN t_data_process_log.operator_id IS '算子ID';
+COMMENT ON COLUMN t_data_process_log.start_time IS '开始时间';
+COMMENT ON COLUMN t_data_process_log.end_time IS '结束时间';
+COMMENT ON COLUMN t_data_process_log.succeed_file_count IS '成功文件数量';
+COMMENT ON COLUMN t_data_process_log.failed_file_count IS '失败文件数量';
+COMMENT ON COLUMN t_data_process_log.error_message IS '错误信息';
+COMMENT ON COLUMN t_data_process_log.created_at IS '创建时间';
+COMMENT ON COLUMN t_data_process_log.updated_at IS '更新时间';
+
 -- 创建触发器用于自动更新 updated_at
 CREATE OR REPLACE FUNCTION update_updated_at_column()
 RETURNS TRIGGER AS $$
@@ -180,4 +208,4 @@ VALUES
     ('4421504e-c6c9-4760-b55a-509d17429597', 'ImgDirectionCorrect', 11, NULL),
     ('4421504e-c6c9-4760-b55a-509d17429597', 'ImgResize', 12, NULL),
     ('4421504e-c6c9-4760-b55a-509d17429597', 'ImgTypeUnify', 13, NULL)
-    ON CONFLICT (instance_id, operator_id, op_index) DO NOTHING;
\ No newline at end of file
+    ON CONFLICT (instance_id, operator_id, op_index) DO NOTHING;
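
For reviewers, a minimal, dependency-free sketch of the decorator pattern this patch introduces: the database writes are replaced by an in-memory dict so the control flow (ensure record, run, bump a counter, re-raise) can be seen in isolation. update_process_log_demo, DemoMapper and _LOGS are hypothetical stand-ins, not names from the patch.

# demo_process_log.py -- illustrative only; not part of the patch above.
from datetime import datetime
from functools import wraps
from typing import Any, Callable, Dict, Tuple

# (instance_id, operator_id) -> record, standing in for t_data_process_log
_LOGS: Dict[Tuple[str, str], Dict[str, Any]] = {}


def update_process_log_demo(func: Callable) -> Callable:
    """Same shape as update_process_log, with the DB swapped for a dict."""
    @wraps(func)
    def wrapper(self, sample: Dict[str, Any], **kwargs):
        instance_id = str(sample.get("instance_id", ""))
        if not instance_id:
            return func(self, sample, **kwargs)  # no instance_id: skip logging
        key = (instance_id, getattr(self, "name", "UnknownOp"))
        # "Ensure record exists": only the first call sets start_time.
        rec = _LOGS.setdefault(key, {"start_time": datetime.now(),
                                     "succeed_file_count": 0,
                                     "failed_file_count": 0,
                                     "error_message": ""})
        try:
            result = func(self, sample, **kwargs)
            rec["succeed_file_count"] += 1
            return result
        except Exception as e:
            rec["failed_file_count"] += 1
            rec["error_message"] += f"[{type(e).__name__}] {e}\n"
            raise  # re-raise, mirroring the real decorator
        finally:
            rec["end_time"] = datetime.now()

    return wrapper


class DemoMapper:
    name = "DemoMapper"

    @update_process_log_demo
    def __call__(self, sample: Dict[str, Any], **kwargs):
        if sample.get("boom"):
            raise ValueError("simulated failure")
        return sample


if __name__ == "__main__":
    op = DemoMapper()
    op({"instance_id": "i-1", "text": "ok"})
    try:
        op({"instance_id": "i-1", "boom": True})
    except ValueError:
        pass
    # Expect succeed_file_count == 1 and failed_file_count == 1 for ("i-1", "DemoMapper")
    print(_LOGS)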