Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@ local_settings.py

seatable-python-runner/
seatable-python-runner.zip

.python-version
15 changes: 13 additions & 2 deletions scheduler/app/faas_scheduler/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ class ScriptLog(Base):
return_code = Column(Integer, nullable=True)
output = Column(Text, nullable=True)
operate_from = Column(String(255))
state = Column(String(10))
created_at = Column(DateTime, index=True)

PENDING = "pending"
RUNNING = "running"
FINISHED = "finished"

def __init__(
self,
Expand All @@ -39,15 +45,17 @@ def __init__(
org_id,
script_name,
context_data,
started_at,
state,
created_at,
operate_from=None,
):
self.dtable_uuid = dtable_uuid
self.owner = owner
self.org_id = org_id
self.script_name = script_name
self.context_data = context_data
self.started_at = started_at
self.state = state
self.created_at = created_at
self.operate_from = operate_from

def to_dict(self, include_context_data=True, include_output=True):
Expand All @@ -64,6 +72,9 @@ def to_dict(self, include_context_data=True, include_output=True):
"success": self.success,
"return_code": self.return_code,
"operate_from": self.operate_from,
"state": self.state,
"created_at": self.created_at
and datetime_to_isoformat_timestr(self.created_at),
}

if include_context_data:
Expand Down
214 changes: 153 additions & 61 deletions scheduler/app/faas_scheduler/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,19 @@
import json
import logging
import requests
from datetime import datetime
from datetime import datetime, timedelta
from typing import List, Optional, Tuple
from uuid import UUID

from tzlocal import get_localzone
from sqlalchemy import case, desc, func, text
from sqlalchemy.orm import load_only
from faas_scheduler.models import ScriptLog
from faas_scheduler.models import (
ScriptLog,
UserRunScriptStatistics,
OrgRunScriptStatistics,
DTableRunScriptStatistics,
)

import sys

Expand Down Expand Up @@ -174,6 +179,7 @@ def call_faas_func(script_url, temp_api_token, context_data, script_id=None):
},
"context_data": context_data,
"script_id": script_id,
"timeout": int(SUB_PROCESS_TIMEOUT),
}
headers = {"User-Agent": "python-scheduler/" + VERSION}
logger.debug("I call starter at url %s", RUN_FUNC_URL)
Expand All @@ -198,66 +204,140 @@ def call_faas_func(script_url, temp_api_token, context_data, script_id=None):
return None


def update_statistics(db_session, dtable_uuid, owner, org_id, spend_time):
if not spend_time:
return
username = owner

# dtable_run_script_statistcis
sqls = [
"""
INSERT INTO dtable_run_script_statistics(dtable_uuid, run_date, total_run_count, total_run_time, update_at) VALUES
(:dtable_uuid, :run_date, 1, :spend_time, :update_at)
ON DUPLICATE KEY UPDATE
total_run_count=total_run_count+1,
total_run_time=total_run_time+:spend_time,
update_at=:update_at;
"""
]
def update_stats_run_count(db_session, dtable_uuid, owner, org_id):
run_date = datetime.today().strftime("%Y-%m-%d")
try:
dtable_stats = (
db_session.query(DTableRunScriptStatistics)
.filter_by(dtable_uuid=dtable_uuid, run_date=run_date)
.first()
)
if not dtable_stats:
dtable_stats = DTableRunScriptStatistics(
dtable_uuid=dtable_uuid,
run_date=run_date,
total_run_count=1,
total_run_time=0,
update_at=datetime.now(),
)
db_session.add(dtable_stats)
else:
dtable_stats.total_run_count += 1
dtable_stats.update_at = datetime.now()
if org_id == -1:
if "@seafile_group" not in owner:
user_stats = (
db_session.query(UserRunScriptStatistics)
.filter_by(username=owner, run_date=run_date)
.first()
)
if not user_stats:
user_stats = UserRunScriptStatistics(
username=owner,
run_date=run_date,
total_run_count=1,
total_run_time=0,
update_at=datetime.now(),
)
db_session.add(user_stats)
else:
user_stats.total_run_count += 1
user_stats.update_at = datetime.now()
else:
org_stats = (
db_session.query(OrgRunScriptStatistics)
.filter_by(org_id=org_id, run_date=run_date)
.first()
)
if not org_stats:
org_stats = OrgRunScriptStatistics(
org_id=org_id,
run_date=run_date,
total_run_count=1,
total_run_time=0,
update_at=datetime.now(),
)
db_session.add(org_stats)
else:
org_stats.total_run_count += 1
org_stats.update_at = datetime.now()
db_session.commit()
except Exception as e:
logger.exception(
"update stats for org_id %s owner %s dtable %s run count error %s",
org_id,
owner,
dtable_uuid,
e,
)

