Skip to content

Commit e1b55ad

Browse files
committed
Merge
2 parents 371cf2c + 5fdbf5f commit e1b55ad

File tree

7 files changed

+2599
-2738
lines changed

7 files changed

+2599
-2738
lines changed
583 KB
Loading
483 KB
Loading

pnpm-lock.yaml

Lines changed: 2514 additions & 2686 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/api/review-application/reviewApplication.service.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ export class ReviewApplicationService {
7171
}
7272
// make sure application role matches
7373
if (
74+
// eslint-disable-next-line @typescript-eslint/no-unsafe-enum-comparison
7475
ReviewApplicationRoleOpportunityTypeMap[dto.role] !== opportunity.type
7576
) {
7677
throw new BadRequestException(

src/api/submission/submission.service.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -576,6 +576,7 @@ export class SubmissionService {
576576
let isReviewer = false;
577577
let isCopilot = false;
578578
let isSubmitter = false;
579+
let isManager = false;
579580
if (!isOwner && submission.challengeId && uid) {
580581
try {
581582
const resources =
@@ -612,7 +613,10 @@ export class SubmissionService {
612613
if (rn.includes('submitter')) {
613614
isSubmitter = true;
614615
}
615-
if (isReviewer && isCopilot && isSubmitter) {
616+
if (rn.includes('manager')) {
617+
isManager = true;
618+
}
619+
if (isReviewer && isCopilot && isSubmitter && isManager) {
616620
break;
617621
}
618622
}
@@ -624,7 +628,7 @@ export class SubmissionService {
624628
}
625629
}
626630

627-
let canDownload = isOwner || isReviewer || isCopilot;
631+
let canDownload = isOwner || isReviewer || isCopilot || isManager;
628632

629633
if (!canDownload && isSubmitter && submission.challengeId && uid) {
630634
try {
@@ -667,7 +671,7 @@ export class SubmissionService {
667671
if (!canDownload) {
668672
throw new ForbiddenException({
669673
message:
670-
'Only the submission owner, a challenge reviewer/copilot, or an admin can download the submission',
674+
'Only the submission owner, a challenge reviewer/copilot/manager, or an admin can download the submission',
671675
code: 'FORBIDDEN_SUBMISSION_DOWNLOAD',
672676
details: {
673677
submissionId,

src/shared/modules/global/queue-scheduler.service.ts

Lines changed: 77 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ import {
77
import * as PgBoss from 'pg-boss';
88
import { policies, Queue } from 'pg-boss';
99

10+
const PGBOSS_JOB_POLLING_INTERVAL_SEC = parseFloat(
11+
process.env.PGBOSS_JOB_POLLING_INTERVAL_SEC || '10',
12+
);
13+
1014
/**
1115
* QueueSchedulerService
1216
*/
@@ -16,11 +20,6 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy {
1620
private boss: PgBoss;
1721
private $start;
1822

19-
private jobsHandlersMap = new Map<
20-
string,
21-
(resolution?: 'fail' | 'complete', result?: any) => void
22-
>();
23-
2423
get isEnabled() {
2524
return String(process.env.DISPATCH_AI_REVIEW_WORKFLOWS) === 'true';
2625
}
@@ -115,26 +114,7 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy {
115114
return;
116115
}
117116

118-
if (resolution === 'fail') {
119-
// IMPORTANT!
120-
// thes 4 operations will update the cache for the active singletons in the database
121-
// and will allow the jobs queue to go next or retry
122-
await this.boss.cancel(queueName, jobId);
123-
await this.boss.getQueueStats(queueName);
124-
await this.boss.supervise(queueName);
125-
await this.boss.resume(queueName, jobId);
126-
}
127-
128-
if (this.jobsHandlersMap.has(jobId)) {
129-
this.logger.log(
130-
`Found job handler for ${jobId}. Calling with '${resolution}' resolution.`,
131-
);
132-
this.jobsHandlersMap.get(jobId)?.call(null, resolution);
133-
this.jobsHandlersMap.delete(jobId);
134-
this.logger.log('JobHandlers left:', [...this.jobsHandlersMap.keys()]);
135-
} else {
136-
await this.boss[resolution](queueName, jobId);
137-
}
117+
await this.boss[resolution](queueName, jobId);
138118

139119
this.logger.log(`Job ${jobId} ${resolution} called.`);
140120

@@ -168,16 +148,79 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy {
168148
await this.createQueue(queueName);
169149
}
170150

171-
return this.boss.work(queueName, handlerFn);
151+
/**
152+
* Continuously polls a job queue and processes jobs one at a time.
153+
*
154+
* @typeParam T - The type of the job payload/data.
155+
*
156+
* @remarks
157+
* - Fetches a single job from the queue (batchSize = 1) via `this.boss.fetch<T>(queueName, { batchSize: 1 })`.
158+
* - If a job is returned, logs the job and invokes `handlerFn([job])`, awaiting its completion.
159+
* - After each iteration (whether a job was found or not), schedules the next poll using
160+
* `setTimeout(..., PGBOSS_JOB_POLLING_INTERVAL)` to avoid deep recursion and to yield to the event loop.
161+
* - The scheduled, recursive invocation calls `poll().catch(() => {})` so that errors from those future
162+
* invocations are swallowed and do not produce unhandled promise rejections. Note that errors thrown
163+
* by the current invocation (for example from `handlerFn`) will propagate to the caller of this invocation
164+
* unless the caller handles them.
165+
*
166+
* @returns A Promise that resolves when this single poll iteration completes.
167+
*/
168+
const poll = async () => {
169+
try {
170+
// guard: ensure boss still exists and service still enabled
171+
if (!this.boss || !this.isEnabled) {
172+
this.logger.warn(
173+
`Polling for queue "${queueName}" stopped: boss not available or service disabled.`,
174+
);
175+
return;
176+
}
177+
178+
const [job] = await this.boss.fetch<T>(queueName, { batchSize: 1 });
179+
if (job) {
180+
// avoid throwing from here to keep the loop alive
181+
try {
182+
this.logger.log(
183+
`Starting job processing for job ${job.id} from queue "${queueName}"`,
184+
);
185+
this.logger.debug(
186+
`Job ${job.id} payload: ${JSON.stringify(job.data)}`,
187+
);
188+
} catch {
189+
// ignore stringify errors
190+
}
191+
192+
try {
193+
await handlerFn([job]);
194+
} catch (err) {
195+
this.logger.error(
196+
`Handler error while processing job ${job.id} from "${queueName}": ${
197+
(err && (err as Error).message) || err
198+
}`,
199+
err,
200+
);
201+
// don't rethrow so the scheduled next poll still runs
202+
}
203+
}
204+
} catch (err) {
205+
this.logger.error(
206+
`Error fetching job from queue "${queueName}": ${
207+
(err && (err as Error).message) || err
208+
}`,
209+
err,
210+
);
211+
// swallow to avoid unhandled promise rejection; next poll still scheduled
212+
} finally {
213+
// schedule next poll (non-blocking). Any errors from the scheduled invocation are logged.
214+
setTimeout(() => {
215+
poll().catch((err) =>
216+
this.logger.error('Unhandled poll error', err),
217+
);
218+
}, PGBOSS_JOB_POLLING_INTERVAL_SEC * 1000);
219+
}
220+
};
221+
222+
await poll();
172223
}),
173224
);
174225
}
175-
176-
registerJobHandler(
177-
jobId: string,
178-
handler: (resolution?: string, result?: any) => void,
179-
) {
180-
this.logger.log(`Registering job handler for job ${jobId}.`);
181-
this.jobsHandlersMap.set(jobId, handler);
182-
}
183226
}

src/shared/modules/global/workflow-queue.handler.ts

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -104,21 +104,6 @@ export class WorkflowQueueHandler implements OnModuleInit {
104104
},
105105
});
106106

107-
// return not-resolved promise,
108-
// this will put a pause on the job
109-
// until it is marked as completed via webhook call
110-
await new Promise<void>((resolve, reject) => {
111-
this.scheduler.registerJobHandler(
112-
job.id,
113-
(resolution: string = 'complete', result: any) => {
114-
this.logger.log(
115-
`Job handler called with ${resolution} and ${result}`,
116-
);
117-
(resolution === 'fail' ? reject : resolve)(result);
118-
},
119-
);
120-
});
121-
122107
this.logger.log(`Job ${job.id} promise finished.`);
123108
}
124109

0 commit comments

Comments
 (0)