1 change: 1 addition & 0 deletions run/jobs/index.js
@@ -45,6 +45,7 @@ module.exports = {
rpcHealthCheck: require('./rpcHealthCheck'),
rpcHealthCheckStarter: require('./rpcHealthCheckStarter'),
explorerSyncCheck: require('./explorerSyncCheck'),
syncRecoveryCheck: require('./syncRecoveryCheck'),
workspaceReset: require('./workspaceReset'),
batchBlockDelete: require('./batchBlockDelete'),
batchContractDelete: require('./batchContractDelete'),
113 changes: 113 additions & 0 deletions run/jobs/syncRecoveryCheck.js
@@ -0,0 +1,113 @@
/**
* @fileoverview Sync recovery check job.
* Periodically checks auto-disabled explorers to see if their RPC has recovered.
* Re-enables sync when RPC becomes reachable, using exponential backoff for retries.
*
* Backoff schedule: 5m -> 15m -> 1h -> 6h (max)
* Max recovery attempts: 10 (after which manual intervention is required)
*
* @module jobs/syncRecoveryCheck
*/

const { Explorer, Workspace } = require('../models');
const { ProviderConnector } = require('../lib/rpc');
const { withTimeout } = require('../lib/utils');
const { Op } = require('sequelize');
const logger = require('../lib/logger');

// Process explorers in batches to avoid long-running jobs
const BATCH_SIZE = 50;

module.exports = async () => {
// Find explorers that are due for a recovery check (with batch limit)
// Excludes explorers that have reached max recovery attempts (nextRecoveryCheckAt is null)
const explorers = await Explorer.findAll({
where: {
syncDisabledReason: { [Op.ne]: null },
nextRecoveryCheckAt: { [Op.lte]: new Date() }
},
include: [{
model: Workspace,
as: 'workspace',
attributes: ['id', 'rpcServer']
}],
limit: BATCH_SIZE,
order: [['nextRecoveryCheckAt', 'ASC']] // Process oldest first
});

if (explorers.length === 0) {
return 'No explorers due for recovery check';
}

let recovered = 0;
let stillUnreachable = 0;
let maxAttemptsReached = 0;

for (const explorer of explorers) {
try {
// Skip if workspace is missing
if (!explorer.workspace || !explorer.workspace.rpcServer) {
logger.warn({
message: 'Explorer recovery check skipped: no workspace or RPC server',
explorerId: explorer.id,
explorerSlug: explorer.slug
});
continue;
}

const provider = new ProviderConnector(explorer.workspace.rpcServer);
const block = await withTimeout(provider.fetchLatestBlock());

if (block) {
// RPC is reachable - re-enable sync
await explorer.enableSyncAfterRecovery();
recovered++;

logger.info({
message: 'Explorer re-enabled after RPC recovery',
explorerId: explorer.id,
explorerSlug: explorer.slug,
disabledReason: explorer.syncDisabledReason,
recoveryAttempts: explorer.recoveryAttempts
});
} else {
// Block fetch returned null - still unreachable
const result = await explorer.scheduleNextRecoveryCheck();
if (result.maxReached) {
maxAttemptsReached++;
logger.warn({
message: 'Explorer reached max recovery attempts - manual intervention required',
explorerId: explorer.id,
explorerSlug: explorer.slug,
attempts: result.attempts
});
} else {
stillUnreachable++;
}
}
} catch (error) {
// RPC check failed - schedule next check with backoff
const result = await explorer.scheduleNextRecoveryCheck();
if (result.maxReached) {
maxAttemptsReached++;
logger.warn({
message: 'Explorer reached max recovery attempts - manual intervention required',
explorerId: explorer.id,
explorerSlug: explorer.slug,
attempts: result.attempts
});
} else {
stillUnreachable++;
}

logger.debug({
message: 'Explorer recovery check failed',
explorerId: explorer.id,
explorerSlug: explorer.slug,
error: error.message
});
}
}

return `Checked ${explorers.length} explorers: ${recovered} recovered, ${stillUnreachable} still unreachable, ${maxAttemptsReached} max attempts reached`;
};
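
For context, a minimal sketch of the two Explorer model helpers this job relies on (scheduleNextRecoveryCheck and enableSyncAfterRecovery), which are not included in this diff. The intervals, the 10-attempt cap, and the column names follow the fileoverview above and the migration below; the method bodies and how sync itself is re-enabled are assumptions and may differ from the actual model.

const { Model } = require('sequelize');

// Sketch only: assumed instance methods on the Explorer model, not part of this diff.
// Backoff schedule per the fileoverview: 5m -> 15m -> 1h -> 6h (max), capped at 10 attempts.
const RECOVERY_BACKOFF_MS = [5 * 60 * 1000, 15 * 60 * 1000, 60 * 60 * 1000, 6 * 60 * 60 * 1000];
const MAX_RECOVERY_ATTEMPTS = 10;

class Explorer extends Model {
    // Bumps recoveryAttempts and schedules the next check, or gives up after the cap.
    // Returns { maxReached, attempts }, the shape consumed by syncRecoveryCheck.js.
    async scheduleNextRecoveryCheck() {
        const attempts = this.recoveryAttempts + 1;
        if (attempts >= MAX_RECOVERY_ATTEMPTS) {
            // A null nextRecoveryCheckAt drops the explorer from the recovery query until manual intervention.
            await this.update({ recoveryAttempts: attempts, nextRecoveryCheckAt: null });
            return { maxReached: true, attempts };
        }
        // The initial 5m wait is assumed to be set when the explorer is auto-disabled,
        // so the first retry here moves on to the 15m interval.
        const delay = RECOVERY_BACKOFF_MS[Math.min(attempts, RECOVERY_BACKOFF_MS.length - 1)];
        await this.update({ recoveryAttempts: attempts, nextRecoveryCheckAt: new Date(Date.now() + delay) });
        return { maxReached: false, attempts };
    }

    // Clears failure and recovery tracking once the RPC is reachable again.
    // How sync itself is turned back on depends on how shouldSync is derived in the existing model.
    async enableSyncAfterRecovery() {
        return this.update({
            syncFailedAttempts: 0,
            syncDisabledAt: null,
            syncDisabledReason: null,
            recoveryAttempts: 0,
            nextRecoveryCheckAt: null
        });
    }
}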
59 changes: 54 additions & 5 deletions run/jobs/updateExplorerSyncingProcess.js
@@ -1,11 +1,16 @@
/**
* @fileoverview Explorer sync process update job.
* Manages PM2 processes for explorer block syncing based on status.
* Auto-disables sync after repeated RPC failures with exponential backoff recovery.
* @module jobs/updateExplorerSyncingProcess
*/

const { Explorer, Workspace, RpcHealthCheck, StripeSubscription, StripePlan } = require('../models');
const PM2 = require('../lib/pm2');
const logger = require('../lib/logger');

// Must match SYNC_FAILURE_THRESHOLD in explorer.js
const SYNC_FAILURE_THRESHOLD = 3;

module.exports = async job => {
const data = job.data;
@@ -19,17 +24,21 @@ module.exports = async job => {
{
model: Workspace,
as: 'workspace',
required: false,
include: {
model: RpcHealthCheck,
as: 'rpcHealthCheck'
as: 'rpcHealthCheck',
required: false
}
},
{
model: StripeSubscription,
as: 'stripeSubscription',
required: false,
include: {
model: StripePlan,
as: 'stripePlan'
as: 'stripePlan',
required: false
}
}
]
@@ -55,9 +64,25 @@ module.exports = async job => {
await pm2.delete(explorer.slug);
return 'Process deleted: no subscription.';
}
else if (explorer.workspace.rpcHealthCheck && !explorer.workspace.rpcHealthCheck.isReachable && existingProcess) {
else if (explorer && !explorer.workspace) {
await pm2.delete(explorer.slug);
return 'Process deleted: RPC is not reachable.';
return 'Process deleted: no workspace.';
}
else if (explorer.workspace && explorer.workspace.rpcHealthCheck && !explorer.workspace.rpcHealthCheck.isReachable && existingProcess) {
await pm2.delete(explorer.slug);
// Track RPC failure and potentially auto-disable
const result = await explorer.incrementSyncFailures('rpc_unreachable');
if (result.disabled) {
logger.info({
message: 'Explorer auto-disabled due to RPC failures',
explorerId: explorer.id,
explorerSlug: explorer.slug,
attempts: result.attempts,
reason: 'rpc_unreachable'
});
return `Process deleted and sync auto-disabled after ${result.attempts} RPC failures.`;
}
return `Process deleted: RPC is not reachable (attempt ${result.attempts}/${SYNC_FAILURE_THRESHOLD}).`;
}
else if (!explorer.shouldSync && existingProcess) {
await pm2.delete(explorer.slug);
@@ -69,17 +94,41 @@ module.exports = async job => {
}
else if (explorer.shouldSync && !existingProcess) {
await pm2.start(explorer.slug, explorer.workspaceId);
// Reset failure counter on successful start
if (explorer.syncFailedAttempts > 0) {
await explorer.update({ syncFailedAttempts: 0 });
}
return 'Process started.';
}
else if (explorer.shouldSync && existingProcess && existingProcess.pm2_env.status == 'stopped') {
await pm2.resume(explorer.slug, explorer.workspaceId);
// Reset failure counter on successful resume
if (explorer.syncFailedAttempts > 0) {
await explorer.update({ syncFailedAttempts: 0 });
}
return 'Process resumed.';
}
else
return 'No process change.';
} catch(error) {
if (error.message.startsWith('Timed out after'))
if (error.message.startsWith('Timed out after')) {
// Track timeout as a failure if explorer exists and sync is enabled
if (explorer && explorer.shouldSync) {
const result = await explorer.incrementSyncFailures('pm2_timeout');
if (result.disabled) {
logger.info({
message: 'Explorer auto-disabled due to PM2 timeouts',
explorerId: explorer.id,
explorerSlug: explorer.slug,
attempts: result.attempts,
reason: 'pm2_timeout'
});
return `Timed out and sync auto-disabled after ${result.attempts} failures.`;
}
return `Timed out (attempt ${result.attempts}/${SYNC_FAILURE_THRESHOLD})`;
}
return 'Timed out';
}
else
throw error;
}
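
Likewise, incrementSyncFailures is assumed to live on the Explorer model and is not part of this diff. A sketch consistent with the SYNC_FAILURE_THRESHOLD of 3 and the { disabled, attempts } return shape used above:

const { Model } = require('sequelize');

// Sketch only: assumed instance method on the Explorer model, not part of this diff.
// Must match SYNC_FAILURE_THRESHOLD in updateExplorerSyncingProcess.js.
const SYNC_FAILURE_THRESHOLD = 3;
// Assumed: first recovery check 5 minutes after auto-disable, per the backoff schedule in syncRecoveryCheck.js.
const FIRST_RECOVERY_DELAY_MS = 5 * 60 * 1000;

class Explorer extends Model {
    // Counts consecutive sync failures; once the threshold is hit, records why sync
    // was disabled and queues the first recovery check. Returns { disabled, attempts }.
    async incrementSyncFailures(reason) {
        const attempts = this.syncFailedAttempts + 1;

        if (attempts < SYNC_FAILURE_THRESHOLD) {
            await this.update({ syncFailedAttempts: attempts });
            return { disabled: false, attempts };
        }

        await this.update({
            syncFailedAttempts: attempts,
            syncDisabledAt: new Date(),
            syncDisabledReason: reason,
            recoveryAttempts: 0,
            nextRecoveryCheckAt: new Date(Date.now() + FIRST_RECOVERY_DELAY_MS)
        });
        return { disabled: true, attempts };
    }
}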
66 changes: 66 additions & 0 deletions run/migrations/20260109161142-add-sync-failure-tracking.js
@@ -0,0 +1,66 @@
'use strict';

module.exports = {
async up (queryInterface, Sequelize) {
const transaction = await queryInterface.sequelize.transaction();
try {
await queryInterface.addColumn('explorers', 'syncFailedAttempts', {
type: Sequelize.DataTypes.INTEGER,
allowNull: false,
defaultValue: 0
}, { transaction });

await queryInterface.addColumn('explorers', 'syncDisabledAt', {
type: Sequelize.DataTypes.DATE,
allowNull: true
}, { transaction });

await queryInterface.addColumn('explorers', 'syncDisabledReason', {
type: Sequelize.DataTypes.STRING,
allowNull: true
}, { transaction });

await queryInterface.addColumn('explorers', 'recoveryAttempts', {
type: Sequelize.DataTypes.INTEGER,
allowNull: false,
defaultValue: 0
}, { transaction });

await queryInterface.addColumn('explorers', 'nextRecoveryCheckAt', {
type: Sequelize.DataTypes.DATE,
allowNull: true
}, { transaction });

// Add index on nextRecoveryCheckAt for efficient recovery job queries
await queryInterface.addIndex('explorers', ['nextRecoveryCheckAt'], {
name: 'explorers_next_recovery_check_at_idx',
where: { nextRecoveryCheckAt: { [Sequelize.Op.ne]: null } },
transaction
});

await transaction.commit();
} catch(error) {
console.log(error);
await transaction.rollback();
throw error;
}
},

async down(queryInterface, Sequelize) {
const transaction = await queryInterface.sequelize.transaction();
try {
await queryInterface.removeIndex('explorers', 'explorers_next_recovery_check_at_idx', { transaction });
await queryInterface.removeColumn('explorers', 'syncFailedAttempts', { transaction });
await queryInterface.removeColumn('explorers', 'syncDisabledAt', { transaction });
await queryInterface.removeColumn('explorers', 'syncDisabledReason', { transaction });
await queryInterface.removeColumn('explorers', 'recoveryAttempts', { transaction });
await queryInterface.removeColumn('explorers', 'nextRecoveryCheckAt', { transaction });

await transaction.commit();
} catch(error) {
console.log(error);
await transaction.rollback();
throw error;
}
}
};
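
The migration only changes the database schema; the Explorer model presumably needs matching attribute declarations so Sequelize can read and write the new columns. A sketch of what those declarations might look like (the real models/explorer.js is not part of this diff; only the column names and types are taken from the migration above):

const { DataTypes } = require('sequelize');

// Sketch only: assumed additions to the Explorer model definition, mirroring the migration columns.
module.exports = (sequelize) => {
    return sequelize.define('Explorer', {
        // ...existing Explorer attributes...
        syncFailedAttempts: { type: DataTypes.INTEGER, allowNull: false, defaultValue: 0 },
        syncDisabledAt: { type: DataTypes.DATE, allowNull: true },
        syncDisabledReason: { type: DataTypes.STRING, allowNull: true },
        recoveryAttempts: { type: DataTypes.INTEGER, allowNull: false, defaultValue: 0 },
        nextRecoveryCheckAt: { type: DataTypes.DATE, allowNull: true }
    }, { tableName: 'explorers' });
};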