Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
6a9a149
PM-3080 delete aiWorkflowRun data before deleting a submission
himaniraghav3 Dec 3, 2025
38b7ee4
deploy PM-3080
himaniraghav3 Dec 3, 2025
1f629e4
Add one more terminal status
himaniraghav3 Dec 3, 2025
5d02fb0
cascade delete
himaniraghav3 Dec 3, 2025
fb907a4
Pm-3080 - add cascade on delete for run items
vas3a Dec 3, 2025
339d003
Merge pull request #179 from topcoder-platform/PM-3080_migration
himaniraghav3 Dec 3, 2025
8958a49
clean up code
himaniraghav3 Dec 3, 2025
d5b55a4
cleanup circle ci config
himaniraghav3 Dec 3, 2025
4eadcae
Merge pull request #178 from topcoder-platform/PM-3080
himaniraghav3 Dec 3, 2025
3afd501
fix: check if the appeal phase is open while updating appeals
hentrymartin Dec 4, 2025
4d847d9
fix: check if the appeal phase is open while updating appeals
hentrymartin Dec 4, 2025
59b6584
Merge pull request #180 from topcoder-platform/pm-2660
hentrymartin Dec 8, 2025
e942253
Allow Managers to download submissions
himaniraghav3 Dec 9, 2025
b036503
Update url for tc-core-library
vas3a Dec 10, 2025
c95892d
Merge pull request #182 from topcoder-platform/tc-core-library
kkartunov Dec 10, 2025
723a847
fix build
kkartunov Dec 10, 2025
cd77c26
fix lint
kkartunov Dec 10, 2025
f3e8ce7
Merge pull request #181 from topcoder-platform/PM-2647
himaniraghav3 Dec 10, 2025
2a0c618
PM-3200 - switch away from using pgboss's workers
vas3a Dec 10, 2025
76abca2
Ai workflows diagrams
vas3a Dec 10, 2025
5cc8794
Update pgboss polling default interval to 10s
vas3a Dec 10, 2025
5fdbf5f
Merge pull request #183 from topcoder-platform/PM-3200_switch-away-pg…
vas3a Dec 11, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,7 @@ workflows:
branches:
only:
- develop
- feat/ai-workflows
- pm-1955_2
- re-try-failed-jobs
- pm-2539
- pm-2177_fixes
- pm-2660


- 'build-prod':
Expand Down
Binary file added docs/ai-workflow-flow.full-diagram.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/ai-workflow-flow.pgboss-diagram.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
"pg-boss": "^11.0.5",
"reflect-metadata": "^0.2.2",
"rxjs": "^7.8.1",
"tc-core-library-js": "appirio-tech/tc-core-library-js.git#security"
"tc-core-library-js": "topcoder-platform/tc-core-library-js#master"
},
"devDependencies": {
"@eslint/eslintrc": "^3.2.0",
Expand Down
5,208 changes: 2,518 additions & 2,690 deletions pnpm-lock.yaml

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- DropForeignKey
ALTER TABLE "aiWorkflowRunItem" DROP CONSTRAINT "aiWorkflowRunItem_workflowRunId_fkey";

-- DropForeignKey
ALTER TABLE "aiWorkflowRunItemComment" DROP CONSTRAINT "aiWorkflowRunItemComment_workflowRunItemId_fkey";

-- AddForeignKey
ALTER TABLE "aiWorkflowRunItem" ADD CONSTRAINT "aiWorkflowRunItem_workflowRunId_fkey" FOREIGN KEY ("workflowRunId") REFERENCES "aiWorkflowRun"("id") ON DELETE CASCADE ON UPDATE CASCADE;

-- AddForeignKey
ALTER TABLE "aiWorkflowRunItemComment" ADD CONSTRAINT "aiWorkflowRunItemComment_workflowRunItemId_fkey" FOREIGN KEY ("workflowRunItemId") REFERENCES "aiWorkflowRunItem"("id") ON DELETE CASCADE ON UPDATE CASCADE;
5 changes: 5 additions & 0 deletions src/api/appeal/appeal.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,11 @@ export class AppealService {
);
}

