From 96a054d937afbdf7d5298e48d64462617f322915 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Fri, 9 Jan 2026 13:45:34 -0700 Subject: [PATCH 1/5] fix(db): batch D2 output callbacks to prevent duplicate key errors in joins When D2's incremental join processes data, it can produce multiple outputs during a single graph.run() iteration - first a partial join result, then the full result plus a delete of the partial. Previously, each output callback had its own begin()/commit() cycle, causing the second insert for the same key to fail with DuplicateKeySyncError. This fix: - Accumulates all changes from output callbacks into a pendingChanges Map - Flushes accumulated changes in a single transaction after each graph.run() - Adds subscribedToAllCollections check to updateLiveQueryStatus() to ensure markReady() is only called after the graph has processed data - Properly types flushPendingChanges on SyncState to avoid type assertions Co-Authored-By: Claude Opus 4.5 --- packages/db/src/collection/sync.ts | 5 +- .../query/live/collection-config-builder.ts | 106 +++++++++++------- packages/db/src/query/live/types.ts | 4 +- .../tests/query/live-query-collection.test.ts | 14 ++- 4 files changed, 78 insertions(+), 51 deletions(-) diff --git a/packages/db/src/collection/sync.ts b/packages/db/src/collection/sync.ts index 841e76c1f..107062c96 100644 --- a/packages/db/src/collection/sync.ts +++ b/packages/db/src/collection/sync.ts @@ -134,10 +134,9 @@ export class CollectionSyncManager< !isTruncateTransaction ) { const existingValue = this.state.syncedData.get(key) - if ( - existingValue !== undefined && + const valuesEqual = existingValue !== undefined && deepEquals(existingValue, messageWithOptionalKey.value) - ) { + if (valuesEqual) { // The "insert" is an echo of a value we already have locally. // Treat it as an update so we preserve optimistic intent without // throwing a duplicate-key error during reconciliation. diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index 59efff818..473960cb3 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -337,6 +337,10 @@ export class CollectionConfigBuilder< if (syncState.subscribedToAllCollections) { while (syncState.graph.pendingWork()) { syncState.graph.run() + // Flush accumulated changes after each graph step to commit them as one transaction. + // This ensures intermediate join states (like null on one side) don't cause + // duplicate key errors when the full join result arrives in the same step. + syncState.flushPendingChanges?.() callback?.() } @@ -345,10 +349,14 @@ export class CollectionConfigBuilder< if (syncState.messagesCount === 0) { begin() commit() - // After initial commit, check if we should mark ready - // (in case all sources were already ready before we subscribed) - this.updateLiveQueryStatus(this.currentSyncConfig) } + + // After graph processing completes, check if we should mark ready. + // This is the canonical place to transition to ready state because: + // 1. All data has been processed through the graph + // 2. All source collections have had a chance to send their initial data + // This prevents marking ready before data is processed (fixes isReady=true with empty data) + this.updateLiveQueryStatus(this.currentSyncConfig) } } finally { this.isGraphRunning = false @@ -687,22 +695,56 @@ export class CollectionConfigBuilder< const { begin, commit } = config const { graph, inputs, pipeline } = this.maybeCompileBasePipeline() + // Accumulator for changes across all output callbacks within a single graph run. + // This allows us to batch all changes from intermediate join states into a single + // transaction, avoiding duplicate key errors when joins produce multiple outputs + // for the same key (e.g., first output with null, then output with joined data). + let pendingChanges: Map> = new Map() + pipeline.pipe( output((data) => { const messages = data.getInner() syncState.messagesCount += messages.length - begin() - messages - .reduce( - accumulateChanges, - new Map>(), - ) - .forEach(this.applyChanges.bind(this, config)) - commit() + // Accumulate changes from this output callback into the pending changes map. + // Changes for the same key are merged (inserts/deletes are added together). + for (const [[key, tupleData], multiplicity] of messages as Array< + [[unknown, [TResult, string | undefined]], number] + >) { + const [value, orderByIndex] = tupleData + const existing = pendingChanges.get(key) || { + deletes: 0, + inserts: 0, + value, + orderByIndex, + } + if (multiplicity < 0) { + existing.deletes += Math.abs(multiplicity) + } else if (multiplicity > 0) { + existing.inserts += multiplicity + // Update value to the latest version for this key + existing.value = value + if (orderByIndex !== undefined) { + existing.orderByIndex = orderByIndex + } + } + pendingChanges.set(key, existing) + } }), ) + // Flush pending changes and reset the accumulator. + // Called at the end of each graph run to commit all accumulated changes. + syncState.flushPendingChanges = () => { + if (pendingChanges.size === 0) { + return + } + begin() + pendingChanges.forEach(this.applyChanges.bind(this, config)) + commit() + pendingChanges = new Map() + } + graph.finalize() // Extend the sync state with the graph, inputs, and pipeline @@ -808,11 +850,14 @@ export class CollectionConfigBuilder< return } - // Mark ready when all source collections are ready AND - // the live query collection is not loading subset data. - // This prevents marking the live query ready before its data is loaded + // Mark ready when: + // 1. All subscriptions are set up (subscribedToAllCollections) + // 2. All source collections are ready + // 3. The live query collection is not loading subset data + // This prevents marking the live query ready before its data is processed // (fixes issue where useLiveQuery returns isReady=true with empty data) if ( + this.currentSyncState?.subscribedToAllCollections && this.allCollectionsReady() && !this.liveQueryCollection?.isLoadingSubset ) { @@ -913,8 +958,10 @@ export class CollectionConfigBuilder< // (graph only runs when all collections are subscribed) syncState.subscribedToAllCollections = true - // Initial status check after all subscriptions are set up - this.updateLiveQueryStatus(config) + // Note: We intentionally don't call updateLiveQueryStatus() here. + // The graph hasn't run yet, so marking ready would be premature. + // The canonical place to mark ready is after the graph processes data + // in maybeRunGraph(), which ensures data has been processed first. return loadSubsetDataCallbacks } @@ -1075,30 +1122,3 @@ function extractCollectionAliases(query: QueryIR): Map> { return aliasesById } -function accumulateChanges( - acc: Map>, - [[key, tupleData], multiplicity]: [ - [unknown, [any, string | undefined]], - number, - ], -) { - // All queries now consistently return [value, orderByIndex] format - // where orderByIndex is undefined for queries without ORDER BY - const [value, orderByIndex] = tupleData as [T, string | undefined] - - const changes = acc.get(key) || { - deletes: 0, - inserts: 0, - value, - orderByIndex, - } - if (multiplicity < 0) { - changes.deletes += Math.abs(multiplicity) - } else if (multiplicity > 0) { - changes.inserts += multiplicity - changes.value = value - changes.orderByIndex = orderByIndex - } - acc.set(key, changes) - return acc -} diff --git a/packages/db/src/query/live/types.ts b/packages/db/src/query/live/types.ts index 64a51d7ab..04d0b389f 100644 --- a/packages/db/src/query/live/types.ts +++ b/packages/db/src/query/live/types.ts @@ -22,9 +22,11 @@ export type SyncState = { graph?: D2 inputs?: Record> pipeline?: ResultStream + flushPendingChanges?: () => void } -export type FullSyncState = Required +export type FullSyncState = Required> & + Pick /** * Configuration interface for live query collection options diff --git a/packages/db/tests/query/live-query-collection.test.ts b/packages/db/tests/query/live-query-collection.test.ts index 85866945c..58ed050ea 100644 --- a/packages/db/tests/query/live-query-collection.test.ts +++ b/packages/db/tests/query/live-query-collection.test.ts @@ -2293,10 +2293,11 @@ describe(`createLiveQueryCollection`, () => { .limit(10), ) - // Trigger sync which will call loadSubset - await liveQueryCollection.preload() + // Start preload (don't await yet - it won't resolve until loadSubset completes) + const preloadPromise = liveQueryCollection.preload() await flushPromises() + // Verify loadSubset was called with the correct options expect(capturedOptions.length).toBeGreaterThan(0) // Find the call that has orderBy (the limited snapshot request) @@ -2308,8 +2309,10 @@ describe(`createLiveQueryCollection`, () => { expect(callWithOrderBy?.orderBy?.[0]?.expression.type).toBe(`ref`) expect(callWithOrderBy?.limit).toBe(10) + // Resolve the loadSubset promise so preload can complete resolveLoadSubset!() await flushPromises() + await preloadPromise }) it(`passes multiple orderBy columns to loadSubset when using limit`, async () => { @@ -2350,10 +2353,11 @@ describe(`createLiveQueryCollection`, () => { .limit(10), ) - // Trigger sync which will call loadSubset - await liveQueryCollection.preload() + // Start preload (don't await yet - it won't resolve until loadSubset completes) + const preloadPromise = liveQueryCollection.preload() await flushPromises() + // Verify loadSubset was called with the correct options expect(capturedOptions.length).toBeGreaterThan(0) // Find the call that has orderBy with multiple columns @@ -2369,8 +2373,10 @@ describe(`createLiveQueryCollection`, () => { expect(callWithMultiOrderBy?.orderBy?.[1]?.expression.type).toBe(`ref`) expect(callWithMultiOrderBy?.limit).toBe(10) + // Resolve the loadSubset promise so preload can complete resolveLoadSubset!() await flushPromises() + await preloadPromise }) }) }) From 1dde0d28d0408f3653a0c189d60ae7bebc02879d Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Fri, 9 Jan 2026 20:46:41 +0000 Subject: [PATCH 2/5] ci: apply automated fixes --- packages/db/src/collection/sync.ts | 3 ++- packages/db/src/query/live/collection-config-builder.ts | 1 - 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/db/src/collection/sync.ts b/packages/db/src/collection/sync.ts index 107062c96..376ab58df 100644 --- a/packages/db/src/collection/sync.ts +++ b/packages/db/src/collection/sync.ts @@ -134,7 +134,8 @@ export class CollectionSyncManager< !isTruncateTransaction ) { const existingValue = this.state.syncedData.get(key) - const valuesEqual = existingValue !== undefined && + const valuesEqual = + existingValue !== undefined && deepEquals(existingValue, messageWithOptionalKey.value) if (valuesEqual) { // The "insert" is an echo of a value we already have locally. diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index 473960cb3..4309ad829 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -1121,4 +1121,3 @@ function extractCollectionAliases(query: QueryIR): Map> { return aliasesById } - From 718cbc2a0d7b8af7953af20900e469bde68c9951 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Fri, 9 Jan 2026 13:50:09 -0700 Subject: [PATCH 3/5] chore: add changeset for live query join fix Co-Authored-By: Claude Opus 4.5 --- .changeset/fix-live-query-join-duplicates.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/fix-live-query-join-duplicates.md diff --git a/.changeset/fix-live-query-join-duplicates.md b/.changeset/fix-live-query-join-duplicates.md new file mode 100644 index 000000000..f1cb4969e --- /dev/null +++ b/.changeset/fix-live-query-join-duplicates.md @@ -0,0 +1,5 @@ +--- +'@tanstack/db': patch +--- + +Fix duplicate key errors when live queries use joins with custom `getKey` functions. D2's incremental join can produce multiple outputs for the same key during a single graph run; this change batches all outputs into a single transaction to prevent conflicts. From bcf96276fe8dae103e744941007ad4cfdae02749 Mon Sep 17 00:00:00 2001 From: Kyle Mathews Date: Fri, 9 Jan 2026 15:04:45 -0700 Subject: [PATCH 4/5] chore: update changeset to mention isReady fix Co-Authored-By: Claude Opus 4.5 --- .changeset/fix-live-query-join-duplicates.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.changeset/fix-live-query-join-duplicates.md b/.changeset/fix-live-query-join-duplicates.md index f1cb4969e..77a95d163 100644 --- a/.changeset/fix-live-query-join-duplicates.md +++ b/.changeset/fix-live-query-join-duplicates.md @@ -2,4 +2,6 @@ '@tanstack/db': patch --- -Fix duplicate key errors when live queries use joins with custom `getKey` functions. D2's incremental join can produce multiple outputs for the same key during a single graph run; this change batches all outputs into a single transaction to prevent conflicts. +Fix `isReady()` returning `true` while `toArray()` returns empty results. The status now correctly waits until data has been processed through the graph before marking ready. + +Also fix duplicate key errors when live queries use joins with custom `getKey` functions. D2's incremental join can produce multiple outputs for the same key during a single graph run; this change batches all outputs into a single transaction to prevent conflicts. From 5ad89bc4bc66e4924235070f23c744dc84bf3037 Mon Sep 17 00:00:00 2001 From: "claude[bot]" <41898282+claude[bot]@users.noreply.github.com> Date: Mon, 12 Jan 2026 13:35:49 +0000 Subject: [PATCH 5/5] refactor: restore accumulateChanges helper function Restore the accumulateChanges helper function instead of inlining its logic. This improves code readability and maintainability by keeping the accumulation logic in a separate, testable function. Co-authored-by: Kevin --- .../query/live/collection-config-builder.ts | 54 +++++++++++-------- 1 file changed, 32 insertions(+), 22 deletions(-) diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index 4309ad829..53f6d13a9 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -708,28 +708,7 @@ export class CollectionConfigBuilder< // Accumulate changes from this output callback into the pending changes map. // Changes for the same key are merged (inserts/deletes are added together). - for (const [[key, tupleData], multiplicity] of messages as Array< - [[unknown, [TResult, string | undefined]], number] - >) { - const [value, orderByIndex] = tupleData - const existing = pendingChanges.get(key) || { - deletes: 0, - inserts: 0, - value, - orderByIndex, - } - if (multiplicity < 0) { - existing.deletes += Math.abs(multiplicity) - } else if (multiplicity > 0) { - existing.inserts += multiplicity - // Update value to the latest version for this key - existing.value = value - if (orderByIndex !== undefined) { - existing.orderByIndex = orderByIndex - } - } - pendingChanges.set(key, existing) - } + messages.reduce(accumulateChanges, pendingChanges) }), ) @@ -1121,3 +1100,34 @@ function extractCollectionAliases(query: QueryIR): Map> { return aliasesById } + +function accumulateChanges( + acc: Map>, + [[key, tupleData], multiplicity]: [ + [unknown, [any, string | undefined]], + number, + ], +) { + // All queries now consistently return [value, orderByIndex] format + // where orderByIndex is undefined for queries without ORDER BY + const [value, orderByIndex] = tupleData as [T, string | undefined] + + const changes = acc.get(key) || { + deletes: 0, + inserts: 0, + value, + orderByIndex, + } + if (multiplicity < 0) { + changes.deletes += Math.abs(multiplicity) + } else if (multiplicity > 0) { + changes.inserts += multiplicity + // Update value to the latest version for this key + changes.value = value + if (orderByIndex !== undefined) { + changes.orderByIndex = orderByIndex + } + } + acc.set(key, changes) + return acc +}