Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions runtime/python-executor/datamate/core/base_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from datamate.common.utils.registry import Registry
from datamate.common.utils import check_valid_path
from datamate.core.constant import Fields
from datamate.core.process_log_decorator import update_process_log
from datamate.sql_manager.persistence_atction import TaskInfoPersistence

OPERATORS = Registry('Operators')
Expand Down Expand Up @@ -181,6 +182,7 @@ class Mapper(BaseOp):
def __init__(self, *args, **kwargs):
super(Mapper, self).__init__(*args, **kwargs)

@update_process_log
def __call__(self, sample: Dict[str, Any], **kwargs):
# 该算子前已有算子执行该文件失败
if sample.get(Fields.result) is False:
Expand Down Expand Up @@ -309,6 +311,7 @@ class Filter(BaseOp):
def __init__(self, *args, **kwargs):
super(Filter, self).__init__(*args, **kwargs)

@update_process_log
def __call__(self, sample: Dict[str, Any], **kwargs):
# 该算子前已有算子执行该文件失败
if sample.get(Fields.result) is False:
Expand Down
213 changes: 213 additions & 0 deletions runtime/python-executor/datamate/core/process_log_decorator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
# -*- coding: utf-8 -*-

import uuid
import traceback
from datetime import datetime
from functools import wraps
from typing import Dict, Any, Callable

from loguru import logger
from sqlalchemy import Table, Column, String, Integer, Text, TIMESTAMP, select, update, insert, func, MetaData

from datamate.sql_manager.sql_manager import SQLManager

# -- Process-log table definition -------------------------------------------
# Uses a standalone MetaData because we go through raw connections from
# SQLManager rather than a shared declarative base.
_metadata = MetaData()

# Mirrors the t_data_process_log DDL in scripts/db/data-cleaning-init.sql:
# one row per (instance_id, operator_id) pair, accumulating per-file
# success/failure counters and concatenated error text.
_process_log_table = Table(
    't_data_process_log',
    _metadata,
    Column('id', String(64), primary_key=True),        # UUID generated by the executor
    Column('instance_id', String(256)),                # pipeline instance id
    Column('operator_id', String(256)),                # operator name
    Column('start_time', TIMESTAMP),                   # set once, on first record creation
    Column('end_time', TIMESTAMP),                     # refreshed after every processed sample
    Column('succeed_file_count', Integer, default=0),  # files processed successfully
    Column('failed_file_count', Integer, default=0),   # files whose processing raised
    Column('error_message', Text, default=''),         # newline-joined error messages
    Column('created_at', TIMESTAMP, server_default=func.current_timestamp()),
    Column('updated_at', TIMESTAMP, server_default=func.current_timestamp()),
)


def update_process_log(func: Callable) -> Callable:
    """Decorator recording per-sample processing stats in t_data_process_log.

    Intended for the ``__call__`` method of Mapper/Filter operators.

    Before the wrapped call:
        - Looks up the log row keyed by (instance_id, operator_id) and creates
          it (UUID id, ``start_time`` set) if absent. An existing row keeps
          its original ``start_time``.

    After the wrapped call:
        - Success: updates ``end_time`` and increments ``succeed_file_count``.
        - Failure: updates ``end_time``, increments ``failed_file_count``,
          appends the error message, then re-raises the original exception.

    Bookkeeping failures are logged and swallowed so stats upkeep can never
    mask the operator's own result or exception.
    """
    @wraps(func)
    def wrapper(self, sample: Dict[str, Any], **kwargs):
        # instance_id comes from the sample; operator_id from the operator's name.
        instance_id = str(sample.get("instance_id", ""))
        operator_id = getattr(self, 'name', 'UnknownOp')

        if not instance_id:
            # No instance to attribute stats to -- run the operator untracked.
            # (Plain string: the original used an f-string with no placeholders.)
            logger.warning("instance_id is missing in sample, skipping process log update")
            return func(self, sample, **kwargs)

        start_time = datetime.now()
        log_id = None

        try:
            # Ensure the log row exists; only its very first creation sets start_time.
            log_id = _ensure_log_record_exists(instance_id, operator_id, start_time)

            result = func(self, sample, **kwargs)

            end_time = datetime.now()
            if log_id:
                _update_log_record_success(log_id, end_time)

            return result

        except Exception as e:
            end_time = datetime.now()
            error_message = _get_error_message(e)

            try:
                # log_id is None if record creation itself failed above.
                if log_id is None:
                    log_id = _get_or_create_log_id(instance_id, operator_id, start_time)
                if log_id:
                    _update_log_record_failure(log_id, end_time, error_message)
            except Exception as update_error:
                logger.error(f"Failed to update process log after error: {update_error}")

            # Bare raise (not `raise e`) keeps the original traceback and
            # exception-chaining context exactly intact.
            raise

    return wrapper


