Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 49 additions & 28 deletions src/code/agent/routes/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,9 @@
from .serverless_api_routes import ServerlessApiRoutes
from .gateway_routes import GatewayRoutes
from services.serverlessapi.serverless_api_service import ServerlessApiService
from services.cleanup import OutputFileCleanupService


def _start_cleanup_thread(clean_archived: bool, timeout: int = 300):
    """Spawn a daemon thread running the output-file cleanup.

    Cleanup only applies in CPU mode; for any other COMFYUI_MODE this is
    a no-op that returns None.

    Args:
        clean_archived: forwarded to OutputFileCleanupService.cleanup.
        timeout: forwarded to OutputFileCleanupService.cleanup (seconds).

    Returns:
        The started threading.Thread, or None when cleanup is skipped.
    """
    if constants.COMFYUI_MODE != "cpu":
        return None
    # Imported lazily so non-CPU deployments never pay for this module.
    from services.cleanup import OutputFileCleanupService

    worker = threading.Thread(
        target=OutputFileCleanupService().cleanup,
        args=(clean_archived, timeout),
        daemon=True,
    )
    worker.start()
    return worker


def _wait_cleanup_thread(cleanup_thread, timeout: int = 300):
if cleanup_thread:
cleanup_thread.join(timeout=timeout + 10) # 等待最多timeout+10秒


class Routes:
def __init__(self):
Expand All @@ -64,6 +45,9 @@ def setup_routes(self):

@self.app.route("/initialize", methods=["POST"])
def initialize():
import time
start_time = time.time()

# See FC docs for all the HTTP headers: https://www.alibabacloud.com/help/doc-detail/132044.htm#common-headers
request_id = request.headers.get("x-fc-request-id", "")
log("INFO", f"FC Initialize Start RequestId: {request_id}")
Expand All @@ -73,8 +57,17 @@ def initialize():
# access_key_secret = request.headers['x-fc-access-key-secret']
# access_security_token = request.headers['x-fc-security-token']

# 执行文件清理:只清理 serverless_api(在返回前等待完成,超时5分钟)
cleanup_thread = _start_cleanup_thread(clean_archived=False, timeout=300)
# 执行文件清理:只清理 serverless_api
cleanup_thread = None
cleanup_service = None
if constants.COMFYUI_MODE == "cpu":
cleanup_service = OutputFileCleanupService()
cleanup_thread = threading.Thread(
target=cleanup_service.cleanup,
args=(False,), # clean_archived=False
daemon=True
)
cleanup_thread.start()

# API模式需要自动启动comfyui进程
# TODO 防止抛出5xx导致函数计算一直重试产生大量费用
Expand All @@ -100,19 +93,39 @@ def initialize():
except Exception as e:
log("ERROR", f"prewarm models got exception:\n{e}")

# 等待清理线程完成
_wait_cleanup_thread(cleanup_thread, timeout=300)
# 检查是否已经耗时5分钟(300秒),如果超时立刻取消,否则等待剩余时间
if cleanup_thread:
elapsed_time = time.time() - start_time
if elapsed_time >= 300:
cleanup_service.cancel()
else:
remaining_time = 300 - elapsed_time
cleanup_thread.join(timeout=remaining_time)
if cleanup_thread.is_alive():
cleanup_service.cancel()

log("INFO", f"FC Initialize End RequestId: {request_id}")
return "Function is initialized, request_id: " + request_id + "\n"

@self.app.route("/pre-stop", methods=["GET"])
def pre_stop():
import time
start_time = time.time()

request_id = request.headers.get("x-fc-request-id", "")
log("INFO", f"FC PreStop Start RequestId: {request_id}")

# 执行文件清理:清理 serverless_api 和 serverless_api_archived(在返回前等待完成,超时5分钟)
# cleanup_thread = _start_cleanup_thread(clean_archived=True, timeout=300)
# 执行文件清理:清理 serverless_api 和 serverless_api_archived
cleanup_thread = None
cleanup_service = None
if constants.COMFYUI_MODE == "cpu":
cleanup_service = OutputFileCleanupService()
cleanup_thread = threading.Thread(
target=cleanup_service.cleanup,
args=(True,), # clean_archived=True
daemon=True
)
cleanup_thread.start()

service = ManagementService() # singleton

Expand Down Expand Up @@ -194,8 +207,16 @@ def do_save(result_queue, target_snapshot_name):
else:
log("INFO", "save completed successfully")

# 等待清理线程完成
# _wait_cleanup_thread(cleanup_thread, timeout=300)
# 检查是否已经耗时5分钟(300秒),如果超时立刻取消,否则等待剩余时间
if cleanup_thread:
elapsed_time = time.time() - start_time
if elapsed_time >= 300:
cleanup_service.cancel()
else:
remaining_time = 300 - elapsed_time
cleanup_thread.join(timeout=remaining_time)
if cleanup_thread.is_alive():
cleanup_service.cancel()

