From 843ed88ee04a407c13faeaa62f88e782cb924574 Mon Sep 17 00:00:00 2001
From: "yinsu.zs"
Date: Fri, 26 Dec 2025 16:20:39 +0800
Subject: [PATCH] refactor: optimize the timeout control mechanism of the file cleanup service
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Change-Id: I705d7b9169971d666c4e5ce0f39b772fafbde74b
---
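The timeout thread that OutputFileCleanupService used to spawn internally is
replaced by cooperative cancellation: cleanup() now checks a threading.Event
between file moves, and the caller owns the time budget. Each route records
its own start time, joins the cleanup thread for whatever remains of the
300-second budget, and calls cancel() if the thread is still alive. The
standalone Python sketch below illustrates that caller-side pattern under the
same assumptions; CancellableCleanup, run_with_budget, and the sleep-based
work placeholder are illustrative names only, not code from this repository.

    import threading
    import time

    class CancellableCleanup:
        """Minimal stand-in for the cancel mechanism added in this patch."""

        def __init__(self):
            self._cancel_event = threading.Event()

        def cleanup(self, items):
            self._cancel_event.clear()          # each run starts un-cancelled
            for _ in items:
                if self._cancel_event.is_set():
                    return                      # stop cooperatively between work items
                time.sleep(0.1)                 # placeholder for moving one file

        def cancel(self):
            self._cancel_event.set()

    def run_with_budget(service, items, budget_seconds=300):
        start = time.time()
        worker = threading.Thread(target=service.cleanup, args=(items,), daemon=True)
        worker.start()

        # ... the route's real work (prewarm, save, etc.) would run here ...

        elapsed = time.time() - start
        if elapsed >= budget_seconds:
            service.cancel()                    # budget already spent: cancel immediately
        else:
            worker.join(timeout=budget_seconds - elapsed)
            if worker.is_alive():               # budget ran out while cleaning
                service.cancel()

    if __name__ == "__main__":
        run_with_budget(CancellableCleanup(), range(5), budget_seconds=1)
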
 src/code/agent/routes/routes.py                  | 77 ++++++++++++-------
 .../cleanup/output_file_cleanup_service.py       | 49 ++++++------
 .../output_file_cleanup_service_test.py          | 52 +++++++++++++
 3 files changed, 123 insertions(+), 55 deletions(-)

diff --git a/src/code/agent/routes/routes.py b/src/code/agent/routes/routes.py
index 9674594a..db1c5acf 100644
--- a/src/code/agent/routes/routes.py
+++ b/src/code/agent/routes/routes.py
@@ -17,28 +17,9 @@
 from .serverless_api_routes import ServerlessApiRoutes
 from .gateway_routes import GatewayRoutes
 from services.serverlessapi.serverless_api_service import ServerlessApiService
+from services.cleanup import OutputFileCleanupService
 
-def _start_cleanup_thread(clean_archived: bool, timeout: int = 300):
-    if constants.COMFYUI_MODE != "cpu":
-        return None
-    from services.cleanup import OutputFileCleanupService
-    cleanup_service = OutputFileCleanupService()
-
-    cleanup_thread = threading.Thread(
-        target=cleanup_service.cleanup,
-        args=(clean_archived, timeout),
-        daemon=True
-    )
-
-    cleanup_thread.start()
-    return cleanup_thread
-
-
-def _wait_cleanup_thread(cleanup_thread, timeout: int = 300):
-    if cleanup_thread:
-        cleanup_thread.join(timeout=timeout + 10)  # wait at most timeout + 10 seconds
-
 
 class Routes:
     def __init__(self):
@@ -64,6 +45,9 @@ def setup_routes(self):
 
         @self.app.route("/initialize", methods=["POST"])
         def initialize():
+            import time
+            start_time = time.time()
+
             # See FC docs for all the HTTP headers: https://www.alibabacloud.com/help/doc-detail/132044.htm#common-headers
             request_id = request.headers.get("x-fc-request-id", "")
             log("INFO", f"FC Initialize Start RequestId: {request_id}")
@@ -73,8 +57,17 @@ def initialize():
             # access_key_secret = request.headers['x-fc-access-key-secret']
             # access_security_token = request.headers['x-fc-security-token']
 
-            # Run file cleanup: serverless_api only (wait for completion before returning, 5-minute timeout)
-            cleanup_thread = _start_cleanup_thread(clean_archived=False, timeout=300)
+            # Run file cleanup: serverless_api only
+            cleanup_thread = None
+            cleanup_service = None
+            if constants.COMFYUI_MODE == "cpu":
+                cleanup_service = OutputFileCleanupService()
+                cleanup_thread = threading.Thread(
+                    target=cleanup_service.cleanup,
+                    args=(False,),  # clean_archived=False
+                    daemon=True
+                )
+                cleanup_thread.start()
 
             # API mode needs to start the comfyui process automatically
             # TODO prevent 5xx responses from making Function Compute retry endlessly and run up costs
@@ -100,19 +93,39 @@ def initialize():
             except Exception as e:
                 log("ERROR", f"prewarm models got exception:\n{e}")
 
-            # Wait for the cleanup thread to finish
-            _wait_cleanup_thread(cleanup_thread, timeout=300)
+            # If 300 s (5 min) have already elapsed, cancel immediately; otherwise wait out the remaining budget
+            if cleanup_thread:
+                elapsed_time = time.time() - start_time
+                if elapsed_time >= 300:
+                    cleanup_service.cancel()
+                else:
+                    remaining_time = 300 - elapsed_time
+                    cleanup_thread.join(timeout=remaining_time)
+                    if cleanup_thread.is_alive():
+                        cleanup_service.cancel()
 
             log("INFO", f"FC Initialize End RequestId: {request_id}")
             return "Function is initialized, request_id: " + request_id + "\n"
 
         @self.app.route("/pre-stop", methods=["GET"])
         def pre_stop():
+            import time
+            start_time = time.time()
+
             request_id = request.headers.get("x-fc-request-id", "")
             log("INFO", f"FC PreStop Start RequestId: {request_id}")
 
-            # Run file cleanup: serverless_api and serverless_api_archived (wait for completion before returning, 5-minute timeout)
-            # cleanup_thread = _start_cleanup_thread(clean_archived=True, timeout=300)
+            # Run file cleanup: serverless_api and serverless_api_archived
+            cleanup_thread = None
+            cleanup_service = None
+            if constants.COMFYUI_MODE == "cpu":
+                cleanup_service = OutputFileCleanupService()
+                cleanup_thread = threading.Thread(
+                    target=cleanup_service.cleanup,
+                    args=(True,),  # clean_archived=True
+                    daemon=True
+                )
+                cleanup_thread.start()
 
             service = ManagementService()  # singleton
 
@@ -194,8 +207,16 @@ def do_save(result_queue, target_snapshot_name):
             else:
                 log("INFO", "save completed successfully")
 
-            # Wait for the cleanup thread to finish
-            # _wait_cleanup_thread(cleanup_thread, timeout=300)
+            # If 300 s (5 min) have already elapsed, cancel immediately; otherwise wait out the remaining budget
+            if cleanup_thread:
+                elapsed_time = time.time() - start_time
+                if elapsed_time >= 300:
+                    cleanup_service.cancel()
+                else:
+                    remaining_time = 300 - elapsed_time
+                    cleanup_thread.join(timeout=remaining_time)
+                    if cleanup_thread.is_alive():
+                        cleanup_service.cancel()
 
             log("INFO", f"FC PreStop End RequestId: {request_id}")
             return "OK"
diff --git a/src/code/agent/services/cleanup/output_file_cleanup_service.py b/src/code/agent/services/cleanup/output_file_cleanup_service.py
index ce9e094d..2779e52f 100644
--- a/src/code/agent/services/cleanup/output_file_cleanup_service.py
+++ b/src/code/agent/services/cleanup/output_file_cleanup_service.py
@@ -41,53 +41,48 @@ def __init__(
         self.max_files = max_files
         self.ttl_seconds = ttl_seconds
         self.archived_ttl_seconds = archived_ttl_seconds
+        self._cancel_event = threading.Event()
 
         # Make sure the directories exist
         self.source_dir.mkdir(parents=True, exist_ok=True)
         self.archived_dir.mkdir(parents=True, exist_ok=True)
 
-    def cleanup(self, clean_archived: bool = True, timeout: int = 300):
+    def cleanup(self, clean_archived: bool = True):
         """
-        Perform file cleanup; stop once the timeout is exceeded
+        Perform file cleanup
 
         Args:
             clean_archived: whether to clean the archived directory as well, defaults to True
-            timeout: timeout in seconds, 300 s (5 min) by default; execution stops once it is exceeded
         """
-        start_time = time.time()
-        timeout_event = threading.Event()
+        # Reset the cancel flag
+        self._cancel_event.clear()
 
-        log("INFO", f"Start to clean up output files: source={self.source_dir}, archived={self.archived_dir}, clean_archived={clean_archived}, timeout={timeout}s")
-
-        # Start the timeout-checker thread
-        def _timeout_checker():
-            time.sleep(timeout)
-            if not timeout_event.is_set():
-                timeout_event.set()
-                log("DEBUG", f"Cleanup timeout after {timeout}s, terminating cleanup")
-
-        timeout_thread = threading.Thread(target=_timeout_checker, daemon=True)
-        timeout_thread.start()
+        log("INFO", f"Start to clean up output files: source={self.source_dir}, archived={self.archived_dir}, clean_archived={clean_archived}")
 
         try:
            # 1. Archive files under the serverless_api directory that exceed the age threshold
-            self._archive(timeout_event)
+            self._archive(self._cancel_event)
 
            # 2. Clean up expired files in the archived directory (optional)
-            if clean_archived:
+            if clean_archived and not self._cancel_event.is_set():
                 self._cleanup_archived()
         except Exception as e:
             log("DEBUG", f"Error during file cleanup: {str(e)}")
-        finally:
-            timeout_event.set()  # mark completion and stop the timeout checker
 
-    def _archive(self, timeout_event: threading.Event = None):
+    def cancel(self):
+        """
+        Cancel a cleanup operation that is currently running
+        """
+        log("INFO", "Canceling cleanup operation")
+        self._cancel_event.set()
+
+    def _archive(self, cancel_event: threading.Event = None):
         """
         Move files under the serverless_api directory that exceed the age threshold into the archived directory
 
         Args:
-            timeout_event: timeout event, passed to _batch_move_files to check for timeouts
+            cancel_event: cancel event, used to check whether the operation should be aborted
         """
         # expire_time: cutoff timestamp; files modified before this point are moved to the archived directory
         # e.g. with ttl_seconds=86400 (1 day), expire_time = current time - 86400 seconds
@@ -124,7 +119,7 @@ def _archive(self, timeout_event: threading.Event = None):
 
             # Move expired files in batch
             if expired_files:
-                self._batch_move_files(expired_files, timeout_event)
+                self._batch_move_files(expired_files, cancel_event)
 
             # If the remaining file count still exceeds the limit, sort by time and move the oldest
             if len(remaining_files) > self.max_files:
@@ -133,24 +128,24 @@ def _archive(self, timeout_event: threading.Event = None):
 
                 # Move the files that exceed the limit
                 files_to_move_by_count = [f for _, f in remaining_files[self.max_files:]]
-                self._batch_move_files(files_to_move_by_count, timeout_event)
+                self._batch_move_files(files_to_move_by_count, cancel_event)
 
         except Exception as e:
             log("DEBUG", f"Error moving old files: {type(e).__name__}: {str(e)}")
 
-    def _batch_move_files(self, files: list, timeout_event: threading.Event = None):
+    def _batch_move_files(self, files: list, cancel_event: threading.Event = None):
         """
         Move a batch of files to the archived directory
 
         Args:
             files: list of file paths to move
-            timeout_event: timeout event; execution stops once it is set
+            cancel_event: cancel event; execution stops once it is set
         """
         if not files:
             return
 
         for file_path in files:
-            if timeout_event and timeout_event.is_set():
+            if cancel_event and cancel_event.is_set():
                 return
 
             dest_path = self.archived_dir / file_path.name
diff --git a/src/code/agent/test/unit/services/cleanup/output_file_cleanup_service_test.py b/src/code/agent/test/unit/services/cleanup/output_file_cleanup_service_test.py
index 8f86b760..dfc8043d 100644
--- a/src/code/agent/test/unit/services/cleanup/output_file_cleanup_service_test.py
+++ b/src/code/agent/test/unit/services/cleanup/output_file_cleanup_service_test.py
@@ -389,3 +389,55 @@ def test_cleanup_full_workflow(self, cleanup_service, temp_dirs):
 
         # Verify that the expired files in the archived directory were deleted
         assert not archived_expired.exists()
+
+    def test_cancel_cleanup(self, cleanup_service, temp_dirs):
+        """Test canceling a running cleanup operation"""
+        import threading
+        serverless_api, archived_dir = temp_dirs
+
+        # Create a large number of files so that the cleanup takes some time
+        for i in range(100):
+            file_path = serverless_api / f"file_{i}.txt"
+            file_path.write_text(f"content {i}")
+            # Mark the file as expired
+            old_time = time.time() - (2 * 3600)
+            os.utime(file_path, (old_time, old_time))
+
+        # Start the cleanup in another thread
+        cleanup_thread = threading.Thread(target=cleanup_service.cleanup)
+        cleanup_thread.start()
+
+        # Cancel the cleanup almost immediately
+        time.sleep(0.01)  # brief wait to make sure the cleanup has started
+        cleanup_service.cancel()
+
+        # Wait for the cleanup thread to finish
+        cleanup_thread.join(timeout=5)
+
+        # Verify the cancel flag is set
+        assert cleanup_service._cancel_event.is_set()
+
+        # Cancellation timing is nondeterministic: the cleanup may already have finished
+        # before cancel() took effect, so only assert that no file was lost in the process.
+        remaining_files = list(serverless_api.iterdir())
+        archived_files = list(archived_dir.iterdir())
+        assert len(remaining_files) + len(archived_files) == 100
+
+    def test_cleanup_resets_cancel_flag(self, cleanup_service, temp_dirs):
+        """Test that cleanup() resets the cancel flag"""
+        serverless_api, archived_dir = temp_dirs
+
+        # Set the cancel flag first
+        cleanup_service.cancel()
+        assert cleanup_service._cancel_event.is_set()
+
+        # Create a single test file
+        test_file = serverless_api / "test.txt"
+        test_file.write_text("test")
+
+        # Calling cleanup again should reset the flag and run normally
+        cleanup_service.cleanup()
+
+        # Verify the cancel flag was cleared (checked after cleanup finished)
+        # Note: the flag must not be set once cleanup() has returned
+        assert not cleanup_service._cancel_event.is_set()