# org_run_script_statistics
if org_id and org_id != -1:
sqls += [
"""
INSERT INTO org_run_script_statistics(org_id, run_date, total_run_count, total_run_time, update_at) VALUES
(:org_id, :run_date, 1, :spend_time, :update_at)
ON DUPLICATE KEY UPDATE
total_run_count=total_run_count+1,
total_run_time=total_run_time+:spend_time,
update_at=:update_at;
"""
]

# user_run_script_statistics
if "@seafile_group" not in username:
sqls += [
"""
INSERT INTO user_run_script_statistics(username, org_id, run_date, total_run_count, total_run_time, update_at) VALUES
(:username, :org_id, :run_date, 1, :spend_time, :update_at)
ON DUPLICATE KEY UPDATE
org_id=:org_id,
total_run_count=total_run_count+1,
total_run_time=total_run_time+:spend_time,
update_at=:update_at;
"""
]

def update_stats_run_time(db_session, dtable_uuid, owner, org_id, spend_time):
run_date = datetime.today().strftime("%Y-%m-%d")
try:
for sql in sqls:
db_session.execute(
text(sql),
{
"dtable_uuid": dtable_uuid,
"username": username,
"org_id": org_id,
"run_date": datetime.today(),
"spend_time": spend_time,
"update_at": datetime.now(),
},
dtable_stats = (
db_session.query(DTableRunScriptStatistics)
.filter_by(dtable_uuid=dtable_uuid, run_date=run_date)
.first()
)
if not dtable_stats:
dtable_stats = DTableRunScriptStatistics(
dtable_uuid=dtable_uuid,
run_date=run_date,
total_run_count=1,
total_run_time=spend_time,
update_at=datetime.now(),
)
db_session.add(dtable_stats)
else:
dtable_stats.total_run_time += spend_time
dtable_stats.update_at = datetime.now()
if org_id == -1:
if "@seafile_group" not in owner:
user_stats = (
db_session.query(UserRunScriptStatistics)
.filter_by(username=owner, run_date=run_date)
.first()
)
if not user_stats:
user_stats = UserRunScriptStatistics(
username=owner,
run_date=run_date,
total_run_count=1,
total_run_time=spend_time,
update_at=datetime.now(),
)
db_session.add(user_stats)
else:
user_stats.total_run_time += spend_time
user_stats.update_at = datetime.now()
else:
org_stats = (
db_session.query(OrgRunScriptStatistics)
.filter_by(org_id=org_id, run_date=run_date)
.first()
)
if not org_stats:
org_stats = OrgRunScriptStatistics(
org_id=org_id,
run_date=run_date,
total_run_count=1,
total_run_time=spend_time,
update_at=datetime.now(),
)
db_session.add(org_stats)
else:
org_stats.total_run_time += spend_time
org_stats.update_at = datetime.now()
db_session.commit()
except Exception as e:
logger.exception("update statistics sql error: %s", e)
logger.exception(
"update stats for org_id %s owner %s dtable %s run time error %s",
org_id,
owner,
dtable_uuid,
e,
)


# required to get "script logs" in dtable-web
Expand Down Expand Up @@ -381,20 +461,27 @@ def add_script(
org_id,
script_name,
context_data,
ScriptLog.PENDING,
datetime.now(),
operate_from,
)
db_session.add(script)
db_session.commit()

update_stats_run_count(db_session, dtable_uuid, owner, org_id)

return script


def update_script(db_session, script, success, return_code, output):
script.finished_at = datetime.now()
def update_script(
db_session, script, success, return_code, output, started_at, finished_at
):
script.started_at = started_at
script.finished_at = finished_at
script.success = success
script.return_code = return_code
script.output = output
script.state = ScriptLog.FINISHED
db_session.commit()

return script
Expand Down Expand Up @@ -422,11 +509,16 @@ def run_script(
return True


def hook_update_script(db_session, script_id, success, return_code, output, spend_time):
def hook_update_script(
db_session, script_id, success, return_code, output, started_at, spend_time
):
script = db_session.query(ScriptLog).filter_by(id=script_id).first()
if script:
update_script(db_session, script, success, return_code, output)
update_statistics(
finished_at = started_at + timedelta(seconds=spend_time)
update_script(
db_session, script, success, return_code, output, started_at, finished_at
)
update_stats_run_time(
db_session, script.dtable_uuid, script.owner, script.org_id, spend_time
)

Expand Down
Loading
Loading