def _ensure_log_record_exists(instance_id: str, operator_id: str, start_time: datetime) -> str:
    """Return the id of the log row for (instance_id, operator_id), inserting it if missing.

    Runs inside a transaction and locks the lookup with SELECT ... FOR UPDATE so
    concurrent workers cannot create duplicate rows. On any database error a
    fresh UUID is returned so the caller can proceed best-effort (subsequent
    updates against it will simply match no row).
    """
    try:
        with SQLManager.create_connect() as conn:
            with conn.begin():  # transaction managed automatically
                lookup = select(_process_log_table.c.id).where(
                    _process_log_table.c.instance_id == instance_id,
                    _process_log_table.c.operator_id == operator_id
                ).with_for_update()

                row = conn.execute(lookup).fetchone()

                if row is not None:
                    # Row already created by an earlier call (or another worker).
                    record_id = row[0]
                    logger.debug(f"Process log record already exists: id={record_id}, instance_id={instance_id}, operator_id={operator_id}")
                    return record_id

                # First sighting of this (instance, operator) pair: insert it.
                record_id = str(uuid.uuid4())
                conn.execute(
                    insert(_process_log_table).values(
                        id=record_id,
                        instance_id=instance_id,
                        operator_id=operator_id,
                        start_time=start_time,
                        succeed_file_count=0,
                        failed_file_count=0,
                        error_message=''
                    )
                )
                logger.debug(f"Created new process log record: id={record_id}, instance_id={instance_id}, operator_id={operator_id}")
                return record_id
    except Exception as e:
        logger.error(f"Failed to ensure process log record exists: {e}")
        return str(uuid.uuid4())


def _get_or_create_log_id(instance_id: str, operator_id: str, start_time: datetime) -> str:
    """Fetch or create the log record id for the failure path.

    Thin alias for _ensure_log_record_exists, used by the decorator's except
    branch to guarantee a log_id exists before writing failure stats.
    """
    return _ensure_log_record_exists(instance_id, operator_id, start_time)


def _update_log_record_success(log_id: str, end_time: datetime):
    """Record a successful sample: set end_time and bump succeed_file_count.

    The increment is expressed as a SQL column expression
    (``succeed_file_count + 1``), so the single UPDATE is atomic and itself
    acquires the row lock; the previous separate SELECT ... FOR UPDATE round
    trip was redundant and has been removed. Database errors are logged and
    swallowed so stats upkeep never breaks the processing pipeline.
    """
    try:
        with SQLManager.create_connect() as conn:
            with conn.begin():  # transaction managed automatically
                # Atomic read-modify-write in the database; no prior lock needed.
                conn.execute(
                    update(_process_log_table)
                    .where(_process_log_table.c.id == log_id)
                    .values(
                        end_time=end_time,
                        succeed_file_count=_process_log_table.c.succeed_file_count + 1,
                        updated_at=func.current_timestamp()
                    )
                )
                logger.debug(f"Updated process log record (success): id={log_id}")
    except Exception as e:
        logger.error(f"Failed to update process log record (success): {e}")


def _update_log_record_failure(log_id: str, end_time: datetime, error_message: str):
    """Record a failed sample: set end_time, bump failed_file_count, append error text.

    The row is locked with SELECT ... FOR UPDATE before the accumulated
    error_message is read, so concurrent appenders cannot overwrite each
    other's text. Database errors are logged and swallowed.
    """
    try:
        with SQLManager.create_connect() as conn:
            with conn.begin():  # transaction managed automatically
                # Lock the row and fetch whatever error text is already stored.
                locked = conn.execute(
                    select(_process_log_table.c.error_message)
                    .where(_process_log_table.c.id == log_id)
                    .with_for_update()
                )
                previous = locked.scalar() or ''

                # Append the new message on its own line.
                combined = f"{previous}\n{error_message}" if previous else error_message

                conn.execute(
                    update(_process_log_table)
                    .where(_process_log_table.c.id == log_id)
                    .values(
                        end_time=end_time,
                        failed_file_count=_process_log_table.c.failed_file_count + 1,
                        error_message=combined,
                        updated_at=func.current_timestamp()
                    )
                )
                logger.debug(f"Updated process log record (failure): id={log_id}")
    except Exception as e:
        logger.error(f"Failed to update process log record (failure): {e}")