log("INFO", f"FC PreStop End RequestId: {request_id}")
return "OK"
Expand Down
49 changes: 22 additions & 27 deletions src/code/agent/services/cleanup/output_file_cleanup_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,53 +41,48 @@ def __init__(
self.max_files = max_files
self.ttl_seconds = ttl_seconds
self.archived_ttl_seconds = archived_ttl_seconds
self._cancel_event = threading.Event()

# 确保目录存在
self.source_dir.mkdir(parents=True, exist_ok=True)
self.archived_dir.mkdir(parents=True, exist_ok=True)

def cleanup(self, clean_archived: bool = True, timeout: int = 300):
def cleanup(self, clean_archived: bool = True):
"""
执行文件清理,如果超过超时时间则终止
执行文件清理

Args:
clean_archived: 是否清理归档目录,默认为 True
timeout: 超时时间(秒),默认300秒(5分钟),超过此时间会终止执行
"""
start_time = time.time()
timeout_event = threading.Event()
# 重置取消标志
self._cancel_event.clear()

log("INFO", f"Start to clean up output files: source={self.source_dir}, archived={self.archived_dir}, clean_archived={clean_archived}, timeout={timeout}s")

# 启动超时检查线程
def _timeout_checker():
time.sleep(timeout)
if not timeout_event.is_set():
timeout_event.set()
log("DEBUG", f"Cleanup timeout after {timeout}s, terminating cleanup")

timeout_thread = threading.Thread(target=_timeout_checker, daemon=True)
timeout_thread.start()
log("INFO", f"Start to clean up output files: source={self.source_dir}, archived={self.archived_dir}, clean_archived={clean_archived}")

try:
# 1. 移除serverless_api目录下超过时间阈值的文件
self._archive(timeout_event)
self._archive(self._cancel_event)

# 2. 清理archived目录中的过期文件(可选)
if clean_archived:
if clean_archived and not self._cancel_event.is_set():
self._cleanup_archived()

except Exception as e:
log("DEBUG", f"文件清理过程中出错: {str(e)}")
finally:
timeout_event.set() # 标记完成,停止超时检查

def _archive(self, timeout_event: threading.Event = None):
def cancel(self):
    """Request cancellation of an in-progress cleanup run.

    Sets the shared cancel event; the cleanup loop polls it between
    file moves and exits at the next checkpoint.
    """
    log("INFO", "Canceling cleanup operation")
    self._cancel_event.set()

def _archive(self, cancel_event: threading.Event = None):
"""
将serverless_api目录下超过时间阈值的文件移动到归档目录

Args:
timeout_event: 超时事件,传递给 _batch_move_files 用于检查超时
cancel_event: 取消事件,用于检查是否需要中止操作
"""
# expire_time: 文件过期时间点,修改时间早于此时间的文件需要移动到归档目录
# 例如:如果 ttl_seconds=86400(1天),则 expire_time = 当前时间 - 86400秒
Expand Down Expand Up @@ -124,7 +119,7 @@ def _archive(self, timeout_event: threading.Event = None):

# 批量移动过期文件
if expired_files:
self._batch_move_files(expired_files, timeout_event)
self._batch_move_files(expired_files, cancel_event)

# 检查剩余文件数量,如果仍然超过限制,按时间排序移动
if len(remaining_files) > self.max_files:
Expand All @@ -133,24 +128,24 @@ def _archive(self, timeout_event: threading.Event = None):

# 移动超出的文件
files_to_move_by_count = [f for _, f in remaining_files[self.max_files:]]
self._batch_move_files(files_to_move_by_count, timeout_event)
self._batch_move_files(files_to_move_by_count, cancel_event)

except Exception as e:
log("DEBUG", f"Error moving old files: {type(e).__name__}: {str(e)}")

def _batch_move_files(self, files: list, timeout_event: threading.Event = None):
def _batch_move_files(self, files: list, cancel_event: threading.Event = None):
"""
批量移动文件到归档目录

Args:
files: 要移动的文件路径列表
timeout_event: 超时事件,如果设置了则终止执行
cancel_event: 取消事件,如果设置了则终止执行
"""
if not files:
return

for file_path in files:
if timeout_event and timeout_event.is_set():
if cancel_event and cancel_event.is_set():
return

dest_path = self.archived_dir / file_path.name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,3 +389,55 @@ def test_cleanup_full_workflow(self, cleanup_service, temp_dirs):

# 验证归档目录中的过期文件被删除
assert not archived_expired.exists()

def test_cancel_cleanup(self, cleanup_service, temp_dirs):
    """Cancelling mid-run should stop cleanup before every file is moved."""
    import threading
    serverless_api, archived_dir = temp_dirs

    # Populate the source dir with many already-expired files so the
    # cleanup has enough work for a cancel to land mid-run.
    for idx in range(100):
        path = serverless_api / f"file_{idx}.txt"
        path.write_text(f"content {idx}")
        stale_mtime = time.time() - (2 * 3600)
        os.utime(path, (stale_mtime, stale_mtime))

    # Run the cleanup on a worker thread.
    worker = threading.Thread(target=cleanup_service.cleanup)
    worker.start()

    # Give the worker a brief head start, then request cancellation.
    time.sleep(0.01)
    cleanup_service.cancel()

    # The worker should wind down promptly after the cancel.
    worker.join(timeout=5)

    # The cancel flag must remain set once requested.
    assert cleanup_service._cancel_event.is_set()

    # Exact progress depends on timing, so only assert that the run did
    # not provably move all 100 files before the cancel took effect.
    leftovers = list(serverless_api.iterdir())
    assert len(leftovers) > 0 or len(list(archived_dir.iterdir())) < 100

def test_cleanup_resets_cancel_flag(self, cleanup_service, temp_dirs):
    """A fresh cleanup() run must clear any previously-set cancel flag."""
    serverless_api, archived_dir = temp_dirs

    # Pre-set the cancel flag, as if an earlier run had been aborted.
    cleanup_service.cancel()
    assert cleanup_service._cancel_event.is_set()

    # One file is enough for the run to exercise the normal path.
    (serverless_api / "test.txt").write_text("test")

    # Calling cleanup again should reset the flag and execute normally.
    cleanup_service.cleanup()

    # After an uncancelled run completes, the flag stays cleared.
    assert not cleanup_service._cancel_event.is_set()