Skip to content

Commit 5fdbf5f

Browse files
authored
Merge pull request #183 from topcoder-platform/PM-3200_switch-away-pgboss-from-worker
PM-3200 switch away from using pgboss's worker
2 parents f3e8ce7 + 5cc8794 commit 5fdbf5f

File tree

4 files changed

+77
-49
lines changed

4 files changed

+77
-49
lines changed
583 KB
Loading
483 KB
Loading

src/shared/modules/global/queue-scheduler.service.ts

Lines changed: 77 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ import {
77
import * as PgBoss from 'pg-boss';
88
import { policies, Queue } from 'pg-boss';
99

10+
const PGBOSS_JOB_POLLING_INTERVAL_SEC = parseFloat(
11+
process.env.PGBOSS_JOB_POLLING_INTERVAL_SEC || '10',
12+
);
13+
1014
/**
1115
* QueueSchedulerService
1216
*/
@@ -16,11 +20,6 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy {
1620
private boss: PgBoss;
1721
private $start;
1822

19-
private jobsHandlersMap = new Map<
20-
string,
21-
(resolution?: 'fail' | 'complete', result?: any) => void
22-
>();
23-
2423
get isEnabled() {
2524
return String(process.env.DISPATCH_AI_REVIEW_WORKFLOWS) === 'true';
2625
}
@@ -115,26 +114,7 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy {
115114
return;
116115
}
117116

118-
if (resolution === 'fail') {
119-
// IMPORTANT!
120-
// thes 4 operations will update the cache for the active singletons in the database
121-
// and will allow the jobs queue to go next or retry
122-
await this.boss.cancel(queueName, jobId);
123-
await this.boss.getQueueStats(queueName);
124-
await this.boss.supervise(queueName);
125-
await this.boss.resume(queueName, jobId);
126-
}
127-
128-
if (this.jobsHandlersMap.has(jobId)) {
129-
this.logger.log(
130-
`Found job handler for ${jobId}. Calling with '${resolution}' resolution.`,
131-
);
132-
this.jobsHandlersMap.get(jobId)?.call(null, resolution);
133-
this.jobsHandlersMap.delete(jobId);
134-
this.logger.log('JobHandlers left:', [...this.jobsHandlersMap.keys()]);
135-
} else {
136-
await this.boss[resolution](queueName, jobId);
137-
}
117+
await this.boss[resolution](queueName, jobId);
138118

139119
this.logger.log(`Job ${jobId} ${resolution} called.`);
140120

@@ -168,16 +148,79 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy {
168148
await this.createQueue(queueName);
169149
}
170150

171-
return this.boss.work(queueName, handlerFn);
151+
/**
152+
* Continuously polls a job queue and processes jobs one at a time.
153+
*
154+
* @typeParam T - The type of the job payload/data.
155+
*
156+
* @remarks
157+
* - Fetches a single job from the queue (batchSize = 1) via `this.boss.fetch<T>(queueName, { batchSize: 1 })`.
158+
* - If a job is returned, logs the job and invokes `handlerFn([job])`, awaiting its completion.
159+
* - After each iteration (whether a job was found or not), schedules the next poll using
160+
* `setTimeout(..., PGBOSS_JOB_POLLING_INTERVAL)` to avoid deep recursion and to yield to the event loop.
161+
* - The scheduled, recursive invocation calls `poll().catch(() => {})` so that errors from those future
162+
* invocations are swallowed and do not produce unhandled promise rejections. Note that errors thrown
163+
* by the current invocation (for example from `handlerFn`) will propagate to the caller of this invocation
164+
* unless the caller handles them.
165+
*
166+
* @returns A Promise that resolves when this single poll iteration completes.
167+
*/
168+
const poll = async () => {
169+
try {
170+
// guard: ensure boss still exists and service still enabled
171+
if (!this.boss || !this.isEnabled) {
172+
this.logger.warn(
173+
`Polling for queue "${queueName}" stopped: boss not available or service disabled.`,
174+
);
175+
return;
176+
}
177+
178+
const [job] = await this.boss.fetch<T>(queueName, { batchSize: 1 });
179+
if (job) {
180+
// avoid throwing from here to keep the loop alive
181+
try {
182+
this.logger.log(
183+
`Starting job processing for job ${job.id} from queue "${queueName}"`,
184+
);
185+
this.logger.debug(
186+
`Job ${job.id} payload: ${JSON.stringify(job.data)}`,
187+
);
188+
} catch {
189+
// ignore stringify errors
190+
}
191+
192+
try {
193+
await handlerFn([job]);
194+
} catch (err) {
195+
this.logger.error(
196+
`Handler error while processing job ${job.id} from "${queueName}": ${
197+
(err && (err as Error).message) || err
198+
}`,
199+
err,
200+
);
201+
// don't rethrow so the scheduled next poll still runs
202+
}
203+
}
204+
} catch (err) {
205+
this.logger.error(
206+
`Error fetching job from queue "${queueName}": ${
207+
(err && (err as Error).message) || err
208+
}`,
209+
err,
210+
);
211+
// swallow to avoid unhandled promise rejection; next poll still scheduled
212+
} finally {
213+
// schedule next poll (non-blocking). Any errors from the scheduled invocation are logged.
214+
setTimeout(() => {
215+
poll().catch((err) =>
216+
this.logger.error('Unhandled poll error', err),
217+
);
218+
}, PGBOSS_JOB_POLLING_INTERVAL_SEC * 1000);
219+
}
220+
};
221+
222+
await poll();
172223
}),
173224
);
174225
}
175-
176-
registerJobHandler(
177-
jobId: string,
178-
handler: (resolution?: string, result?: any) => void,
179-
) {
180-
this.logger.log(`Registering job handler for job ${jobId}.`);
181-
this.jobsHandlersMap.set(jobId, handler);
182-
}
183226
}

src/shared/modules/global/workflow-queue.handler.ts

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -104,21 +104,6 @@ export class WorkflowQueueHandler implements OnModuleInit {
104104
},
105105
});
106106

107-
// return not-resolved promise,
108-
// this will put a pause on the job
109-
// until it is marked as completed via webhook call
110-
await new Promise<void>((resolve, reject) => {
111-
this.scheduler.registerJobHandler(
112-
job.id,
113-
(resolution: string = 'complete', result: any) => {
114-
this.logger.log(
115-
`Job handler called with ${resolution} and ${result}`,
116-
);
117-
(resolution === 'fail' ? reject : resolve)(result);
118-
},
119-
);
120-
});
121-
122107
this.logger.log(`Job ${job.id} promise finished.`);
123108
}
124109

0 commit comments

Comments
 (0)