diff --git a/run/jobs/index.js b/run/jobs/index.js
index 8184a313..7c6675d6 100644
--- a/run/jobs/index.js
+++ b/run/jobs/index.js
@@ -45,6 +45,7 @@ module.exports = {
     rpcHealthCheck: require('./rpcHealthCheck'),
     rpcHealthCheckStarter: require('./rpcHealthCheckStarter'),
     explorerSyncCheck: require('./explorerSyncCheck'),
+    syncRecoveryCheck: require('./syncRecoveryCheck'),
     workspaceReset: require('./workspaceReset'),
     batchBlockDelete: require('./batchBlockDelete'),
     batchContractDelete: require('./batchContractDelete'),
diff --git a/run/jobs/syncRecoveryCheck.js b/run/jobs/syncRecoveryCheck.js
new file mode 100644
index 00000000..27084dbf
--- /dev/null
+++ b/run/jobs/syncRecoveryCheck.js
@@ -0,0 +1,113 @@
+/**
+ * @fileoverview Sync recovery check job.
+ * Periodically checks auto-disabled explorers to see if their RPC has recovered.
+ * Re-enables sync when RPC becomes reachable, using exponential backoff for retries.
+ *
+ * Backoff schedule: 5m -> 15m -> 1h -> 6h (max)
+ * Max recovery attempts: 10 (after which manual intervention is required)
+ *
+ * @module jobs/syncRecoveryCheck
+ */
+
+const { Explorer, Workspace } = require('../models');
+const { ProviderConnector } = require('../lib/rpc');
+const { withTimeout } = require('../lib/utils');
+const { Op } = require('sequelize');
+const logger = require('../lib/logger');
+
+// Process explorers in batches to avoid long-running jobs
+const BATCH_SIZE = 50;
+
+module.exports = async () => {
+    // Find explorers that are due for a recovery check (with batch limit)
+    // Excludes explorers that have reached max recovery attempts (nextRecoveryCheckAt is null)
+    const explorers = await Explorer.findAll({
+        where: {
+            syncDisabledReason: { [Op.ne]: null },
+            nextRecoveryCheckAt: { [Op.lte]: new Date() }
+        },
+        include: [{
+            model: Workspace,
+            as: 'workspace',
+            attributes: ['id', 'rpcServer']
+        }],
+        limit: BATCH_SIZE,
+        order: [['nextRecoveryCheckAt', 'ASC']] // Process oldest first
+    });
+
+    if (explorers.length === 0) {
+        return 'No explorers due for recovery check';
+    }
+
+    let recovered = 0;
+    let stillUnreachable = 0;
+    let maxAttemptsReached = 0;
+
+    for (const explorer of explorers) {
+        try {
+            // Skip if workspace is missing
+            if (!explorer.workspace || !explorer.workspace.rpcServer) {
+                logger.warn({
+                    message: 'Explorer recovery check skipped: no workspace or RPC server',
+                    explorerId: explorer.id,
+                    explorerSlug: explorer.slug
+                });
+                continue;
+            }
+
+            const provider = new ProviderConnector(explorer.workspace.rpcServer);
+            const block = await withTimeout(provider.fetchLatestBlock());
+
+            if (block) {
+                // RPC is reachable - re-enable sync
+                await explorer.enableSyncAfterRecovery();
+                recovered++;
+
+                logger.info({
+                    message: 'Explorer re-enabled after RPC recovery',
+                    explorerId: explorer.id,
+                    explorerSlug: explorer.slug,
+                    disabledReason: explorer.syncDisabledReason,
+                    recoveryAttempts: explorer.recoveryAttempts
+                });
+            } else {
+                // Block fetch returned null - still unreachable
+                const result = await explorer.scheduleNextRecoveryCheck();
+                if (result.maxReached) {
+                    maxAttemptsReached++;
+                    logger.warn({
+                        message: 'Explorer reached max recovery attempts - manual intervention required',
+                        explorerId: explorer.id,
+                        explorerSlug: explorer.slug,
+                        attempts: result.attempts
+                    });
+                } else {
+                    stillUnreachable++;
+                }
+            }
+        } catch (error) {
+            // RPC check failed - schedule next check with backoff
+            const result = await explorer.scheduleNextRecoveryCheck();
+            if (result.maxReached) {
+                maxAttemptsReached++;
+                logger.warn({
+                    message: 'Explorer reached max recovery attempts - manual intervention required',
+                    explorerId: explorer.id,
+                    explorerSlug: explorer.slug,
+                    attempts: result.attempts
+                });
+            } else {
+                stillUnreachable++;
+            }
+
+            logger.debug({
+                message: 'Explorer recovery check failed',
+                explorerId: explorer.id,
+                explorerSlug: explorer.slug,
+                error: error.message
+            });
+        }
+    }
+
+    return `Checked ${explorers.length} explorers: ${recovered} recovered, ${stillUnreachable} still unreachable, ${maxAttemptsReached} max attempts reached`;
+};
diff --git a/run/jobs/updateExplorerSyncingProcess.js b/run/jobs/updateExplorerSyncingProcess.js
index ad1d7a71..d2c36627 100644
--- a/run/jobs/updateExplorerSyncingProcess.js
+++ b/run/jobs/updateExplorerSyncingProcess.js
@@ -1,11 +1,16 @@
 /**
  * @fileoverview Explorer sync process update job.
  * Manages PM2 processes for explorer block syncing based on status.
+ * Auto-disables sync after repeated RPC failures with exponential backoff recovery.
  * @module jobs/updateExplorerSyncingProcess
  */
 
 const { Explorer, Workspace, RpcHealthCheck, StripeSubscription, StripePlan } = require('../models');
 const PM2 = require('../lib/pm2');
+const logger = require('../lib/logger');
+
+// Must match SYNC_FAILURE_THRESHOLD in explorer.js
+const SYNC_FAILURE_THRESHOLD = 3;
 
 module.exports = async job => {
     const data = job.data;
@@ -19,17 +24,21 @@ module.exports = async job => {
             {
                 model: Workspace,
                 as: 'workspace',
+                required: false,
                 include: {
                     model: RpcHealthCheck,
-                    as: 'rpcHealthCheck'
+                    as: 'rpcHealthCheck',
+                    required: false
                 }
             },
             {
                 model: StripeSubscription,
                 as: 'stripeSubscription',
+                required: false,
                 include: {
                     model: StripePlan,
-                    as: 'stripePlan'
+                    as: 'stripePlan',
+                    required: false
                 }
             }
         ]
@@ -55,9 +64,25 @@ module.exports = async job => {
             await pm2.delete(explorer.slug);
             return 'Process deleted: no subscription.';
         }
-        else if (explorer.workspace.rpcHealthCheck && !explorer.workspace.rpcHealthCheck.isReachable && existingProcess) {
+        else if (explorer && !explorer.workspace) {
             await pm2.delete(explorer.slug);
-            return 'Process deleted: RPC is not reachable.';
+            return 'Process deleted: no workspace.';
+        }
+        else if (explorer.workspace && explorer.workspace.rpcHealthCheck && !explorer.workspace.rpcHealthCheck.isReachable && existingProcess) {
+            await pm2.delete(explorer.slug);
+            // Track RPC failure and potentially auto-disable
+            const result = await explorer.incrementSyncFailures('rpc_unreachable');
+            if (result.disabled) {
+                logger.info({
+                    message: 'Explorer auto-disabled due to RPC failures',
+                    explorerId: explorer.id,
+                    explorerSlug: explorer.slug,
+                    attempts: result.attempts,
+                    reason: 'rpc_unreachable'
+                });
+                return `Process deleted and sync auto-disabled after ${result.attempts} RPC failures.`;
+            }
+            return `Process deleted: RPC is not reachable (attempt ${result.attempts}/${SYNC_FAILURE_THRESHOLD}).`;
         }
         else if (!explorer.shouldSync && existingProcess) {
             await pm2.delete(explorer.slug);
@@ -69,17 +94,41 @@ module.exports = async job => {
         }
         else if (explorer.shouldSync && !existingProcess) {
             await pm2.start(explorer.slug, explorer.workspaceId);
+            // Reset failure counter on successful start
+            if (explorer.syncFailedAttempts > 0) {
+                await explorer.update({ syncFailedAttempts: 0 });
+            }
             return 'Process started.';
         }
         else if (explorer.shouldSync && existingProcess && existingProcess.pm2_env.status == 'stopped') {
             await pm2.resume(explorer.slug, explorer.workspaceId);
+            // Reset failure counter on successful resume
+            if (explorer.syncFailedAttempts > 0) {
+                await explorer.update({ syncFailedAttempts: 0 });
+            }
             return 'Process resumed.';
         }
         else
             return 'No process change.';
     } catch(error) {
-        if (error.message.startsWith('Timed out after'))
+        if (error.message.startsWith('Timed out after')) {
+            // Track timeout as a failure if explorer exists and sync is enabled
+            if (explorer && explorer.shouldSync) {
+                const result = await explorer.incrementSyncFailures('pm2_timeout');
+                if (result.disabled) {
+                    logger.info({
+                        message: 'Explorer auto-disabled due to PM2 timeouts',
+                        explorerId: explorer.id,
+                        explorerSlug: explorer.slug,
+                        attempts: result.attempts,
+                        reason: 'pm2_timeout'
+                    });
+                    return `Timed out and sync auto-disabled after ${result.attempts} failures.`;
+                }
+                return `Timed out (attempt ${result.attempts}/${SYNC_FAILURE_THRESHOLD})`;
+            }
             return 'Timed out';
+        }
         else
             throw error;
     }
diff --git a/run/migrations/20260109161142-add-sync-failure-tracking.js b/run/migrations/20260109161142-add-sync-failure-tracking.js
new file mode 100644
index 00000000..3205c852
--- /dev/null
+++ b/run/migrations/20260109161142-add-sync-failure-tracking.js
@@ -0,0 +1,66 @@
+'use strict';
+
+module.exports = {
+    async up (queryInterface, Sequelize) {
+        const transaction = await queryInterface.sequelize.transaction();
+        try {
+            await queryInterface.addColumn('explorers', 'syncFailedAttempts', {
+                type: Sequelize.DataTypes.INTEGER,
+                allowNull: false,
+                defaultValue: 0
+            }, { transaction });
+
+            await queryInterface.addColumn('explorers', 'syncDisabledAt', {
+                type: Sequelize.DataTypes.DATE,
+                allowNull: true
+            }, { transaction });
+
+            await queryInterface.addColumn('explorers', 'syncDisabledReason', {
+                type: Sequelize.DataTypes.STRING,
+                allowNull: true
+            }, { transaction });
+
+            await queryInterface.addColumn('explorers', 'recoveryAttempts', {
+                type: Sequelize.DataTypes.INTEGER,
+                allowNull: false,
+                defaultValue: 0
+            }, { transaction });
+
+            await queryInterface.addColumn('explorers', 'nextRecoveryCheckAt', {
+                type: Sequelize.DataTypes.DATE,
+                allowNull: true
+            }, { transaction });
+
+            // Add index on nextRecoveryCheckAt for efficient recovery job queries
+            await queryInterface.addIndex('explorers', ['nextRecoveryCheckAt'], {
+                name: 'explorers_next_recovery_check_at_idx',
+                where: { nextRecoveryCheckAt: { [Sequelize.Op.ne]: null } },
+                transaction
+            });
+
+            await transaction.commit();
+        } catch(error) {
+            console.log(error);
+            await transaction.rollback();
+            throw error;
+        }
+    },
+
+    async down(queryInterface, Sequelize) {
+        const transaction = await queryInterface.sequelize.transaction();
+        try {
+            await queryInterface.removeIndex('explorers', 'explorers_next_recovery_check_at_idx', { transaction });
+            await queryInterface.removeColumn('explorers', 'syncFailedAttempts', { transaction });
+            await queryInterface.removeColumn('explorers', 'syncDisabledAt', { transaction });
+            await queryInterface.removeColumn('explorers', 'syncDisabledReason', { transaction });
+            await queryInterface.removeColumn('explorers', 'recoveryAttempts', { transaction });
+            await queryInterface.removeColumn('explorers', 'nextRecoveryCheckAt', { transaction });
+
+            await transaction.commit();
+        } catch(error) {
+            console.log(error);
+            await transaction.rollback();
+            throw error;
+        }
+    }
+};
diff --git a/run/models/explorer.js b/run/models/explorer.js
index 908686ff..86fe666d 100644
--- a/run/models/explorer.js
+++ b/run/models/explorer.js
@@ -31,6 +31,18 @@ const IUniswapV2Router02 = require('../lib/abis/IUniswapV2Router02.json');
 const IUniswapV2Factory = require('../lib/abis/IUniswapV2Factory.json');
 const MAX_RPC_ATTEMPTS = 3;
 
+// Sync failure auto-disable configuration
+const SYNC_FAILURE_THRESHOLD = 3;
+const MAX_RECOVERY_ATTEMPTS = 10;
+const RECOVERY_BACKOFF_SCHEDULE = [
+    5 * 60 * 1000,       // 5 minutes
+    15 * 60 * 1000,      // 15 minutes
+    60 * 60 * 1000,      // 1 hour
+    6 * 60 * 60 * 1000   // 6 hours (max)
+];
+// Stagger recovery checks to avoid thundering herd (random jitter up to 2 minutes)
+const RECOVERY_JITTER_MAX = 2 * 60 * 1000;
+
 module.exports = (sequelize, DataTypes) => {
     class Explorer extends Model {
         /**
@@ -299,10 +311,19 @@
 
         /**
          * Starts block synchronization for the explorer.
+         * Resets all failure tracking when manually enabling sync.
          * @returns {Promise} Updated explorer
          */
-        startSync() {
-            return this.update({ shouldSync: true });
+        async startSync() {
+            await this.update({
+                shouldSync: true,
+                syncFailedAttempts: 0,
+                syncDisabledAt: null,
+                syncDisabledReason: null,
+                recoveryAttempts: 0,
+                nextRecoveryCheckAt: null
+            });
+            return this;
         }
 
         /**
@@ -313,6 +334,96 @@
             return this.update({ shouldSync: false });
         }
 
+        /**
+         * Increments the sync failure counter and auto-disables if threshold reached.
+         * Uses atomic increment to avoid race conditions.
+         * @param {string} [reason='rpc_unreachable'] - Reason for the failure
+         * @returns {Promise<{disabled: boolean, attempts: number}>} Result with disable status
+         */
+        async incrementSyncFailures(reason = 'rpc_unreachable') {
+            // Use atomic increment to avoid race conditions
+            await this.increment('syncFailedAttempts');
+            await this.reload();
+
+            if (this.syncFailedAttempts >= SYNC_FAILURE_THRESHOLD) {
+                await this.autoDisableSync(reason);
+                return { disabled: true, attempts: this.syncFailedAttempts };
+            }
+            return { disabled: false, attempts: this.syncFailedAttempts };
+        }
+
+        /**
+         * Auto-disables sync and schedules first recovery check.
+         * Adds random jitter to avoid thundering herd when many explorers are disabled at once.
+         * @param {string} reason - Reason for disabling (e.g., 'rpc_unreachable')
+         * @returns {Promise} Updated explorer
+         */
+        async autoDisableSync(reason) {
+            // Add random jitter to stagger recovery checks
+            const jitter = Math.floor(Math.random() * RECOVERY_JITTER_MAX);
+            const nextCheck = new Date(Date.now() + RECOVERY_BACKOFF_SCHEDULE[0] + jitter);
+            await this.update({
+                shouldSync: false,
+                syncDisabledAt: new Date(),
+                syncDisabledReason: reason,
+                recoveryAttempts: 0,
+                nextRecoveryCheckAt: nextCheck
+            });
+            return this;
+        }
+
+        /**
+         * Schedules the next recovery check using exponential backoff.
+         * Increments recovery attempts; once the max is reached, clears nextRecoveryCheckAt and reports maxReached instead of scheduling.
+         * Backoff schedule: 5m -> 15m -> 1h -> 6h (max)
+         * @returns {Promise<{scheduled: boolean, attempts: number, maxReached: boolean}>} Result
+         */
+        async scheduleNextRecoveryCheck() {
+            if (!this.syncDisabledAt) {
+                return { scheduled: false, attempts: 0, maxReached: false };
+            }
+
+            const newAttempts = (this.recoveryAttempts || 0) + 1;
+
+            // Check if max recovery attempts reached
+            if (newAttempts >= MAX_RECOVERY_ATTEMPTS) {
+                await this.update({
+                    recoveryAttempts: newAttempts,
+                    nextRecoveryCheckAt: null,
+                    syncDisabledReason: 'max_recovery_attempts_reached'
+                });
+                return { scheduled: false, attempts: newAttempts, maxReached: true };
+            }
+
+            // Use recovery attempts as index, capped at max backoff
+            const backoffIndex = Math.min(newAttempts - 1, RECOVERY_BACKOFF_SCHEDULE.length - 1);
+            // Add random jitter to stagger recovery checks
+            const jitter = Math.floor(Math.random() * RECOVERY_JITTER_MAX);
+            const nextCheck = new Date(Date.now() + RECOVERY_BACKOFF_SCHEDULE[backoffIndex] + jitter);
+
+            await this.update({
+                recoveryAttempts: newAttempts,
+                nextRecoveryCheckAt: nextCheck
+            });
+            return { scheduled: true, attempts: newAttempts, maxReached: false };
+        }
+
+        /**
+         * Re-enables sync after successful recovery check.
+         * @returns {Promise} Updated explorer
+         */
+        async enableSyncAfterRecovery() {
+            await this.update({
+                shouldSync: true,
+                syncFailedAttempts: 0,
+                syncDisabledAt: null,
+                syncDisabledReason: null,
+                recoveryAttempts: 0,
+                nextRecoveryCheckAt: null
+            });
+            return this;
+        }
+
         /**
          * Creates a Uniswap V2 compatible DEX for the explorer.
          * @param {string} routerAddress - DEX router contract address
@@ -690,7 +801,12 @@
         shouldEnforceQuota: DataTypes.BOOLEAN,
         isDemo: DataTypes.BOOLEAN,
         gasAnalyticsEnabled: DataTypes.BOOLEAN,
-        displayTopAccounts: DataTypes.BOOLEAN
+        displayTopAccounts: DataTypes.BOOLEAN,
+        syncFailedAttempts: DataTypes.INTEGER,
+        syncDisabledAt: DataTypes.DATE,
+        syncDisabledReason: DataTypes.STRING,
+        recoveryAttempts: DataTypes.INTEGER,
+        nextRecoveryCheckAt: DataTypes.DATE
     }, {
         hooks: {
             afterCreate(explorer, options) {
diff --git a/run/scheduler.js b/run/scheduler.js
index ce1271f1..677b39ff 100644
--- a/run/scheduler.js
+++ b/run/scheduler.js
@@ -3,6 +3,7 @@ const { enqueue } = require('./lib/queue');
 const INTEGRITY_CHECK_INTERVAL = 5 * 60 * 1000;
 const RPC_HEALTH_CHECK_INTERVAL = 5 * 60 * 1000;
 const SUBSCRIPTION_CHECK_INTERVAL = 5 * 60 * 1000;
+const SYNC_RECOVERY_CHECK_INTERVAL = 5 * 60 * 1000;
 const QUEUE_MONITORING_INTERVAL = 60 * 1000;
 const CANCEL_DEMO_INTERVAL = 60 * 60 * 1000;
 const BLOCK_SYNC_MONITORING_INTERVAL = 60 * 1000;
@@ -48,6 +49,14 @@ const BLOCK_SYNC_MONITORING_INTERVAL = 60 * 1000;
         { every: SUBSCRIPTION_CHECK_INTERVAL }
     );
 
+    await enqueue(
+        'syncRecoveryCheck',
+        'syncRecoveryCheck',
+        {},
+        10,
+        { every: SYNC_RECOVERY_CHECK_INTERVAL }
+    );
+
     await enqueue(
         'queueMonitoring',
         'queueMonitoring',
diff --git a/run/tests/jobs/syncRecoveryCheck.test.js b/run/tests/jobs/syncRecoveryCheck.test.js
new file mode 100644
index 00000000..da8104db
--- /dev/null
+++ b/run/tests/jobs/syncRecoveryCheck.test.js
@@ -0,0 +1,175 @@
+const { Explorer, Workspace } = require('../mocks/models');
+require('../mocks/lib/rpc');
+require('../mocks/lib/utils');
+require('../mocks/lib/logger');
+
+const { ProviderConnector } = require('../../lib/rpc');
+const syncRecoveryCheck = require('../../jobs/syncRecoveryCheck');
+
+beforeEach(() => jest.clearAllMocks());
+
+describe('syncRecoveryCheck', () => {
+    it('Should return early if no explorers need recovery check', async () => {
+        jest.spyOn(Explorer, 'findAll').mockResolvedValueOnce([]);
+
+        const result = await syncRecoveryCheck();
+
+        expect(result).toEqual('No explorers due for recovery check');
+    });
+
+    it('Should re-enable sync when RPC is reachable', async () => {
+        const mockEnableSyncAfterRecovery = jest.fn().mockResolvedValue({});
+        jest.spyOn(Explorer, 'findAll').mockResolvedValueOnce([{
+            id: 1,
+            slug: 'test-explorer',
+            syncDisabledReason: 'rpc_unreachable',
+            recoveryAttempts: 2,
+            enableSyncAfterRecovery: mockEnableSyncAfterRecovery,
+            scheduleNextRecoveryCheck: jest.fn(),
+            workspace: {
+                id: 1,
+                rpcServer: 'http://localhost:8545'
+            }
+        }]);
+        ProviderConnector.mockImplementation(() => ({
+            fetchLatestBlock: jest.fn().mockResolvedValue({ number: 100 })
+        }));
+
+        const result = await syncRecoveryCheck();
+
+        expect(mockEnableSyncAfterRecovery).toHaveBeenCalled();
+        expect(result).toEqual('Checked 1 explorers: 1 recovered, 0 still unreachable, 0 max attempts reached');
+    });
+
+    it('Should schedule next recovery check when RPC is still unreachable', async () => {
+        const mockScheduleNextRecoveryCheck = jest.fn().mockResolvedValue({ scheduled: true, attempts: 3, maxReached: false });
+        jest.spyOn(Explorer, 'findAll').mockResolvedValueOnce([{
+            id: 1,
+            slug: 'test-explorer',
+            syncDisabledReason: 'rpc_unreachable',
+            enableSyncAfterRecovery: jest.fn(),
+            scheduleNextRecoveryCheck: mockScheduleNextRecoveryCheck,
+            workspace: {
+                id: 1,
+                rpcServer: 'http://localhost:8545'
+            }
+        }]);
+        ProviderConnector.mockImplementation(() => ({
+            fetchLatestBlock: jest.fn().mockResolvedValue(null)
+        }));
+
+        const result = await syncRecoveryCheck();
+
+        expect(mockScheduleNextRecoveryCheck).toHaveBeenCalled();
+        expect(result).toEqual('Checked 1 explorers: 0 recovered, 1 still unreachable, 0 max attempts reached');
+    });
+
+    it('Should schedule next recovery check when RPC check throws error', async () => {
+        const mockScheduleNextRecoveryCheck = jest.fn().mockResolvedValue({ scheduled: true, attempts: 2, maxReached: false });
+        jest.spyOn(Explorer, 'findAll').mockResolvedValueOnce([{
+            id: 1,
+            slug: 'test-explorer',
+            syncDisabledReason: 'rpc_unreachable',
+            enableSyncAfterRecovery: jest.fn(),
+            scheduleNextRecoveryCheck: mockScheduleNextRecoveryCheck,
+            workspace: {
+                id: 1,
+                rpcServer: 'http://localhost:8545'
+            }
+        }]);
+        ProviderConnector.mockImplementation(() => ({
+            fetchLatestBlock: jest.fn().mockRejectedValue(new Error('Connection refused'))
+        }));
+
+        const result = await syncRecoveryCheck();
+
+        expect(mockScheduleNextRecoveryCheck).toHaveBeenCalled();
+        expect(result).toEqual('Checked 1 explorers: 0 recovered, 1 still unreachable, 0 max attempts reached');
+    });
+
+    it('Should handle max recovery attempts reached', async () => {
+        const mockScheduleNextRecoveryCheck = jest.fn().mockResolvedValue({ scheduled: false, attempts: 10, maxReached: true });
+        jest.spyOn(Explorer, 'findAll').mockResolvedValueOnce([{
+            id: 1,
+            slug: 'test-explorer',
+            syncDisabledReason: 'rpc_unreachable',
+            enableSyncAfterRecovery: jest.fn(),
+            scheduleNextRecoveryCheck: mockScheduleNextRecoveryCheck,
+            workspace: {
+                id: 1,
+                rpcServer: 'http://localhost:8545'
+            }
+        }]);
+        ProviderConnector.mockImplementation(() => ({
+            fetchLatestBlock: jest.fn().mockResolvedValue(null)
+        }));
+
+        const result = await syncRecoveryCheck();
+
+        expect(mockScheduleNextRecoveryCheck).toHaveBeenCalled();
+        expect(result).toEqual('Checked 1 explorers: 0 recovered, 0 still unreachable, 1 max attempts reached');
+    });
+
+    it('Should skip explorers without workspace', async () => {
+        jest.spyOn(Explorer, 'findAll').mockResolvedValueOnce([{
+            id: 1,
+            slug: 'test-explorer',
+            syncDisabledReason: 'rpc_unreachable',
+            enableSyncAfterRecovery: jest.fn(),
+            scheduleNextRecoveryCheck: jest.fn(),
+            workspace: null
+        }]);
+
+        const result = await syncRecoveryCheck();
+
+        expect(result).toEqual('Checked 1 explorers: 0 recovered, 0 still unreachable, 0 max attempts reached');
+    });
+
+    it('Should handle multiple explorers with mixed results', async () => {
+        const mockEnableSyncAfterRecovery1 = jest.fn().mockResolvedValue({});
+        const mockScheduleNextRecoveryCheck2 = jest.fn().mockResolvedValue({ scheduled: true, attempts: 5, maxReached: false });
+
+        jest.spyOn(Explorer, 'findAll').mockResolvedValueOnce([
+            {
+                id: 1,
+                slug: 'explorer-1',
+                syncDisabledReason: 'rpc_unreachable',
+                recoveryAttempts: 3,
+                enableSyncAfterRecovery: mockEnableSyncAfterRecovery1,
+                scheduleNextRecoveryCheck: jest.fn(),
+                workspace: {
+                    id: 1,
+                    rpcServer: 'http://working-rpc:8545'
+                }
+            },
+            {
+                id: 2,
+                slug: 'explorer-2',
+                syncDisabledReason: 'rpc_unreachable',
+                enableSyncAfterRecovery: jest.fn(),
+                scheduleNextRecoveryCheck: mockScheduleNextRecoveryCheck2,
+                workspace: {
+                    id: 2,
+                    rpcServer: 'http://broken-rpc:8545'
+                }
+            }
+        ]);
+
+        let callCount = 0;
+        ProviderConnector.mockImplementation(() => ({
+            fetchLatestBlock: jest.fn().mockImplementation(() => {
+                callCount++;
+                if (callCount === 1) {
+                    return Promise.resolve({ number: 100 });
+                }
+                return Promise.resolve(null);
+            })
+        }));
+
+        const result = await syncRecoveryCheck();
+
+        expect(mockEnableSyncAfterRecovery1).toHaveBeenCalled();
+        expect(mockScheduleNextRecoveryCheck2).toHaveBeenCalled();
+        expect(result).toEqual('Checked 2 explorers: 1 recovered, 1 still unreachable, 0 max attempts reached');
+    });
+});
diff --git a/run/tests/jobs/updateExplorerSyncingProcess.test.js b/run/tests/jobs/updateExplorerSyncingProcess.test.js
index 4b321fe1..9b73521e 100644
--- a/run/tests/jobs/updateExplorerSyncingProcess.test.js
+++ b/run/tests/jobs/updateExplorerSyncingProcess.test.js
@@ -1,5 +1,6 @@
 require('../mocks/lib/queue');
 require('../mocks/lib/pm2');
+require('../mocks/lib/logger');
 
 const { Explorer } = require('../mocks/models');
 const PM2 = require('../../lib/pm2');
@@ -8,15 +9,17 @@ const updateExplorerSyncingProcess = require('../../jobs/updateExplorerSyncingPr
 beforeEach(() => jest.clearAllMocks());
 
 const hasReachedTransactionQuota = jest.fn().mockResolvedValue(false);
+const incrementSyncFailures = jest.fn();
+const update = jest.fn();
 
 describe('updateExplorerSyncingProcess', () => {
-    it('Should exit gracefully if timeout', (done) => {
+    it('Should exit gracefully if timeout when sync is disabled', (done) => {
         const reset = jest.fn();
         PM2.mockImplementationOnce(() => ({
             reset,
             find: jest.fn().mockRejectedValueOnce(new Error('Timed out after 10000ms.'))
         }));
-        jest.spyOn(Explorer, 'findOne').mockResolvedValue({ slug: 'slug', workspaceId: 1 });
+        jest.spyOn(Explorer, 'findOne').mockResolvedValue({ slug: 'slug', workspaceId: 1, shouldSync: false });
 
         updateExplorerSyncingProcess({ data: { explorerSlug: 'explorer' }})
             .then(res => {
@@ -25,6 +28,48 @@
             });
     });
 
+    it('Should track timeout as failure when sync is enabled', (done) => {
+        PM2.mockImplementationOnce(() => ({
+            find: jest.fn().mockRejectedValueOnce(new Error('Timed out after 10000ms.'))
+        }));
+        const mockIncrementSyncFailures = jest.fn().mockResolvedValue({ disabled: false, attempts: 1 });
+        jest.spyOn(Explorer, 'findOne').mockResolvedValue({
+            id: 1,
+            slug: 'slug',
+            workspaceId: 1,
+            shouldSync: true,
+            incrementSyncFailures: mockIncrementSyncFailures
+        });
+
+        updateExplorerSyncingProcess({ data: { explorerSlug: 'explorer' }})
+            .then(res => {
+                expect(mockIncrementSyncFailures).toHaveBeenCalledWith('pm2_timeout');
+                expect(res).toEqual('Timed out (attempt 1/3)');
+                done();
+            });
+    });
+
+    it('Should auto-disable sync after 3 timeouts', (done) => {
+        PM2.mockImplementationOnce(() => ({
+            find: jest.fn().mockRejectedValueOnce(new Error('Timed out after 10000ms.'))
+        }));
+        const mockIncrementSyncFailures = jest.fn().mockResolvedValue({ disabled: true, attempts: 3 });
+        jest.spyOn(Explorer, 'findOne').mockResolvedValue({
+            id: 1,
+            slug: 'slug',
+            workspaceId: 1,
+            shouldSync: true,
+            incrementSyncFailures: mockIncrementSyncFailures
+        });
+
+        updateExplorerSyncingProcess({ data: { explorerSlug: 'explorer' }})
+            .then(res => {
+                expect(mockIncrementSyncFailures).toHaveBeenCalledWith('pm2_timeout');
+                expect(res).toEqual('Timed out and sync auto-disabled after 3 failures.');
+                done();
+            });
+    });
+
     it('Should reset if flag is passed', (done) => {
         const reset = jest.fn();
         PM2.mockImplementationOnce(() => ({
@@ -91,14 +136,45 @@
         });
     });
 
-    it('Should delete if rpc is not reachable', (done) => {
+    it('Should delete if rpc is not reachable and increment failures', (done) => {
+        PM2.mockImplementationOnce(() => ({
+            delete: jest.fn(),
+            find: jest.fn().mockResolvedValue({ data: { pm2_env: { status: 'online' }}})
+        }));
+        const mockIncrementSyncFailures = jest.fn().mockResolvedValue({ disabled: false, attempts: 1 });
+        jest.spyOn(Explorer, 'findOne').mockResolvedValueOnce({
+            id: 1,
+            slug: 'test-explorer',
+            stripeSubscription: {},
+            hasReachedTransactionQuota,
+            incrementSyncFailures: mockIncrementSyncFailures,
+            workspace: {
+                rpcHealthCheck: {
+                    isReachable: false
+                }
+            }
+        });
+
+        updateExplorerSyncingProcess({ data: { explorerSlug: 'explorer' }})
+            .then(res => {
+                expect(mockIncrementSyncFailures).toHaveBeenCalledWith('rpc_unreachable');
+                expect(res).toEqual('Process deleted: RPC is not reachable (attempt 1/3).');
+                done();
+            });
+    });
+
+    it('Should auto-disable sync after 3 RPC failures', (done) => {
         PM2.mockImplementationOnce(() => ({
             delete: jest.fn(),
             find: jest.fn().mockResolvedValue({ data: { pm2_env: { status: 'online' }}})
         }));
+        const mockIncrementSyncFailures = jest.fn().mockResolvedValue({ disabled: true, attempts: 3 });
         jest.spyOn(Explorer, 'findOne').mockResolvedValueOnce({
+            id: 1,
+            slug: 'test-explorer',
             stripeSubscription: {},
             hasReachedTransactionQuota,
+            incrementSyncFailures: mockIncrementSyncFailures,
             workspace: {
                 rpcHealthCheck: {
                     isReachable: false
@@ -108,7 +184,8 @@
 
         updateExplorerSyncingProcess({ data: { explorerSlug: 'explorer' }})
             .then(res => {
-                expect(res).toEqual('Process deleted: RPC is not reachable.');
+                expect(mockIncrementSyncFailures).toHaveBeenCalledWith('rpc_unreachable');
+                expect(res).toEqual('Process deleted and sync auto-disabled after 3 RPC failures.');
                 done();
             });
     });
@@ -133,6 +210,24 @@
         });
     });
 
+    it('Should delete if no workspace', (done) => {
+        PM2.mockImplementationOnce(() => ({
+            delete: jest.fn(),
+            find: jest.fn().mockResolvedValue({ data: { pm2_env: { status: 'online' }}})
+        }));
+        jest.spyOn(Explorer, 'findOne').mockResolvedValueOnce({
+            slug: 'test-explorer',
+            stripeSubscription: {},
+            workspace: null
+        });
+
+        updateExplorerSyncingProcess({ data: { explorerSlug: 'explorer' }})
+            .then(res => {
+                expect(res).toEqual('Process deleted: no workspace.');
+                done();
+            });
+    });
+
     it('Should delete if sync is disabled', (done) => {
         PM2.mockImplementationOnce(() => ({
             delete: jest.fn(),
@@ -180,6 +275,7 @@
             hasReachedTransactionQuota,
             stripeSubscription: {},
             shouldSync: true,
+            syncFailedAttempts: 0,
             workspace: {}
         });
 
@@ -190,6 +286,29 @@
         });
     });
 
+    it('Should reset failure counter on successful start', (done) => {
+        PM2.mockImplementationOnce(() => ({
+            start: jest.fn(),
+            find: jest.fn().mockResolvedValue({ data: null })
+        }));
+        const mockUpdate = jest.fn();
+        jest.spyOn(Explorer, 'findOne').mockResolvedValueOnce({
+            hasReachedTransactionQuota,
+            stripeSubscription: {},
+            shouldSync: true,
+            syncFailedAttempts: 2,
+            update: mockUpdate,
+            workspace: {}
+        });
+
+        updateExplorerSyncingProcess({ data: { explorerSlug: 'explorer' }})
+            .then(res => {
+                expect(mockUpdate).toHaveBeenCalledWith({ syncFailedAttempts: 0 });
+                expect(res).toEqual('Process started.');
+                done();
+            });
+    });
+
     it('Should resume process if it is stopped', (done) => {
         PM2.mockImplementationOnce(() => ({
             resume: jest.fn(),
@@ -199,11 +318,35 @@
             hasReachedTransactionQuota,
             stripeSubscription: {},
             shouldSync: true,
+            syncFailedAttempts: 0,
+            workspace: {}
+        });
+
+        updateExplorerSyncingProcess({ data: { explorerSlug: 'explorer' }})
+            .then(res => {
+                expect(res).toEqual('Process resumed.');
+                done();
+            });
+    });
+
+    it('Should reset failure counter on successful resume', (done) => {
+        PM2.mockImplementationOnce(() => ({
+            resume: jest.fn(),
+            find: jest.fn().mockResolvedValue({ data: { pm2_env: { status: 'stopped' }}})
+        }));
+        const mockUpdate = jest.fn();
+        jest.spyOn(Explorer, 'findOne').mockResolvedValueOnce({
+            hasReachedTransactionQuota,
+            stripeSubscription: {},
+            shouldSync: true,
+            syncFailedAttempts: 1,
+            update: mockUpdate,
             workspace: {}
         });
 
         updateExplorerSyncingProcess({ data: { explorerSlug: 'explorer' }})
             .then(res => {
+                expect(mockUpdate).toHaveBeenCalledWith({ syncFailedAttempts: 0 });
                 expect(res).toEqual('Process resumed.');
                 done();
             });
diff --git a/run/tests/mocks/lib/rpc.js b/run/tests/mocks/lib/rpc.js
index fad2ce0b..684eba8d 100644
--- a/run/tests/mocks/lib/rpc.js
+++ b/run/tests/mocks/lib/rpc.js
@@ -49,7 +49,9 @@
         fetchTransactionReceipt: jest.fn()
             .mockResolvedValue({
                 status: 1
-            })
+            }),
+        fetchLatestBlock: jest.fn()
+            .mockResolvedValue({ number: 100 })
     })),
     ERC721Connector: jest.fn().mockImplementation(() => ({
         fetchAndStoreAllTokens: jest.fn().mockResolvedValue(true),
diff --git a/run/workers/priorities.js b/run/workers/priorities.js
index c34c1fa7..8df1e1f6 100644
--- a/run/workers/priorities.js
+++ b/run/workers/priorities.js
@@ -52,7 +52,8 @@
         'backfillNativeTokenTransfers',
         'backfillOpBatchBlockRanges',
         'backfillOpDeposits',
-        'backfillOpOutputs'
+        'backfillOpOutputs',
+        'syncRecoveryCheck'
     ]
 }
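For reference, the recovery cadence implied by the constants above can be sketched standalone. The snippet below is illustrative only and not part of the diff: it copies RECOVERY_BACKOFF_SCHEDULE and MAX_RECOVERY_ATTEMPTS from run/models/explorer.js, and delayForRecoveryAttempt is a hypothetical helper mirroring the index capping in scheduleNextRecoveryCheck, with the 0-2 minute jitter omitted.

// Illustrative sketch only - not part of the change. Mirrors the constants added in run/models/explorer.js.
const RECOVERY_BACKOFF_SCHEDULE = [
    5 * 60 * 1000,       // 5 minutes
    15 * 60 * 1000,      // 15 minutes
    60 * 60 * 1000,      // 1 hour
    6 * 60 * 60 * 1000   // 6 hours (max)
];
const MAX_RECOVERY_ATTEMPTS = 10;

// Hypothetical helper reproducing the index capping in scheduleNextRecoveryCheck (random jitter of up to 2 minutes omitted)
const delayForRecoveryAttempt = attempt =>
    RECOVERY_BACKOFF_SCHEDULE[Math.min(attempt - 1, RECOVERY_BACKOFF_SCHEDULE.length - 1)];

for (let attempt = 1; attempt <= MAX_RECOVERY_ATTEMPTS; attempt++) {
    if (attempt >= MAX_RECOVERY_ATTEMPTS)
        // On the 10th attempt the model clears nextRecoveryCheckAt and manual intervention is required
        console.log(`attempt ${attempt}: max recovery attempts reached`);
    else
        console.log(`attempt ${attempt}: next check in ${delayForRecoveryAttempt(attempt) / 60000} minutes`);
}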