def _get_error_message(exception: Exception) -> str:
"""
获取异常的错误信息,包括异常类型、消息和堆栈跟踪
"""
try:
exc_type = type(exception).__name__
exc_msg = str(exception)
exc_traceback = traceback.format_exc()

error_message = f"[{exc_type}] {exc_msg}\n{exc_traceback}"
# 限制错误信息长度,避免数据库字段过大
max_length = 10000 # TEXT 类型通常可以存储更多,但为了安全设置限制
if len(error_message) > max_length:
error_message = error_message[:max_length] + "\n...(truncated)"

return error_message
except Exception as e:
logger.error(f"Failed to get error message: {e}")
return f"Error occurred but failed to extract message: {str(exception)}"
33 changes: 30 additions & 3 deletions scripts/db/data-cleaning-init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ CREATE TABLE IF NOT EXISTS t_operator_instance
PRIMARY KEY (instance_id, operator_id, op_index)
);

COMMENT ON TABLE t_operator_instance IS '操作员实例表';
COMMENT ON TABLE t_operator_instance IS '算子实例表';
COMMENT ON COLUMN t_operator_instance.instance_id IS '实例ID';
COMMENT ON COLUMN t_operator_instance.operator_id IS '操作员ID';
COMMENT ON COLUMN t_operator_instance.operator_id IS '算子ID';
COMMENT ON COLUMN t_operator_instance.op_index IS '操作序号';
COMMENT ON COLUMN t_operator_instance.settings_override IS '设置覆盖';

Expand Down Expand Up @@ -109,6 +109,33 @@ COMMENT ON COLUMN t_clean_result.dest_size IS '目标文件大小';
COMMENT ON COLUMN t_clean_result.status IS '状态';
COMMENT ON COLUMN t_clean_result.result IS '结果';


-- Per-operator processing statistics; one row per (instance_id, operator_id)
-- pair, written by the Python executor's process-log decorator.
create table if not exists t_data_process_log
(
    id VARCHAR(64) PRIMARY KEY,                      -- UUID generated by the executor
    instance_id VARCHAR(256),
    operator_id VARCHAR(256),
    start_time TIMESTAMP,                            -- set once, on first record creation
    end_time TIMESTAMP,                              -- refreshed after every processed sample
    succeed_file_count INTEGER DEFAULT 0,
    failed_file_count INTEGER DEFAULT 0,
    error_message TEXT DEFAULT '',                   -- newline-separated, appended per failure
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

COMMENT ON TABLE t_data_process_log IS '数据处理日志表';
COMMENT ON COLUMN t_data_process_log.id IS '主键ID';
COMMENT ON COLUMN t_data_process_log.instance_id IS '实例ID';
COMMENT ON COLUMN t_data_process_log.operator_id IS '算子ID';
COMMENT ON COLUMN t_data_process_log.start_time IS '开始时间';
COMMENT ON COLUMN t_data_process_log.end_time IS '结束时间';
COMMENT ON COLUMN t_data_process_log.succeed_file_count IS '成功文件数量';
COMMENT ON COLUMN t_data_process_log.failed_file_count IS '失败文件数量';
COMMENT ON COLUMN t_data_process_log.error_message IS '错误信息';
COMMENT ON COLUMN t_data_process_log.created_at IS '创建时间';
COMMENT ON COLUMN t_data_process_log.updated_at IS '更新时间';

-- 创建触发器用于自动更新 updated_at
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
Expand Down Expand Up @@ -180,4 +207,4 @@ VALUES
('4421504e-c6c9-4760-b55a-509d17429597', 'ImgDirectionCorrect', 11, NULL),
('4421504e-c6c9-4760-b55a-509d17429597', 'ImgResize', 12, NULL),
('4421504e-c6c9-4760-b55a-509d17429597', 'ImgTypeUnify', 13, NULL)
ON CONFLICT (instance_id, operator_id, op_index) DO NOTHING;
ON CONFLICT (instance_id, operator_id, op_index) DO NOTHING;
Loading