From 1b99d3d8d4b501939945b447efa304a588c0d6a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antoine=20de=20Chevign=C3=A9?= Date: Fri, 9 Jan 2026 16:19:47 +0100 Subject: [PATCH 1/3] Add auto-disable for explorers with unreachable RPCs When an explorer's RPC becomes unreachable, the explorerSyncCheck job keeps trying to start PM2 processes that immediately fail. This creates unnecessary load on the PM2 server. This change adds automatic sync deactivation after 3 consecutive failures: - Track sync failures on Explorer model (syncFailedAttempts, syncDisabledAt, syncDisabledReason, nextRecoveryCheckAt) - Auto-disable shouldSync after 3 consecutive RPC failures or PM2 timeouts - Add syncRecoveryCheck job that periodically tests disabled explorers - Use exponential backoff for recovery checks (5m -> 15m -> 1h -> 6h) - Reset failure tracking when sync is manually re-enabled Co-Authored-By: Claude Opus 4.5 --- run/jobs/index.js | 1 + run/jobs/syncRecoveryCheck.js | 74 ++++++++++ run/jobs/updateExplorerSyncingProcess.js | 44 +++++- ...0260109161142-add-sync-failure-tracking.js | 59 ++++++++ run/models/explorer.js | 122 +++++++++++++++- run/scheduler.js | 9 ++ run/tests/jobs/syncRecoveryCheck.test.js | 135 ++++++++++++++++++ .../jobs/updateExplorerSyncingProcess.test.js | 133 ++++++++++++++++- run/tests/mocks/lib/rpc.js | 4 +- 9 files changed, 571 insertions(+), 10 deletions(-) create mode 100644 run/jobs/syncRecoveryCheck.js create mode 100644 run/migrations/20260109161142-add-sync-failure-tracking.js create mode 100644 run/tests/jobs/syncRecoveryCheck.test.js diff --git a/run/jobs/index.js b/run/jobs/index.js index 8184a313..7c6675d6 100644 --- a/run/jobs/index.js +++ b/run/jobs/index.js @@ -45,6 +45,7 @@ module.exports = { rpcHealthCheck: require('./rpcHealthCheck'), rpcHealthCheckStarter: require('./rpcHealthCheckStarter'), explorerSyncCheck: require('./explorerSyncCheck'), + syncRecoveryCheck: require('./syncRecoveryCheck'), workspaceReset: require('./workspaceReset'), batchBlockDelete: require('./batchBlockDelete'), batchContractDelete: require('./batchContractDelete'), diff --git a/run/jobs/syncRecoveryCheck.js b/run/jobs/syncRecoveryCheck.js new file mode 100644 index 00000000..c0410757 --- /dev/null +++ b/run/jobs/syncRecoveryCheck.js @@ -0,0 +1,74 @@ +/** + * @fileoverview Sync recovery check job. + * Periodically checks auto-disabled explorers to see if their RPC has recovered. + * Re-enables sync when RPC becomes reachable, using exponential backoff for retries. 
+ * + * Backoff schedule: 5m -> 15m -> 1h -> 6h (max) + * + * @module jobs/syncRecoveryCheck + */ + +const { Explorer, Workspace } = require('../models'); +const { ProviderConnector } = require('../lib/rpc'); +const { withTimeout } = require('../lib/utils'); +const { Op } = require('sequelize'); +const logger = require('../lib/logger'); + +module.exports = async () => { + // Find explorers that are due for a recovery check + const explorers = await Explorer.findAll({ + where: { + syncDisabledReason: { [Op.ne]: null }, + nextRecoveryCheckAt: { [Op.lte]: new Date() } + }, + include: [{ + model: Workspace, + as: 'workspace', + attributes: ['id', 'rpcServer'] + }] + }); + + if (explorers.length === 0) { + return 'No explorers due for recovery check'; + } + + let recovered = 0; + let stillUnreachable = 0; + + for (const explorer of explorers) { + try { + const provider = new ProviderConnector(explorer.workspace.rpcServer); + const block = await withTimeout(provider.fetchLatestBlock()); + + if (block) { + // RPC is reachable - re-enable sync + await explorer.enableSyncAfterRecovery(); + recovered++; + + logger.info({ + message: 'Explorer re-enabled after RPC recovery', + explorerId: explorer.id, + explorerSlug: explorer.slug, + disabledReason: explorer.syncDisabledReason + }); + } else { + // Block fetch returned null - still unreachable + await explorer.scheduleNextRecoveryCheck(); + stillUnreachable++; + } + } catch (error) { + // RPC check failed - schedule next check with backoff + await explorer.scheduleNextRecoveryCheck(); + stillUnreachable++; + + logger.debug({ + message: 'Explorer recovery check failed', + explorerId: explorer.id, + explorerSlug: explorer.slug, + error: error.message + }); + } + } + + return `Checked ${explorers.length} explorers: ${recovered} recovered, ${stillUnreachable} still unreachable`; +}; diff --git a/run/jobs/updateExplorerSyncingProcess.js b/run/jobs/updateExplorerSyncingProcess.js index ad1d7a71..eb0a6fcb 100644 --- a/run/jobs/updateExplorerSyncingProcess.js +++ b/run/jobs/updateExplorerSyncingProcess.js @@ -1,11 +1,15 @@ /** * @fileoverview Explorer sync process update job. * Manages PM2 processes for explorer block syncing based on status. + * Auto-disables sync after repeated RPC failures with exponential backoff recovery. 
* @module jobs/updateExplorerSyncingProcess */ const { Explorer, Workspace, RpcHealthCheck, StripeSubscription, StripePlan } = require('../models'); const PM2 = require('../lib/pm2'); +const logger = require('../lib/logger'); + +const SYNC_FAILURE_THRESHOLD = 3; module.exports = async job => { const data = job.data; @@ -57,7 +61,19 @@ module.exports = async job => { } else if (explorer.workspace.rpcHealthCheck && !explorer.workspace.rpcHealthCheck.isReachable && existingProcess) { await pm2.delete(explorer.slug); - return 'Process deleted: RPC is not reachable.'; + // Track RPC failure and potentially auto-disable + const result = await explorer.incrementSyncFailures('rpc_unreachable'); + if (result.disabled) { + logger.info({ + message: 'Explorer auto-disabled due to RPC failures', + explorerId: explorer.id, + explorerSlug: explorer.slug, + attempts: result.attempts, + reason: 'rpc_unreachable' + }); + return `Process deleted and sync auto-disabled after ${result.attempts} RPC failures.`; + } + return `Process deleted: RPC is not reachable (attempt ${result.attempts}/${SYNC_FAILURE_THRESHOLD}).`; } else if (!explorer.shouldSync && existingProcess) { await pm2.delete(explorer.slug); @@ -69,17 +85,41 @@ module.exports = async job => { } else if (explorer.shouldSync && !existingProcess) { await pm2.start(explorer.slug, explorer.workspaceId); + // Reset failure counter on successful start + if (explorer.syncFailedAttempts > 0) { + await explorer.update({ syncFailedAttempts: 0 }); + } return 'Process started.'; } else if (explorer.shouldSync && existingProcess && existingProcess.pm2_env.status == 'stopped') { await pm2.resume(explorer.slug, explorer.workspaceId); + // Reset failure counter on successful resume + if (explorer.syncFailedAttempts > 0) { + await explorer.update({ syncFailedAttempts: 0 }); + } return 'Process resumed.'; } else return 'No process change.'; } catch(error) { - if (error.message.startsWith('Timed out after')) + if (error.message.startsWith('Timed out after')) { + // Track timeout as a failure if explorer exists and sync is enabled + if (explorer && explorer.shouldSync) { + const result = await explorer.incrementSyncFailures('pm2_timeout'); + if (result.disabled) { + logger.info({ + message: 'Explorer auto-disabled due to PM2 timeouts', + explorerId: explorer.id, + explorerSlug: explorer.slug, + attempts: result.attempts, + reason: 'pm2_timeout' + }); + return `Timed out and sync auto-disabled after ${result.attempts} failures.`; + } + return `Timed out (attempt ${result.attempts}/${SYNC_FAILURE_THRESHOLD})`; + } return 'Timed out'; + } else throw error; } diff --git a/run/migrations/20260109161142-add-sync-failure-tracking.js b/run/migrations/20260109161142-add-sync-failure-tracking.js new file mode 100644 index 00000000..62c4a8ad --- /dev/null +++ b/run/migrations/20260109161142-add-sync-failure-tracking.js @@ -0,0 +1,59 @@ +'use strict'; + +module.exports = { + async up (queryInterface, Sequelize) { + const transaction = await queryInterface.sequelize.transaction(); + try { + await queryInterface.addColumn('explorers', 'syncFailedAttempts', { + type: Sequelize.DataTypes.INTEGER, + allowNull: false, + defaultValue: 0 + }, { transaction }); + + await queryInterface.addColumn('explorers', 'syncDisabledAt', { + type: Sequelize.DataTypes.DATE, + allowNull: true + }, { transaction }); + + await queryInterface.addColumn('explorers', 'syncDisabledReason', { + type: Sequelize.DataTypes.STRING, + allowNull: true + }, { transaction }); + + await 
queryInterface.addColumn('explorers', 'nextRecoveryCheckAt', { + type: Sequelize.DataTypes.DATE, + allowNull: true + }, { transaction }); + + // Add index on nextRecoveryCheckAt for efficient recovery job queries + await queryInterface.addIndex('explorers', ['nextRecoveryCheckAt'], { + name: 'explorers_next_recovery_check_at_idx', + where: { nextRecoveryCheckAt: { [Sequelize.Op.ne]: null } }, + transaction + }); + + await transaction.commit(); + } catch(error) { + console.log(error); + await transaction.rollback(); + throw error; + } + }, + + async down(queryInterface, Sequelize) { + const transaction = await queryInterface.sequelize.transaction(); + try { + await queryInterface.removeIndex('explorers', 'explorers_next_recovery_check_at_idx', { transaction }); + await queryInterface.removeColumn('explorers', 'syncFailedAttempts', { transaction }); + await queryInterface.removeColumn('explorers', 'syncDisabledAt', { transaction }); + await queryInterface.removeColumn('explorers', 'syncDisabledReason', { transaction }); + await queryInterface.removeColumn('explorers', 'nextRecoveryCheckAt', { transaction }); + + await transaction.commit(); + } catch(error) { + console.log(error); + await transaction.rollback(); + throw error; + } + } +}; diff --git a/run/models/explorer.js b/run/models/explorer.js index 908686ff..a28872b1 100644 --- a/run/models/explorer.js +++ b/run/models/explorer.js @@ -31,6 +31,15 @@ const IUniswapV2Router02 = require('../lib/abis/IUniswapV2Router02.json'); const IUniswapV2Factory = require('../lib/abis/IUniswapV2Factory.json'); const MAX_RPC_ATTEMPTS = 3; +// Sync failure auto-disable configuration +const SYNC_FAILURE_THRESHOLD = 3; +const RECOVERY_BACKOFF_SCHEDULE = [ + 5 * 60 * 1000, // 5 minutes + 15 * 60 * 1000, // 15 minutes + 60 * 60 * 1000, // 1 hour + 6 * 60 * 60 * 1000 // 6 hours (max) +]; + module.exports = (sequelize, DataTypes) => { class Explorer extends Model { /** @@ -299,10 +308,18 @@ module.exports = (sequelize, DataTypes) => { /** * Starts block synchronization for the explorer. + * Resets all failure tracking when manually enabling sync. * @returns {Promise} Updated explorer */ - startSync() { - return this.update({ shouldSync: true }); + async startSync() { + await this.update({ + shouldSync: true, + syncFailedAttempts: 0, + syncDisabledAt: null, + syncDisabledReason: null, + nextRecoveryCheckAt: null + }); + return this; } /** @@ -313,6 +330,101 @@ module.exports = (sequelize, DataTypes) => { return this.update({ shouldSync: false }); } + /** + * Increments the sync failure counter and auto-disables if threshold reached. + * @param {string} [reason='rpc_unreachable'] - Reason for the failure + * @returns {Promise<{disabled: boolean, attempts: number}>} Result with disable status + */ + async incrementSyncFailures(reason = 'rpc_unreachable') { + const newCount = (this.syncFailedAttempts || 0) + 1; + await this.update({ syncFailedAttempts: newCount }); + + if (newCount >= SYNC_FAILURE_THRESHOLD) { + await this.autoDisableSync(reason); + return { disabled: true, attempts: newCount }; + } + return { disabled: false, attempts: newCount }; + } + + /** + * Auto-disables sync and schedules first recovery check. 
+ * @param {string} reason - Reason for disabling (e.g., 'rpc_unreachable') + * @returns {Promise} Updated explorer + */ + async autoDisableSync(reason) { + const nextCheck = new Date(Date.now() + RECOVERY_BACKOFF_SCHEDULE[0]); + await this.update({ + shouldSync: false, + syncDisabledAt: new Date(), + syncDisabledReason: reason, + nextRecoveryCheckAt: nextCheck + }); + return this; + } + + /** + * Resets all sync failure tracking state. + * @returns {Promise} Updated explorer + */ + async resetSyncState() { + await this.update({ + syncFailedAttempts: 0, + syncDisabledAt: null, + syncDisabledReason: null, + nextRecoveryCheckAt: null + }); + return this; + } + + /** + * Schedules the next recovery check using exponential backoff. + * Backoff schedule: 5m -> 15m -> 1h -> 6h (max) + * @returns {Promise} Updated explorer + */ + async scheduleNextRecoveryCheck() { + if (!this.syncDisabledAt) { + return this; + } + + const timeSinceDisabled = Date.now() - new Date(this.syncDisabledAt).getTime(); + let cumulativeTime = 0; + let backoffIndex = 0; + + // Find which backoff interval we should use based on time since disabled + for (let i = 0; i < RECOVERY_BACKOFF_SCHEDULE.length; i++) { + cumulativeTime += RECOVERY_BACKOFF_SCHEDULE[i]; + if (timeSinceDisabled < cumulativeTime) { + backoffIndex = i; + break; + } + backoffIndex = i; + } + + // Cap at max backoff (last element) + if (backoffIndex >= RECOVERY_BACKOFF_SCHEDULE.length) { + backoffIndex = RECOVERY_BACKOFF_SCHEDULE.length - 1; + } + + const nextCheck = new Date(Date.now() + RECOVERY_BACKOFF_SCHEDULE[backoffIndex]); + await this.update({ nextRecoveryCheckAt: nextCheck }); + return this; + } + + /** + * Re-enables sync after successful recovery check. + * @returns {Promise} Updated explorer + */ + async enableSyncAfterRecovery() { + await this.update({ + shouldSync: true, + syncFailedAttempts: 0, + syncDisabledAt: null, + syncDisabledReason: null, + nextRecoveryCheckAt: null + }); + return this; + } + /** * Creates a Uniswap V2 compatible DEX for the explorer. 
* @param {string} routerAddress - DEX router contract address @@ -690,7 +802,11 @@ module.exports = (sequelize, DataTypes) => { shouldEnforceQuota: DataTypes.BOOLEAN, isDemo: DataTypes.BOOLEAN, gasAnalyticsEnabled: DataTypes.BOOLEAN, - displayTopAccounts: DataTypes.BOOLEAN + displayTopAccounts: DataTypes.BOOLEAN, + syncFailedAttempts: DataTypes.INTEGER, + syncDisabledAt: DataTypes.DATE, + syncDisabledReason: DataTypes.STRING, + nextRecoveryCheckAt: DataTypes.DATE }, { hooks: { afterCreate(explorer, options) { diff --git a/run/scheduler.js b/run/scheduler.js index ce1271f1..677b39ff 100644 --- a/run/scheduler.js +++ b/run/scheduler.js @@ -3,6 +3,7 @@ const { enqueue } = require('./lib/queue'); const INTEGRITY_CHECK_INTERVAL = 5 * 60 * 1000; const RPC_HEALTH_CHECK_INTERVAL = 5 * 60 * 1000; const SUBSCRIPTION_CHECK_INTERVAL = 5 * 60 * 1000; +const SYNC_RECOVERY_CHECK_INTERVAL = 5 * 60 * 1000; const QUEUE_MONITORING_INTERVAL = 60 * 1000; const CANCEL_DEMO_INTERVAL = 60 * 60 * 1000; const BLOCK_SYNC_MONITORING_INTERVAL = 60 * 1000; @@ -48,6 +49,14 @@ const BLOCK_SYNC_MONITORING_INTERVAL = 60 * 1000; { every: SUBSCRIPTION_CHECK_INTERVAL } ); + await enqueue( + 'syncRecoveryCheck', + 'syncRecoveryCheck', + {}, + 10, + { every: SYNC_RECOVERY_CHECK_INTERVAL } + ); + await enqueue( 'queueMonitoring', 'queueMonitoring', diff --git a/run/tests/jobs/syncRecoveryCheck.test.js b/run/tests/jobs/syncRecoveryCheck.test.js new file mode 100644 index 00000000..53cc6ae7 --- /dev/null +++ b/run/tests/jobs/syncRecoveryCheck.test.js @@ -0,0 +1,135 @@ +const { Explorer, Workspace } = require('../mocks/models'); +require('../mocks/lib/rpc'); +require('../mocks/lib/utils'); +require('../mocks/lib/logger'); + +const { ProviderConnector } = require('../../lib/rpc'); +const syncRecoveryCheck = require('../../jobs/syncRecoveryCheck'); + +beforeEach(() => jest.clearAllMocks()); + +describe('syncRecoveryCheck', () => { + it('Should return early if no explorers need recovery check', async () => { + jest.spyOn(Explorer, 'findAll').mockResolvedValueOnce([]); + + const result = await syncRecoveryCheck(); + + expect(result).toEqual('No explorers due for recovery check'); + }); + + it('Should re-enable sync when RPC is reachable', async () => { + const mockEnableSyncAfterRecovery = jest.fn().mockResolvedValue({}); + jest.spyOn(Explorer, 'findAll').mockResolvedValueOnce([{ + id: 1, + slug: 'test-explorer', + syncDisabledReason: 'rpc_unreachable', + enableSyncAfterRecovery: mockEnableSyncAfterRecovery, + scheduleNextRecoveryCheck: jest.fn(), + workspace: { + id: 1, + rpcServer: 'http://localhost:8545' + } + }]); + ProviderConnector.mockImplementation(() => ({ + fetchLatestBlock: jest.fn().mockResolvedValue({ number: 100 }) + })); + + const result = await syncRecoveryCheck(); + + expect(mockEnableSyncAfterRecovery).toHaveBeenCalled(); + expect(result).toEqual('Checked 1 explorers: 1 recovered, 0 still unreachable'); + }); + + it('Should schedule next recovery check when RPC is still unreachable', async () => { + const mockScheduleNextRecoveryCheck = jest.fn().mockResolvedValue({}); + jest.spyOn(Explorer, 'findAll').mockResolvedValueOnce([{ + id: 1, + slug: 'test-explorer', + syncDisabledReason: 'rpc_unreachable', + enableSyncAfterRecovery: jest.fn(), + scheduleNextRecoveryCheck: mockScheduleNextRecoveryCheck, + workspace: { + id: 1, + rpcServer: 'http://localhost:8545' + } + }]); + ProviderConnector.mockImplementation(() => ({ + fetchLatestBlock: jest.fn().mockResolvedValue(null) + })); + + const result = await 
syncRecoveryCheck(); + + expect(mockScheduleNextRecoveryCheck).toHaveBeenCalled(); + expect(result).toEqual('Checked 1 explorers: 0 recovered, 1 still unreachable'); + }); + + it('Should schedule next recovery check when RPC check throws error', async () => { + const mockScheduleNextRecoveryCheck = jest.fn().mockResolvedValue({}); + jest.spyOn(Explorer, 'findAll').mockResolvedValueOnce([{ + id: 1, + slug: 'test-explorer', + syncDisabledReason: 'rpc_unreachable', + enableSyncAfterRecovery: jest.fn(), + scheduleNextRecoveryCheck: mockScheduleNextRecoveryCheck, + workspace: { + id: 1, + rpcServer: 'http://localhost:8545' + } + }]); + ProviderConnector.mockImplementation(() => ({ + fetchLatestBlock: jest.fn().mockRejectedValue(new Error('Connection refused')) + })); + + const result = await syncRecoveryCheck(); + + expect(mockScheduleNextRecoveryCheck).toHaveBeenCalled(); + expect(result).toEqual('Checked 1 explorers: 0 recovered, 1 still unreachable'); + }); + + it('Should handle multiple explorers with mixed results', async () => { + const mockEnableSyncAfterRecovery1 = jest.fn().mockResolvedValue({}); + const mockScheduleNextRecoveryCheck2 = jest.fn().mockResolvedValue({}); + + jest.spyOn(Explorer, 'findAll').mockResolvedValueOnce([ + { + id: 1, + slug: 'explorer-1', + syncDisabledReason: 'rpc_unreachable', + enableSyncAfterRecovery: mockEnableSyncAfterRecovery1, + scheduleNextRecoveryCheck: jest.fn(), + workspace: { + id: 1, + rpcServer: 'http://working-rpc:8545' + } + }, + { + id: 2, + slug: 'explorer-2', + syncDisabledReason: 'rpc_unreachable', + enableSyncAfterRecovery: jest.fn(), + scheduleNextRecoveryCheck: mockScheduleNextRecoveryCheck2, + workspace: { + id: 2, + rpcServer: 'http://broken-rpc:8545' + } + } + ]); + + let callCount = 0; + ProviderConnector.mockImplementation(() => ({ + fetchLatestBlock: jest.fn().mockImplementation(() => { + callCount++; + if (callCount === 1) { + return Promise.resolve({ number: 100 }); + } + return Promise.resolve(null); + }) + })); + + const result = await syncRecoveryCheck(); + + expect(mockEnableSyncAfterRecovery1).toHaveBeenCalled(); + expect(mockScheduleNextRecoveryCheck2).toHaveBeenCalled(); + expect(result).toEqual('Checked 2 explorers: 1 recovered, 1 still unreachable'); + }); +}); diff --git a/run/tests/jobs/updateExplorerSyncingProcess.test.js b/run/tests/jobs/updateExplorerSyncingProcess.test.js index 4b321fe1..4fe5eca6 100644 --- a/run/tests/jobs/updateExplorerSyncingProcess.test.js +++ b/run/tests/jobs/updateExplorerSyncingProcess.test.js @@ -1,5 +1,6 @@ require('../mocks/lib/queue'); require('../mocks/lib/pm2'); +require('../mocks/lib/logger'); const { Explorer } = require('../mocks/models'); const PM2 = require('../../lib/pm2'); @@ -8,15 +9,17 @@ const updateExplorerSyncingProcess = require('../../jobs/updateExplorerSyncingPr beforeEach(() => jest.clearAllMocks()); const hasReachedTransactionQuota = jest.fn().mockResolvedValue(false); +const incrementSyncFailures = jest.fn(); +const update = jest.fn(); describe('updateExplorerSyncingProcess', () => { - it('Should exit gracefully if timeout', (done) => { + it('Should exit gracefully if timeout when sync is disabled', (done) => { const reset = jest.fn(); PM2.mockImplementationOnce(() => ({ reset, find: jest.fn().mockRejectedValueOnce(new Error('Timed out after 10000ms.')) })); - jest.spyOn(Explorer, 'findOne').mockResolvedValue({ slug: 'slug', workspaceId: 1 }); + jest.spyOn(Explorer, 'findOne').mockResolvedValue({ slug: 'slug', workspaceId: 1, shouldSync: false }); 
updateExplorerSyncingProcess({ data: { explorerSlug: 'explorer' }}) .then(res => { @@ -25,6 +28,48 @@ describe('updateExplorerSyncingProcess', () => { }); }); + it('Should track timeout as failure when sync is enabled', (done) => { + PM2.mockImplementationOnce(() => ({ + find: jest.fn().mockRejectedValueOnce(new Error('Timed out after 10000ms.')) + })); + const mockIncrementSyncFailures = jest.fn().mockResolvedValue({ disabled: false, attempts: 1 }); + jest.spyOn(Explorer, 'findOne').mockResolvedValue({ + id: 1, + slug: 'slug', + workspaceId: 1, + shouldSync: true, + incrementSyncFailures: mockIncrementSyncFailures + }); + + updateExplorerSyncingProcess({ data: { explorerSlug: 'explorer' }}) + .then(res => { + expect(mockIncrementSyncFailures).toHaveBeenCalledWith('pm2_timeout'); + expect(res).toEqual('Timed out (attempt 1/3)'); + done(); + }); + }); + + it('Should auto-disable sync after 3 timeouts', (done) => { + PM2.mockImplementationOnce(() => ({ + find: jest.fn().mockRejectedValueOnce(new Error('Timed out after 10000ms.')) + })); + const mockIncrementSyncFailures = jest.fn().mockResolvedValue({ disabled: true, attempts: 3 }); + jest.spyOn(Explorer, 'findOne').mockResolvedValue({ + id: 1, + slug: 'slug', + workspaceId: 1, + shouldSync: true, + incrementSyncFailures: mockIncrementSyncFailures + }); + + updateExplorerSyncingProcess({ data: { explorerSlug: 'explorer' }}) + .then(res => { + expect(mockIncrementSyncFailures).toHaveBeenCalledWith('pm2_timeout'); + expect(res).toEqual('Timed out and sync auto-disabled after 3 failures.'); + done(); + }); + }); + it('Should reset if flag is passed', (done) => { const reset = jest.fn(); PM2.mockImplementationOnce(() => ({ @@ -91,14 +136,18 @@ describe('updateExplorerSyncingProcess', () => { }); }); - it('Should delete if rpc is not reachable', (done) => { + it('Should delete if rpc is not reachable and increment failures', (done) => { PM2.mockImplementationOnce(() => ({ delete: jest.fn(), find: jest.fn().mockResolvedValue({ data: { pm2_env: { status: 'online' }}}) })); + const mockIncrementSyncFailures = jest.fn().mockResolvedValue({ disabled: false, attempts: 1 }); jest.spyOn(Explorer, 'findOne').mockResolvedValueOnce({ + id: 1, + slug: 'test-explorer', stripeSubscription: {}, hasReachedTransactionQuota, + incrementSyncFailures: mockIncrementSyncFailures, workspace: { rpcHealthCheck: { isReachable: false @@ -108,7 +157,35 @@ describe('updateExplorerSyncingProcess', () => { updateExplorerSyncingProcess({ data: { explorerSlug: 'explorer' }}) .then(res => { - expect(res).toEqual('Process deleted: RPC is not reachable.'); + expect(mockIncrementSyncFailures).toHaveBeenCalledWith('rpc_unreachable'); + expect(res).toEqual('Process deleted: RPC is not reachable (attempt 1/3).'); + done(); + }); + }); + + it('Should auto-disable sync after 3 RPC failures', (done) => { + PM2.mockImplementationOnce(() => ({ + delete: jest.fn(), + find: jest.fn().mockResolvedValue({ data: { pm2_env: { status: 'online' }}}) + })); + const mockIncrementSyncFailures = jest.fn().mockResolvedValue({ disabled: true, attempts: 3 }); + jest.spyOn(Explorer, 'findOne').mockResolvedValueOnce({ + id: 1, + slug: 'test-explorer', + stripeSubscription: {}, + hasReachedTransactionQuota, + incrementSyncFailures: mockIncrementSyncFailures, + workspace: { + rpcHealthCheck: { + isReachable: false + } + } + }); + + updateExplorerSyncingProcess({ data: { explorerSlug: 'explorer' }}) + .then(res => { + expect(mockIncrementSyncFailures).toHaveBeenCalledWith('rpc_unreachable'); + 
expect(res).toEqual('Process deleted and sync auto-disabled after 3 RPC failures.'); done(); }); }); @@ -180,11 +257,35 @@ describe('updateExplorerSyncingProcess', () => { hasReachedTransactionQuota, stripeSubscription: {}, shouldSync: true, + syncFailedAttempts: 0, + workspace: {} + }); + + updateExplorerSyncingProcess({ data: { explorerSlug: 'explorer' }}) + .then(res => { + expect(res).toEqual('Process started.'); + done(); + }); + }); + + it('Should reset failure counter on successful start', (done) => { + PM2.mockImplementationOnce(() => ({ + start: jest.fn(), + find: jest.fn().mockResolvedValue({ data: null }) + })); + const mockUpdate = jest.fn(); + jest.spyOn(Explorer, 'findOne').mockResolvedValueOnce({ + hasReachedTransactionQuota, + stripeSubscription: {}, + shouldSync: true, + syncFailedAttempts: 2, + update: mockUpdate, workspace: {} }); updateExplorerSyncingProcess({ data: { explorerSlug: 'explorer' }}) .then(res => { + expect(mockUpdate).toHaveBeenCalledWith({ syncFailedAttempts: 0 }); expect(res).toEqual('Process started.'); done(); }); @@ -199,11 +300,35 @@ describe('updateExplorerSyncingProcess', () => { hasReachedTransactionQuota, stripeSubscription: {}, shouldSync: true, + syncFailedAttempts: 0, + workspace: {} + }); + + updateExplorerSyncingProcess({ data: { explorerSlug: 'explorer' }}) + .then(res => { + expect(res).toEqual('Process resumed.'); + done(); + }); + }); + + it('Should reset failure counter on successful resume', (done) => { + PM2.mockImplementationOnce(() => ({ + resume: jest.fn(), + find: jest.fn().mockResolvedValue({ data: { pm2_env: { status: 'stopped' }}}) + })); + const mockUpdate = jest.fn(); + jest.spyOn(Explorer, 'findOne').mockResolvedValueOnce({ + hasReachedTransactionQuota, + stripeSubscription: {}, + shouldSync: true, + syncFailedAttempts: 1, + update: mockUpdate, workspace: {} }); updateExplorerSyncingProcess({ data: { explorerSlug: 'explorer' }}) .then(res => { + expect(mockUpdate).toHaveBeenCalledWith({ syncFailedAttempts: 0 }); expect(res).toEqual('Process resumed.'); done(); }); diff --git a/run/tests/mocks/lib/rpc.js b/run/tests/mocks/lib/rpc.js index fad2ce0b..684eba8d 100644 --- a/run/tests/mocks/lib/rpc.js +++ b/run/tests/mocks/lib/rpc.js @@ -49,7 +49,9 @@ jest.mock('../../../lib/rpc', () => { fetchTransactionReceipt: jest.fn() .mockResolvedValue({ status: 1 - }) + }), + fetchLatestBlock: jest.fn() + .mockResolvedValue({ number: 100 }) })), ERC721Connector: jest.fn().mockImplementation(() => ({ fetchAndStoreAllTokens: jest.fn().mockResolvedValue(true), From a9dae10141c86d6b0bdbc4c7b9179cc153776cdc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antoine=20de=20Chevign=C3=A9?= Date: Fri, 9 Jan 2026 16:38:42 +0100 Subject: [PATCH 2/3] Address code review feedback for sync auto-disable - Fix race condition: use atomic increment for syncFailedAttempts - Add null check for explorer.workspace in updateExplorerSyncingProcess - Simplify backoff calculation using recoveryAttempts counter - Add max recovery attempts (10) after which manual intervention required - Add batching (50 explorers) to syncRecoveryCheck job - Add random jitter (up to 2 min) to stagger recovery checks - Remove dead code (resetSyncState method) - Add recoveryAttempts column to migration Co-Authored-By: Claude Opus 4.5 --- run/jobs/syncRecoveryCheck.js | 55 +++++++++++-- run/jobs/updateExplorerSyncingProcess.js | 15 +++- ...0260109161142-add-sync-failure-tracking.js | 7 ++ run/models/explorer.js | 80 +++++++++---------- run/tests/jobs/syncRecoveryCheck.test.js | 54 +++++++++++-- 
.../jobs/updateExplorerSyncingProcess.test.js | 18 +++++ 6 files changed, 171 insertions(+), 58 deletions(-) diff --git a/run/jobs/syncRecoveryCheck.js b/run/jobs/syncRecoveryCheck.js index c0410757..27084dbf 100644 --- a/run/jobs/syncRecoveryCheck.js +++ b/run/jobs/syncRecoveryCheck.js @@ -4,6 +4,7 @@ * Re-enables sync when RPC becomes reachable, using exponential backoff for retries. * * Backoff schedule: 5m -> 15m -> 1h -> 6h (max) + * Max recovery attempts: 10 (after which manual intervention is required) * * @module jobs/syncRecoveryCheck */ @@ -14,8 +15,12 @@ const { withTimeout } = require('../lib/utils'); const { Op } = require('sequelize'); const logger = require('../lib/logger'); +// Process explorers in batches to avoid long-running jobs +const BATCH_SIZE = 50; + module.exports = async () => { - // Find explorers that are due for a recovery check + // Find explorers that are due for a recovery check (with batch limit) + // Excludes explorers that have reached max recovery attempts (nextRecoveryCheckAt is null) const explorers = await Explorer.findAll({ where: { syncDisabledReason: { [Op.ne]: null }, @@ -25,7 +30,9 @@ module.exports = async () => { model: Workspace, as: 'workspace', attributes: ['id', 'rpcServer'] - }] + }], + limit: BATCH_SIZE, + order: [['nextRecoveryCheckAt', 'ASC']] // Process oldest first }); if (explorers.length === 0) { @@ -34,9 +41,20 @@ module.exports = async () => { let recovered = 0; let stillUnreachable = 0; + let maxAttemptsReached = 0; for (const explorer of explorers) { try { + // Skip if workspace is missing + if (!explorer.workspace || !explorer.workspace.rpcServer) { + logger.warn({ + message: 'Explorer recovery check skipped: no workspace or RPC server', + explorerId: explorer.id, + explorerSlug: explorer.slug + }); + continue; + } + const provider = new ProviderConnector(explorer.workspace.rpcServer); const block = await withTimeout(provider.fetchLatestBlock()); @@ -49,17 +67,38 @@ module.exports = async () => { message: 'Explorer re-enabled after RPC recovery', explorerId: explorer.id, explorerSlug: explorer.slug, - disabledReason: explorer.syncDisabledReason + disabledReason: explorer.syncDisabledReason, + recoveryAttempts: explorer.recoveryAttempts }); } else { // Block fetch returned null - still unreachable - await explorer.scheduleNextRecoveryCheck(); - stillUnreachable++; + const result = await explorer.scheduleNextRecoveryCheck(); + if (result.maxReached) { + maxAttemptsReached++; + logger.warn({ + message: 'Explorer reached max recovery attempts - manual intervention required', + explorerId: explorer.id, + explorerSlug: explorer.slug, + attempts: result.attempts + }); + } else { + stillUnreachable++; + } } } catch (error) { // RPC check failed - schedule next check with backoff - await explorer.scheduleNextRecoveryCheck(); - stillUnreachable++; + const result = await explorer.scheduleNextRecoveryCheck(); + if (result.maxReached) { + maxAttemptsReached++; + logger.warn({ + message: 'Explorer reached max recovery attempts - manual intervention required', + explorerId: explorer.id, + explorerSlug: explorer.slug, + attempts: result.attempts + }); + } else { + stillUnreachable++; + } logger.debug({ message: 'Explorer recovery check failed', @@ -70,5 +109,5 @@ module.exports = async () => { } } - return `Checked ${explorers.length} explorers: ${recovered} recovered, ${stillUnreachable} still unreachable`; + return `Checked ${explorers.length} explorers: ${recovered} recovered, ${stillUnreachable} still unreachable, ${maxAttemptsReached} max 
attempts reached`; }; diff --git a/run/jobs/updateExplorerSyncingProcess.js b/run/jobs/updateExplorerSyncingProcess.js index eb0a6fcb..d2c36627 100644 --- a/run/jobs/updateExplorerSyncingProcess.js +++ b/run/jobs/updateExplorerSyncingProcess.js @@ -9,6 +9,7 @@ const { Explorer, Workspace, RpcHealthCheck, StripeSubscription, StripePlan } = const PM2 = require('../lib/pm2'); const logger = require('../lib/logger'); +// Must match SYNC_FAILURE_THRESHOLD in explorer.js const SYNC_FAILURE_THRESHOLD = 3; module.exports = async job => { @@ -23,17 +24,21 @@ module.exports = async job => { { model: Workspace, as: 'workspace', + required: false, include: { model: RpcHealthCheck, - as: 'rpcHealthCheck' + as: 'rpcHealthCheck', + required: false } }, { model: StripeSubscription, as: 'stripeSubscription', + required: false, include: { model: StripePlan, - as: 'stripePlan' + as: 'stripePlan', + required: false } } ] @@ -59,7 +64,11 @@ module.exports = async job => { await pm2.delete(explorer.slug); return 'Process deleted: no subscription.'; } - else if (explorer.workspace.rpcHealthCheck && !explorer.workspace.rpcHealthCheck.isReachable && existingProcess) { + else if (explorer && !explorer.workspace) { + await pm2.delete(explorer.slug); + return 'Process deleted: no workspace.'; + } + else if (explorer.workspace && explorer.workspace.rpcHealthCheck && !explorer.workspace.rpcHealthCheck.isReachable && existingProcess) { await pm2.delete(explorer.slug); // Track RPC failure and potentially auto-disable const result = await explorer.incrementSyncFailures('rpc_unreachable'); diff --git a/run/migrations/20260109161142-add-sync-failure-tracking.js b/run/migrations/20260109161142-add-sync-failure-tracking.js index 62c4a8ad..3205c852 100644 --- a/run/migrations/20260109161142-add-sync-failure-tracking.js +++ b/run/migrations/20260109161142-add-sync-failure-tracking.js @@ -20,6 +20,12 @@ module.exports = { allowNull: true }, { transaction }); + await queryInterface.addColumn('explorers', 'recoveryAttempts', { + type: Sequelize.DataTypes.INTEGER, + allowNull: false, + defaultValue: 0 + }, { transaction }); + await queryInterface.addColumn('explorers', 'nextRecoveryCheckAt', { type: Sequelize.DataTypes.DATE, allowNull: true @@ -47,6 +53,7 @@ module.exports = { await queryInterface.removeColumn('explorers', 'syncFailedAttempts', { transaction }); await queryInterface.removeColumn('explorers', 'syncDisabledAt', { transaction }); await queryInterface.removeColumn('explorers', 'syncDisabledReason', { transaction }); + await queryInterface.removeColumn('explorers', 'recoveryAttempts', { transaction }); await queryInterface.removeColumn('explorers', 'nextRecoveryCheckAt', { transaction }); await transaction.commit(); diff --git a/run/models/explorer.js b/run/models/explorer.js index a28872b1..86fe666d 100644 --- a/run/models/explorer.js +++ b/run/models/explorer.js @@ -33,12 +33,15 @@ const MAX_RPC_ATTEMPTS = 3; // Sync failure auto-disable configuration const SYNC_FAILURE_THRESHOLD = 3; +const MAX_RECOVERY_ATTEMPTS = 10; const RECOVERY_BACKOFF_SCHEDULE = [ 5 * 60 * 1000, // 5 minutes 15 * 60 * 1000, // 15 minutes 60 * 60 * 1000, // 1 hour 6 * 60 * 60 * 1000 // 6 hours (max) ]; +// Stagger recovery checks to avoid thundering herd (random jitter up to 2 minutes) +const RECOVERY_JITTER_MAX = 2 * 60 * 1000; module.exports = (sequelize, DataTypes) => { class Explorer extends Model { @@ -317,6 +320,7 @@ module.exports = (sequelize, DataTypes) => { syncFailedAttempts: 0, syncDisabledAt: null, syncDisabledReason: null, + 
recoveryAttempts: 0, nextRecoveryCheckAt: null }); return this; @@ -332,82 +336,76 @@ module.exports = (sequelize, DataTypes) => { /** * Increments the sync failure counter and auto-disables if threshold reached. + * Uses atomic increment to avoid race conditions. * @param {string} [reason='rpc_unreachable'] - Reason for the failure * @returns {Promise<{disabled: boolean, attempts: number}>} Result with disable status */ async incrementSyncFailures(reason = 'rpc_unreachable') { - const newCount = (this.syncFailedAttempts || 0) + 1; - await this.update({ syncFailedAttempts: newCount }); + // Use atomic increment to avoid race conditions + await this.increment('syncFailedAttempts'); + await this.reload(); - if (newCount >= SYNC_FAILURE_THRESHOLD) { + if (this.syncFailedAttempts >= SYNC_FAILURE_THRESHOLD) { await this.autoDisableSync(reason); - return { disabled: true, attempts: newCount }; + return { disabled: true, attempts: this.syncFailedAttempts }; } - return { disabled: false, attempts: newCount }; + return { disabled: false, attempts: this.syncFailedAttempts }; } /** * Auto-disables sync and schedules first recovery check. + * Adds random jitter to avoid thundering herd when many explorers are disabled at once. * @param {string} reason - Reason for disabling (e.g., 'rpc_unreachable') * @returns {Promise} Updated explorer */ async autoDisableSync(reason) { - const nextCheck = new Date(Date.now() + RECOVERY_BACKOFF_SCHEDULE[0]); + // Add random jitter to stagger recovery checks + const jitter = Math.floor(Math.random() * RECOVERY_JITTER_MAX); + const nextCheck = new Date(Date.now() + RECOVERY_BACKOFF_SCHEDULE[0] + jitter); await this.update({ shouldSync: false, syncDisabledAt: new Date(), syncDisabledReason: reason, + recoveryAttempts: 0, nextRecoveryCheckAt: nextCheck }); return this; } - /** - * Resets all sync failure tracking state. - * @returns {Promise} Updated explorer - */ - async resetSyncState() { - await this.update({ - syncFailedAttempts: 0, - syncDisabledAt: null, - syncDisabledReason: null, - nextRecoveryCheckAt: null - }); - return this; - } - /** * Schedules the next recovery check using exponential backoff. + * Increments recovery attempts and returns null if max attempts reached. 
* Backoff schedule: 5m -> 15m -> 1h -> 6h (max) - * @returns {Promise} Updated explorer + * @returns {Promise<{scheduled: boolean, attempts: number, maxReached: boolean}>} Result */ async scheduleNextRecoveryCheck() { if (!this.syncDisabledAt) { - return this; + return { scheduled: false, attempts: 0, maxReached: false }; } - const timeSinceDisabled = Date.now() - new Date(this.syncDisabledAt).getTime(); - let cumulativeTime = 0; - let backoffIndex = 0; + const newAttempts = (this.recoveryAttempts || 0) + 1; - // Find which backoff interval we should use based on time since disabled - for (let i = 0; i < RECOVERY_BACKOFF_SCHEDULE.length; i++) { - cumulativeTime += RECOVERY_BACKOFF_SCHEDULE[i]; - if (timeSinceDisabled < cumulativeTime) { - backoffIndex = i; - break; - } - backoffIndex = i; + // Check if max recovery attempts reached + if (newAttempts >= MAX_RECOVERY_ATTEMPTS) { + await this.update({ + recoveryAttempts: newAttempts, + nextRecoveryCheckAt: null, + syncDisabledReason: 'max_recovery_attempts_reached' + }); + return { scheduled: false, attempts: newAttempts, maxReached: true }; } - // Cap at max backoff (last element) - if (backoffIndex >= RECOVERY_BACKOFF_SCHEDULE.length) { - backoffIndex = RECOVERY_BACKOFF_SCHEDULE.length - 1; - } + // Use recovery attempts as index, capped at max backoff + const backoffIndex = Math.min(newAttempts - 1, RECOVERY_BACKOFF_SCHEDULE.length - 1); + // Add random jitter to stagger recovery checks + const jitter = Math.floor(Math.random() * RECOVERY_JITTER_MAX); + const nextCheck = new Date(Date.now() + RECOVERY_BACKOFF_SCHEDULE[backoffIndex] + jitter); - const nextCheck = new Date(Date.now() + RECOVERY_BACKOFF_SCHEDULE[backoffIndex]); - await this.update({ nextRecoveryCheckAt: nextCheck }); - return this; + await this.update({ + recoveryAttempts: newAttempts, + nextRecoveryCheckAt: nextCheck + }); + return { scheduled: true, attempts: newAttempts, maxReached: false }; } /** @@ -420,6 +418,7 @@ module.exports = (sequelize, DataTypes) => { syncFailedAttempts: 0, syncDisabledAt: null, syncDisabledReason: null, + recoveryAttempts: 0, nextRecoveryCheckAt: null }); return this; @@ -806,6 +805,7 @@ module.exports = (sequelize, DataTypes) => { syncFailedAttempts: DataTypes.INTEGER, syncDisabledAt: DataTypes.DATE, syncDisabledReason: DataTypes.STRING, + recoveryAttempts: DataTypes.INTEGER, nextRecoveryCheckAt: DataTypes.DATE }, { hooks: { diff --git a/run/tests/jobs/syncRecoveryCheck.test.js b/run/tests/jobs/syncRecoveryCheck.test.js index 53cc6ae7..da8104db 100644 --- a/run/tests/jobs/syncRecoveryCheck.test.js +++ b/run/tests/jobs/syncRecoveryCheck.test.js @@ -23,6 +23,7 @@ describe('syncRecoveryCheck', () => { id: 1, slug: 'test-explorer', syncDisabledReason: 'rpc_unreachable', + recoveryAttempts: 2, enableSyncAfterRecovery: mockEnableSyncAfterRecovery, scheduleNextRecoveryCheck: jest.fn(), workspace: { @@ -37,11 +38,11 @@ describe('syncRecoveryCheck', () => { const result = await syncRecoveryCheck(); expect(mockEnableSyncAfterRecovery).toHaveBeenCalled(); - expect(result).toEqual('Checked 1 explorers: 1 recovered, 0 still unreachable'); + expect(result).toEqual('Checked 1 explorers: 1 recovered, 0 still unreachable, 0 max attempts reached'); }); it('Should schedule next recovery check when RPC is still unreachable', async () => { - const mockScheduleNextRecoveryCheck = jest.fn().mockResolvedValue({}); + const mockScheduleNextRecoveryCheck = jest.fn().mockResolvedValue({ scheduled: true, attempts: 3, maxReached: false }); jest.spyOn(Explorer, 
'findAll').mockResolvedValueOnce([{ id: 1, slug: 'test-explorer', @@ -60,11 +61,11 @@ describe('syncRecoveryCheck', () => { const result = await syncRecoveryCheck(); expect(mockScheduleNextRecoveryCheck).toHaveBeenCalled(); - expect(result).toEqual('Checked 1 explorers: 0 recovered, 1 still unreachable'); + expect(result).toEqual('Checked 1 explorers: 0 recovered, 1 still unreachable, 0 max attempts reached'); }); it('Should schedule next recovery check when RPC check throws error', async () => { - const mockScheduleNextRecoveryCheck = jest.fn().mockResolvedValue({}); + const mockScheduleNextRecoveryCheck = jest.fn().mockResolvedValue({ scheduled: true, attempts: 2, maxReached: false }); jest.spyOn(Explorer, 'findAll').mockResolvedValueOnce([{ id: 1, slug: 'test-explorer', @@ -83,18 +84,57 @@ describe('syncRecoveryCheck', () => { const result = await syncRecoveryCheck(); expect(mockScheduleNextRecoveryCheck).toHaveBeenCalled(); - expect(result).toEqual('Checked 1 explorers: 0 recovered, 1 still unreachable'); + expect(result).toEqual('Checked 1 explorers: 0 recovered, 1 still unreachable, 0 max attempts reached'); + }); + + it('Should handle max recovery attempts reached', async () => { + const mockScheduleNextRecoveryCheck = jest.fn().mockResolvedValue({ scheduled: false, attempts: 10, maxReached: true }); + jest.spyOn(Explorer, 'findAll').mockResolvedValueOnce([{ + id: 1, + slug: 'test-explorer', + syncDisabledReason: 'rpc_unreachable', + enableSyncAfterRecovery: jest.fn(), + scheduleNextRecoveryCheck: mockScheduleNextRecoveryCheck, + workspace: { + id: 1, + rpcServer: 'http://localhost:8545' + } + }]); + ProviderConnector.mockImplementation(() => ({ + fetchLatestBlock: jest.fn().mockResolvedValue(null) + })); + + const result = await syncRecoveryCheck(); + + expect(mockScheduleNextRecoveryCheck).toHaveBeenCalled(); + expect(result).toEqual('Checked 1 explorers: 0 recovered, 0 still unreachable, 1 max attempts reached'); + }); + + it('Should skip explorers without workspace', async () => { + jest.spyOn(Explorer, 'findAll').mockResolvedValueOnce([{ + id: 1, + slug: 'test-explorer', + syncDisabledReason: 'rpc_unreachable', + enableSyncAfterRecovery: jest.fn(), + scheduleNextRecoveryCheck: jest.fn(), + workspace: null + }]); + + const result = await syncRecoveryCheck(); + + expect(result).toEqual('Checked 1 explorers: 0 recovered, 0 still unreachable, 0 max attempts reached'); }); it('Should handle multiple explorers with mixed results', async () => { const mockEnableSyncAfterRecovery1 = jest.fn().mockResolvedValue({}); - const mockScheduleNextRecoveryCheck2 = jest.fn().mockResolvedValue({}); + const mockScheduleNextRecoveryCheck2 = jest.fn().mockResolvedValue({ scheduled: true, attempts: 5, maxReached: false }); jest.spyOn(Explorer, 'findAll').mockResolvedValueOnce([ { id: 1, slug: 'explorer-1', syncDisabledReason: 'rpc_unreachable', + recoveryAttempts: 3, enableSyncAfterRecovery: mockEnableSyncAfterRecovery1, scheduleNextRecoveryCheck: jest.fn(), workspace: { @@ -130,6 +170,6 @@ describe('syncRecoveryCheck', () => { expect(mockEnableSyncAfterRecovery1).toHaveBeenCalled(); expect(mockScheduleNextRecoveryCheck2).toHaveBeenCalled(); - expect(result).toEqual('Checked 2 explorers: 1 recovered, 1 still unreachable'); + expect(result).toEqual('Checked 2 explorers: 1 recovered, 1 still unreachable, 0 max attempts reached'); }); }); diff --git a/run/tests/jobs/updateExplorerSyncingProcess.test.js b/run/tests/jobs/updateExplorerSyncingProcess.test.js index 4fe5eca6..9b73521e 100644 --- 
a/run/tests/jobs/updateExplorerSyncingProcess.test.js +++ b/run/tests/jobs/updateExplorerSyncingProcess.test.js @@ -210,6 +210,24 @@ describe('updateExplorerSyncingProcess', () => { }); }); + it('Should delete if no workspace', (done) => { + PM2.mockImplementationOnce(() => ({ + delete: jest.fn(), + find: jest.fn().mockResolvedValue({ data: { pm2_env: { status: 'online' }}}) + })); + jest.spyOn(Explorer, 'findOne').mockResolvedValueOnce({ + slug: 'test-explorer', + stripeSubscription: {}, + workspace: null + }); + + updateExplorerSyncingProcess({ data: { explorerSlug: 'explorer' }}) + .then(res => { + expect(res).toEqual('Process deleted: no workspace.'); + done(); + }); + }); + it('Should delete if sync is disabled', (done) => { PM2.mockImplementationOnce(() => ({ delete: jest.fn(), From ec392d3f253b5091068703c98a6f020333de94f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antoine=20de=20Chevign=C3=A9?= Date: Fri, 9 Jan 2026 16:59:40 +0100 Subject: [PATCH 3/3] Add syncRecoveryCheck to job queue priorities The syncRecoveryCheck job was registered in jobs/index.js and scheduled in scheduler.js, but was missing from the priorities list which caused a crash on startup when trying to enqueue to a non-existent queue. Co-Authored-By: Claude Opus 4.5 --- run/workers/priorities.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/run/workers/priorities.js b/run/workers/priorities.js index c34c1fa7..8df1e1f6 100644 --- a/run/workers/priorities.js +++ b/run/workers/priorities.js @@ -52,7 +52,8 @@ const priorities = { 'backfillNativeTokenTransfers', 'backfillOpBatchBlockRanges', 'backfillOpDeposits', - 'backfillOpOutputs' + 'backfillOpOutputs', + 'syncRecoveryCheck' ] }
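
For reference, below is a minimal standalone sketch of the recovery scheduling introduced above. It is not part of the patch: the helper name nextRecoveryCheckTime is hypothetical, but the constants and the attempt-to-backoff mapping mirror scheduleNextRecoveryCheck() in run/models/explorer.js as of PATCH 2/3.

// Illustrative sketch only (not part of the patch). Mirrors the backoff logic
// from scheduleNextRecoveryCheck(): attempt 1 -> 5m, 2 -> 15m, 3 -> 1h,
// 4 and beyond -> 6h cap, and null once MAX_RECOVERY_ATTEMPTS is reached.
const RECOVERY_BACKOFF_SCHEDULE = [
    5 * 60 * 1000,       // 5 minutes
    15 * 60 * 1000,      // 15 minutes
    60 * 60 * 1000,      // 1 hour
    6 * 60 * 60 * 1000   // 6 hours (max)
];
const RECOVERY_JITTER_MAX = 2 * 60 * 1000; // random jitter of up to 2 minutes
const MAX_RECOVERY_ATTEMPTS = 10;          // after this, manual intervention

// Hypothetical helper, assuming attempts starts at 1 (as in the model code).
// Returns the next check time, or null when no further check is scheduled.
function nextRecoveryCheckTime(attempts, now = Date.now()) {
    if (attempts >= MAX_RECOVERY_ATTEMPTS)
        return null;
    const backoffIndex = Math.min(attempts - 1, RECOVERY_BACKOFF_SCHEDULE.length - 1);
    const jitter = Math.floor(Math.random() * RECOVERY_JITTER_MAX);
    return new Date(now + RECOVERY_BACKOFF_SCHEDULE[backoffIndex] + jitter);
}

The jitter keeps explorers that were disabled by the same RPC outage from all coming due in the same syncRecoveryCheck run, which is why it is applied both when sync is first auto-disabled and on every rescheduled check.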