Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
294 changes: 293 additions & 1 deletion prisma/migrate.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { PrismaClient } from '@prisma/client';
import { PrismaClient, Prisma } from '@prisma/client';
import * as path from 'path';
import * as fs from 'fs';
import * as readline from 'readline';
Expand Down Expand Up @@ -33,6 +33,7 @@ const DEFAULT_DATA_DIR = '/mnt/export/review_tables';
const DATA_DIR = process.env.DATA_DIR || DEFAULT_DATA_DIR;
const batchSize = 1000;
const logSize = 20000;
const submissionIsFileBatchSize = 500;
const DEFAULT_ES_DATA_FILE = path.join(
'/home/ubuntu',
'submissions-api.data.json',
Expand Down Expand Up @@ -64,6 +65,55 @@ if (shouldRepairMaps) {
);
}

const MIGRATION_TARGET_SUBMISSION_IS_FILE = 'submission-is-file';
const SUPPORTED_MIGRATION_TARGETS = new Set<string>([
MIGRATION_TARGET_SUBMISSION_IS_FILE,
]);
const MIGRATION_TARGET_ALIASES: Record<string, string> = {
[MIGRATION_TARGET_SUBMISSION_IS_FILE]: MIGRATION_TARGET_SUBMISSION_IS_FILE,
'submission-is-file-flag': MIGRATION_TARGET_SUBMISSION_IS_FILE,
'submission-isfilesubmission': MIGRATION_TARGET_SUBMISSION_IS_FILE,
'submission:isfile': MIGRATION_TARGET_SUBMISSION_IS_FILE,
'submission:isfilesubmission': MIGRATION_TARGET_SUBMISSION_IS_FILE,
'is-file-submission': MIGRATION_TARGET_SUBMISSION_IS_FILE,
submission_is_file: MIGRATION_TARGET_SUBMISSION_IS_FILE,
};

const rawTargets = extractTargets(process.argv.slice(2));
const normalizedTargets: string[] = [];
const unknownTargets: string[] = [];

for (const target of rawTargets) {
const normalized = normalizeTarget(target);
if (normalized) {
normalizedTargets.push(normalized);
} else {
unknownTargets.push(target);
}
}

if (unknownTargets.length > 0) {
throw new Error(
`Unknown migration target(s): ${unknownTargets.join(', ')}. Supported targets: ${Array.from(SUPPORTED_MIGRATION_TARGETS).join(', ')}`,
);
}

const requestedTargets = new Set<string>(normalizedTargets);
const runFullMigration = requestedTargets.size === 0;
if (!runFullMigration && requestedTargets.size > 1) {
throw new Error(
`Multiple migration targets are not currently supported. Requested targets: ${Array.from(requestedTargets).join(', ')}`,
);
}
const runSubmissionIsFileTarget = requestedTargets.has(
MIGRATION_TARGET_SUBMISSION_IS_FILE,
);
if (!runFullMigration) {
console.log(
`Executing targeted migration: ${Array.from(requestedTargets).join(', ')}`,
);
}

const errorSummary = new Map<
string,
{ count: number; files: Set<string>; examples: string[] }
Expand Down Expand Up @@ -108,6 +158,73 @@ const shouldProcessRecord = (
);
};

const toBoolean = (value: unknown): boolean => {
if (typeof value === 'boolean') {
return value;
}
if (typeof value === 'number') {
if (value === 1) {
return true;
}
if (value === 0) {
return false;
}
}
if (typeof value === 'string') {
const normalized = value.trim().toLowerCase();
if (['true', '1', 'yes', 'y'].includes(normalized)) {
return true;
}
if (
['false', '0', 'no', 'n', 'null', 'undefined', ''].includes(normalized)
) {
return false;
}
}
return Boolean(value);
};

function extractTargets(args: string[]): string[] {
const results: string[] = [];
for (let i = 0; i < args.length; i += 1) {
const arg = args[i];
if (arg === '--target' || arg === '--targets') {
const value = args[i + 1];
if (!value) {
throw new Error(
`${arg} requires a value with comma-separated migration target names.`,
);
}
results.push(
...value
.split(',')
.map((item) => item.trim())
.filter((item) => item.length > 0),
);
i += 1;
continue;
}
const match = arg.match(/^--targets?=(.*)$/);
if (match) {
results.push(
...match[1]
.split(',')
.map((item) => item.trim())
.filter((item) => item.length > 0),
);
}
}
return results;
}

function normalizeTarget(target: string): string | undefined {
if (!target) {
return undefined;
}
const normalized = target.trim().toLowerCase();
return MIGRATION_TARGET_ALIASES[normalized];
}

const buildLogPrefix = (type: string, file: string, subtype?: string) =>
subtype ? `[${type}][${subtype}][${file}]` : `[${type}][${file}]`;