// Check if the appeal phase is open while updating appeals
if (!isPrivileged && challengeId) {
await this.challengeApiService.validateAppealSubmission(challengeId);
}

if (!isPrivileged) {
await this.ensureChallengeAllowsAppealChange(challengeId, {
logContext: 'updateAppeal',
Expand Down
1 change: 1 addition & 0 deletions src/api/review-application/reviewApplication.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ export class ReviewApplicationService {
}
// make sure application role matches
if (
// eslint-disable-next-line @typescript-eslint/no-unsafe-enum-comparison
ReviewApplicationRoleOpportunityTypeMap[dto.role] !== opportunity.type
) {
throw new BadRequestException(
Expand Down
38 changes: 35 additions & 3 deletions src/api/submission/submission.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,7 @@ export class SubmissionService {
let isReviewer = false;
let isCopilot = false;
let isSubmitter = false;
let isManager = false;
if (!isOwner && submission.challengeId && uid) {
try {
const resources =
Expand Down Expand Up @@ -612,7 +613,10 @@ export class SubmissionService {
if (rn.includes('submitter')) {
isSubmitter = true;
}
if (isReviewer && isCopilot && isSubmitter) {
if (rn.includes('manager')) {
isManager = true;
}
if (isReviewer && isCopilot && isSubmitter && isManager) {
break;
}
}
Expand All @@ -624,7 +628,7 @@ export class SubmissionService {
}
}

let canDownload = isOwner || isReviewer || isCopilot;
let canDownload = isOwner || isReviewer || isCopilot || isManager;

if (!canDownload && isSubmitter && submission.challengeId && uid) {
try {
Expand Down Expand Up @@ -667,7 +671,7 @@ export class SubmissionService {
if (!canDownload) {
throw new ForbiddenException({
message:
'Only the submission owner, a challenge reviewer/copilot, or an admin can download the submission',
'Only the submission owner, a challenge reviewer/copilot/manager, or an admin can download the submission',
code: 'FORBIDDEN_SUBMISSION_DOWNLOAD',
details: {
submissionId,
Expand Down Expand Up @@ -2287,6 +2291,34 @@ export class SubmissionService {
});
}
}
const TERMINAL_STATUSES = [
'COMPLETED',
'FAILURE',
'CANCELLED',
'SUCCESS',
];

const runs = await this.prisma.aiWorkflowRun.findMany({
where: { submissionId: id },
select: { id: true, status: true },
});

if (runs.length > 0) {
const nonTerminal = runs.filter(
(r) => !TERMINAL_STATUSES.includes(r.status),
);

if (nonTerminal.length > 0) {
throw new Error(
`Cannot delete submission: ${nonTerminal.length} workflow run(s) still active.`,
);
}

await this.prisma.aiWorkflowRun.deleteMany({
where: { submissionId: id },
});
}

await this.prisma.submission.delete({
where: { id },
});
Expand Down
111 changes: 77 additions & 34 deletions src/shared/modules/global/queue-scheduler.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import {
import * as PgBoss from 'pg-boss';
import { policies, Queue } from 'pg-boss';

const PGBOSS_JOB_POLLING_INTERVAL_SEC = parseFloat(
process.env.PGBOSS_JOB_POLLING_INTERVAL_SEC || '10',
);

/**
* QueueSchedulerService
*/
Expand All @@ -16,11 +20,6 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy {
private boss: PgBoss;
private $start;

private jobsHandlersMap = new Map<
string,
(resolution?: 'fail' | 'complete', result?: any) => void
>();

get isEnabled() {
return String(process.env.DISPATCH_AI_REVIEW_WORKFLOWS) === 'true';
}
Expand Down Expand Up @@ -115,26 +114,7 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy {
return;
}

if (resolution === 'fail') {
// IMPORTANT!
// thes 4 operations will update the cache for the active singletons in the database
// and will allow the jobs queue to go next or retry
await this.boss.cancel(queueName, jobId);
await this.boss.getQueueStats(queueName);
await this.boss.supervise(queueName);
await this.boss.resume(queueName, jobId);
}

if (this.jobsHandlersMap.has(jobId)) {
this.logger.log(
`Found job handler for ${jobId}. Calling with '${resolution}' resolution.`,
);
this.jobsHandlersMap.get(jobId)?.call(null, resolution);
this.jobsHandlersMap.delete(jobId);
this.logger.log('JobHandlers left:', [...this.jobsHandlersMap.keys()]);
} else {
await this.boss[resolution](queueName, jobId);
}
await this.boss[resolution](queueName, jobId);

this.logger.log(`Job ${jobId} ${resolution} called.`);

Expand Down Expand Up @@ -168,16 +148,79 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy {
await this.createQueue(queueName);
}

return this.boss.work(queueName, handlerFn);
/**
* Continuously polls a job queue and processes jobs one at a time.
*
* @typeParam T - The type of the job payload/data.
*
* @remarks
* - Fetches a single job from the queue (batchSize = 1) via `this.boss.fetch<T>(queueName, { batchSize: 1 })`.
* - If a job is returned, logs the job and invokes `handlerFn([job])`, awaiting its completion.
* - After each iteration (whether a job was found or not), schedules the next poll using
* `setTimeout(..., PGBOSS_JOB_POLLING_INTERVAL)` to avoid deep recursion and to yield to the event loop.
* - The scheduled, recursive invocation calls `poll().catch(() => {})` so that errors from those future
* invocations are swallowed and do not produce unhandled promise rejections. Note that errors thrown
* by the current invocation (for example from `handlerFn`) will propagate to the caller of this invocation
* unless the caller handles them.
*
* @returns A Promise that resolves when this single poll iteration completes.
*/
const poll = async () => {
try {
// guard: ensure boss still exists and service still enabled
if (!this.boss || !this.isEnabled) {
this.logger.warn(
`Polling for queue "${queueName}" stopped: boss not available or service disabled.`,
);
return;
}

const [job] = await this.boss.fetch<T>(queueName, { batchSize: 1 });
if (job) {
// avoid throwing from here to keep the loop alive
try {
this.logger.log(
`Starting job processing for job ${job.id} from queue "${queueName}"`,
);
this.logger.debug(
`Job ${job.id} payload: ${JSON.stringify(job.data)}`,
);
} catch {
// ignore stringify errors
}

try {
await handlerFn([job]);
} catch (err) {
this.logger.error(
`Handler error while processing job ${job.id} from "${queueName}": ${
(err && (err as Error).message) || err
}`,
err,
);
// don't rethrow so the scheduled next poll still runs
}
}
} catch (err) {
this.logger.error(
`Error fetching job from queue "${queueName}": ${
(err && (err as Error).message) || err
}`,
err,
);
// swallow to avoid unhandled promise rejection; next poll still scheduled
} finally {
// schedule next poll (non-blocking). Any errors from the scheduled invocation are logged.
setTimeout(() => {
poll().catch((err) =>
this.logger.error('Unhandled poll error', err),
);
}, PGBOSS_JOB_POLLING_INTERVAL_SEC * 1000);
}
};

await poll();
}),
);
}

registerJobHandler(
jobId: string,
handler: (resolution?: string, result?: any) => void,
) {
this.logger.log(`Registering job handler for job ${jobId}.`);
this.jobsHandlersMap.set(jobId, handler);
}
}
15 changes: 0 additions & 15 deletions src/shared/modules/global/workflow-queue.handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,21 +104,6 @@ export class WorkflowQueueHandler implements OnModuleInit {
},
});

// return not-resolved promise,
// this will put a pause on the job
// until it is marked as completed via webhook call
await new Promise<void>((resolve, reject) => {
this.scheduler.registerJobHandler(
job.id,
(resolution: string = 'complete', result: any) => {
this.logger.log(
`Job handler called with ${resolution} and ${result}`,
);
(resolution === 'fail' ? reject : resolve)(result);
},
);
});

this.logger.log(`Job ${job.id} promise finished.`);
}

Expand Down
Loading