Expand Down Expand Up @@ -689,6 +806,7 @@ function convertSubmissionES(esData): any {
legacyChallengeId,
submissionPhaseId: String(esData.submissionPhaseId),
fileType: esData.fileType,
isFileSubmission: toBoolean(esData.isFileSubmission),
esId: esData.id,
submittedDate: esData.submittedDate ? new Date(esData.submittedDate) : null,
updatedBy: esData.updatedBy ?? null,
Expand Down Expand Up @@ -717,6 +835,171 @@ function convertSubmissionES(esData): any {
return submission;
}

async function migrateSubmissionIsFileFlagOnly() {
console.log(
`[${MIGRATION_TARGET_SUBMISSION_IS_FILE}] Using ElasticSearch export file: ${ES_DATA_FILE}`,
);
if (!fs.existsSync(ES_DATA_FILE)) {
throw new Error(
`ElasticSearch export file not found at ${ES_DATA_FILE}. Set ES_DATA_FILE to override the default.`,
);
}
console.log(
`[${MIGRATION_TARGET_SUBMISSION_IS_FILE}] Starting isFileSubmission synchronization...`,
);

const pendingUpdates = new Map<string, boolean>();
const updatedLegacyIds = new Set<string>();
const missingExamples: string[] = [];

let totalUpdatedRows = 0;
let totalMissing = 0;
let processedRecords = 0;
let skippedMissingLegacyId = 0;
let skippedOutsideWindow = 0;
let parseErrors = 0;
let lineNumber = 0;

const flushPending = async () => {
if (pendingUpdates.size === 0) {
return;
}
const entries = Array.from(pendingUpdates.entries());
pendingUpdates.clear();
const valueTuples = entries.map(
([legacySubmissionId, isFileSubmission]) =>
Prisma.sql`(${legacySubmissionId}, ${isFileSubmission})`,
);
let updatedRows: Array<{ legacySubmissionId: string }> = [];
try {
updatedRows = await prisma.$queryRaw<
Array<{ legacySubmissionId: string }>
>(Prisma.sql`
WITH input("legacySubmissionId", "isFileSubmission") AS (
VALUES ${Prisma.join(valueTuples)}
)
UPDATE "submission" AS s
SET "isFileSubmission" = input."isFileSubmission"
FROM input
WHERE s."legacySubmissionId" = input."legacySubmissionId"
RETURNING s."legacySubmissionId" AS "legacySubmissionId"
`);
} catch (err) {
console.error(
`[${MIGRATION_TARGET_SUBMISSION_IS_FILE}] Failed to update a batch of ${entries.length} submissions.`,
);
throw err;
}
totalUpdatedRows += updatedRows.length;
const batchUpdatedSet = new Set(
updatedRows.map((row) => String(row.legacySubmissionId)),
);
batchUpdatedSet.forEach((id) => updatedLegacyIds.add(id));
const batchMissing = entries.length - batchUpdatedSet.size;
totalMissing += batchMissing;
if (batchMissing > 0 && missingExamples.length < 5) {
for (const [legacyId] of entries) {
if (!batchUpdatedSet.has(legacyId)) {
missingExamples.push(legacyId);
if (missingExamples.length >= 5) {
break;
}
}
}
}
};

const fileStream = fs.createReadStream(ES_DATA_FILE);
const rl = readline.createInterface({
input: fileStream,
crlfDelay: Infinity,
});

for await (const line of rl) {
lineNumber += 1;
if (!line) {
continue;
}
let parsed: any;
try {
parsed = JSON.parse(line);
} catch (err: any) {
parseErrors += 1;
if (parseErrors <= 5) {
console.warn(
`[${MIGRATION_TARGET_SUBMISSION_IS_FILE}] Failed to parse line ${lineNumber}: ${err?.message ?? err}`,
);
}
continue;
}
const source = parsed?._source;
if (!source || source.resource !== 'submission') {
continue;
}
if (source.legacySubmissionId == null) {
skippedMissingLegacyId += 1;
continue;
}
const createdAudit = source.created ?? source.submittedDate ?? null;
const updatedAudit = source.updated ?? null;
if (!shouldProcessRecord(createdAudit, updatedAudit)) {
skippedOutsideWindow += 1;
continue;
}
const legacySubmissionId = String(source.legacySubmissionId);
const isFileSubmission = toBoolean(source.isFileSubmission);
pendingUpdates.set(legacySubmissionId, isFileSubmission);
processedRecords += 1;
if (pendingUpdates.size >= submissionIsFileBatchSize) {
await flushPending();
}
if (processedRecords > 0 && processedRecords % logSize === 0) {
console.log(
`[${MIGRATION_TARGET_SUBMISSION_IS_FILE}] Queued ${processedRecords} submissions so far...`,
);
}
}

await flushPending();

const distinctUpdated = updatedLegacyIds.size;

console.log(
`[${MIGRATION_TARGET_SUBMISSION_IS_FILE}] Processed ${processedRecords} submission records.`,
);
console.log(
`[${MIGRATION_TARGET_SUBMISSION_IS_FILE}] Updated ${totalUpdatedRows} row(s) affecting ${distinctUpdated} submission(s).`,
);
if (totalMissing > 0) {
console.warn(
`[${MIGRATION_TARGET_SUBMISSION_IS_FILE}] Unable to locate ${totalMissing} submission(s) in the database.`,
);
if (missingExamples.length > 0) {
console.warn(
`[${MIGRATION_TARGET_SUBMISSION_IS_FILE}] Missing legacySubmissionId examples: ${missingExamples.join(', ')}`,
);
}
}
if (skippedMissingLegacyId > 0) {
console.warn(
`[${MIGRATION_TARGET_SUBMISSION_IS_FILE}] Skipped ${skippedMissingLegacyId} record(s) without a legacySubmissionId.`,
);
}
if (skippedOutsideWindow > 0) {
console.log(
`[${MIGRATION_TARGET_SUBMISSION_IS_FILE}] Skipped ${skippedOutsideWindow} record(s) outside the incremental window.`,
);
}
if (parseErrors > 5) {
console.warn(
`[${MIGRATION_TARGET_SUBMISSION_IS_FILE}] Encountered ${parseErrors} parse error(s) while reading the export.`,
);
}
console.log(
`[${MIGRATION_TARGET_SUBMISSION_IS_FILE}] isFileSubmission synchronization completed.`,
);
}

async function migrateElasticSearch() {
// migrate elastic search data
const filepath = ES_DATA_FILE;
Expand Down Expand Up @@ -3269,6 +3552,15 @@ async function migrateResourceSubmissions() {
}

async function migrate() {
if (!runFullMigration && runSubmissionIsFileTarget) {
await migrateSubmissionIsFileFlagOnly();
return;
}

if (!runFullMigration) {
throw new Error('No recognized migration targets were provided.');
}

if (!fs.existsSync(DATA_DIR)) {
throw new Error(
`DATA_DIR "${DATA_DIR}" does not exist. Set DATA_DIR to a valid export path.`,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Add isFileSubmission flag to submissions to differentiate file vs URL entries
ALTER TABLE "submission"
ADD COLUMN IF NOT EXISTS "isFileSubmission" BOOLEAN NOT NULL DEFAULT false;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[⚠️ performance]
Consider adding an index on the isFileSubmission column if it will be frequently queried or filtered on. This can improve query performance.

Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Improve review lookups that combine submission/phase filters with ordering
CREATE INDEX "review_submissionId_id_idx" ON "review"("submissionId", "id");
CREATE INDEX "review_phaseId_id_idx" ON "review"("phaseId", "id");
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Add a composite index to speed up paginated lookups by review item
CREATE INDEX IF NOT EXISTS "reviewItemComment_reviewItemId_sortOrder_id_idx"
ON "reviews"."reviewItemComment"("reviewItemId", "sortOrder", "id");
4 changes: 4 additions & 0 deletions prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,10 @@ model review {

@@index([committed]) // Index for filtering by committed status
@@index([submissionId]) // Index for filtering by submission
@@index([submissionId, id]) // Helps ORDER BY id after filtering by submission
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[⚠️ performance]
The new index @@index([submissionId, id]) is added to help with ORDER BY id after filtering by submissionId. Ensure that this index is necessary and beneficial for your query patterns, as additional indexes can increase write overhead and storage usage.

@@index([resourceId]) // Index for filtering by resource (reviewer)
@@index([phaseId]) // Index for filtering by phase
@@index([phaseId, id]) // Helps ORDER BY id after filtering by phase
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[⚠️ performance]
The new index @@index([phaseId, id]) is added to help with ORDER BY id after filtering by phaseId. Verify that this index is required and will be utilized effectively, as it can lead to increased maintenance costs and storage requirements.

@@index([scorecardId]) // Index for joining with scorecard table
@@index([status]) // Index for filtering by review status
@@index([status, phaseId])
Expand Down Expand Up @@ -228,6 +230,7 @@ model reviewItemComment {
appeal appeal?

@@index([reviewItemId]) // Index for joining with reviewItem table
@@index([reviewItemId, sortOrder, id]) // Helps ordered pagination on review item comments
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[⚠️ performance]
The new index @@index([reviewItemId, sortOrder, id]) is intended to assist with ordered pagination on review item comments. Ensure that the query patterns justify this index, as it can add overhead to write operations and consume additional storage.

@@index([id]) // Index for direct ID lookups
@@index([resourceId]) // Index for filtering by resource (commenter)
@@index([type]) // Index for filtering by comment type
Expand Down Expand Up @@ -397,6 +400,7 @@ model submission {
fileSize Int?
viewCount Int?
systemFileName String?
isFileSubmission Boolean @default(false)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[⚠️ design]
The addition of the isFileSubmission field with a default value of false should be reviewed to ensure it aligns with the existing data model and business logic. Consider potential impacts on existing queries and data integrity.

thurgoodJobId String?
virusScan Boolean @default(false)

Expand Down
Loading