diff --git a/.changeset/fix-synced-data-writes.md b/.changeset/fix-synced-data-writes.md new file mode 100644 index 000000000..82aa30f77 --- /dev/null +++ b/.changeset/fix-synced-data-writes.md @@ -0,0 +1,6 @@ +--- +'@tanstack/db': patch +'@tanstack/query-db-collection': patch +--- + +Fix syncedData not updating when manual write operations (writeUpsert, writeInsert, etc.) are called after async operations in mutation handlers. Previously, the sync transaction would be blocked by the persisting user transaction, leaving syncedData stale until the next sync cycle. diff --git a/packages/db/src/collection/state.ts b/packages/db/src/collection/state.ts index b76580c19..b873610f6 100644 --- a/packages/db/src/collection/state.ts +++ b/packages/db/src/collection/state.ts @@ -25,6 +25,12 @@ interface PendingSyncedTransaction< upserts: Map deletes: Set } + /** + * When true, this transaction should be processed immediately even if there + * are persisting user transactions. Used by manual write operations (writeInsert, + * writeUpdate, writeDelete, writeUpsert) which need synchronous updates to syncedData. + */ + immediate?: boolean } export class CollectionStateManager< @@ -437,13 +443,17 @@ export class CollectionStateManager< committedSyncedTransactions, uncommittedSyncedTransactions, hasTruncateSync, + hasImmediateSync, } = this.pendingSyncedTransactions.reduce( (acc, t) => { if (t.committed) { acc.committedSyncedTransactions.push(t) - if (t.truncate === true) { + if (t.truncate) { acc.hasTruncateSync = true } + if (t.immediate) { + acc.hasImmediateSync = true + } } else { acc.uncommittedSyncedTransactions.push(t) } @@ -457,10 +467,21 @@ export class CollectionStateManager< PendingSyncedTransaction >, hasTruncateSync: false, + hasImmediateSync: false, }, ) - if (!hasPersistingTransaction || hasTruncateSync) { + // Process committed transactions if: + // 1. No persisting user transaction (normal sync flow), OR + // 2. There's a truncate operation (must be processed immediately), OR + // 3. There's an immediate transaction (manual writes must be processed synchronously) + // + // Note: When hasImmediateSync or hasTruncateSync is true, we process ALL committed + // sync transactions (not just the immediate/truncate ones). This is intentional for + // ordering correctness: if we only processed the immediate transaction, earlier + // non-immediate transactions would be applied later and could overwrite newer state. + // Processing all committed transactions together preserves causal ordering. + if (!hasPersistingTransaction || hasTruncateSync || hasImmediateSync) { // Set flag to prevent redundant optimistic state recalculations this.isCommittingSyncTransactions = true diff --git a/packages/db/src/collection/sync.ts b/packages/db/src/collection/sync.ts index 376ab58df..4392db1b8 100644 --- a/packages/db/src/collection/sync.ts +++ b/packages/db/src/collection/sync.ts @@ -88,11 +88,12 @@ export class CollectionSyncManager< const syncRes = normalizeSyncFnResult( this.config.sync.sync({ collection: this.collection, - begin: () => { + begin: (options?: { immediate?: boolean }) => { this.state.pendingSyncedTransactions.push({ committed: false, operations: [], deletedKeys: new Set(), + immediate: options?.immediate, }) }, write: ( diff --git a/packages/db/src/types.ts b/packages/db/src/types.ts index 29bfce622..51dbe2ddd 100644 --- a/packages/db/src/types.ts +++ b/packages/db/src/types.ts @@ -328,7 +328,12 @@ export interface SyncConfig< > { sync: (params: { collection: Collection - begin: () => void + /** + * Begin a new sync transaction. + * @param options.immediate - When true, the transaction will be processed immediately + * even if there are persisting user transactions. Used by manual write operations. + */ + begin: (options?: { immediate?: boolean }) => void write: (message: ChangeMessageOrDeleteKeyMessage) => void commit: () => void markReady: () => void diff --git a/packages/query-db-collection/src/manual-sync.ts b/packages/query-db-collection/src/manual-sync.ts index b772550e3..addf6808b 100644 --- a/packages/query-db-collection/src/manual-sync.ts +++ b/packages/query-db-collection/src/manual-sync.ts @@ -35,7 +35,12 @@ export interface SyncContext< queryClient: QueryClient queryKey: Array getKey: (item: TRow) => TKey - begin: () => void + /** + * Begin a new sync transaction. + * @param options.immediate - When true, the transaction will be processed immediately + * even if there are persisting user transactions. Used by manual write operations. + */ + begin: (options?: { immediate?: boolean }) => void write: (message: Omit, `key`>) => void commit: () => void /** @@ -144,7 +149,9 @@ export function performWriteOperations< const normalized = normalizeOperations(operations, ctx) validateOperations(normalized, ctx) - ctx.begin() + // Use immediate: true to ensure syncedData is updated synchronously, + // even when called from within a mutationFn with an active persisting transaction + ctx.begin({ immediate: true }) for (const op of normalized) { switch (op.type) { diff --git a/packages/query-db-collection/tests/query.test.ts b/packages/query-db-collection/tests/query.test.ts index a6d73cea1..3a2802f9c 100644 --- a/packages/query-db-collection/tests/query.test.ts +++ b/packages/query-db-collection/tests/query.test.ts @@ -2286,6 +2286,99 @@ describe(`QueryCollection`, () => { expect(todo?.id).not.toBe(clientId) }) + it(`should update syncedData immediately when writeUpsert is called after async API in onUpdate handler`, async () => { + // Reproduces bug where syncedData shows stale values when writeUpsert is called + // AFTER an async API call in a mutation handler. The async await causes the + // transaction to be added to state.transactions before writeUpsert runs, + // which means commitPendingTransactions() sees hasPersistingTransaction=true + // and would skip processing the sync transaction without the immediate flag. + const queryKey = [`writeUpsert-after-api-test`] + + type Brand = { + id: string + brandName: string + } + + const serverBrands: Array = [{ id: `123`, brandName: `A` }] + + const queryFn = vi.fn().mockImplementation(async () => { + return [...serverBrands] + }) + + // Track syncedData state immediately after writeUpsert + let syncedDataAfterWriteUpsert: Brand | undefined + let hasPersistingTransactionDuringWrite = false + + const collection = createCollection( + queryCollectionOptions({ + id: `writeUpsert-after-api-test`, + queryKey, + queryFn, + queryClient, + getKey: (item: Brand) => item.id, + startSync: true, + onUpdate: async ({ transaction }) => { + const updates = transaction.mutations.map((m) => m.modified) + + // Simulate async API call - THIS IS KEY! + // After this await, the transaction will be in state.transactions + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Check if there's now a persisting transaction + hasPersistingTransactionDuringWrite = Array.from( + collection._state.transactions.values(), + ).some((tx) => tx.state === `persisting`) + + // Update server state + for (const update of updates) { + const idx = serverBrands.findIndex((b) => b.id === update.id) + if (idx !== -1) { + serverBrands[idx] = { ...serverBrands[idx], ...update } + } + } + + // Write the server response back to syncedData + // Without the immediate flag, this would be blocked by the persisting transaction + collection.utils.writeBatch(() => { + for (const update of updates) { + collection.utils.writeUpsert(update) + } + }) + + // Check syncedData IMMEDIATELY after writeUpsert + syncedDataAfterWriteUpsert = collection._state.syncedData.get(`123`) + + return { refetch: false } + }, + }), + ) + + await vi.waitFor(() => { + expect(collection.status).toBe(`ready`) + }) + + // Verify initial state + expect(collection._state.syncedData.get(`123`)?.brandName).toBe(`A`) + + // Update brandName from A to B + collection.update(`123`, (draft) => { + draft.brandName = `B` + }) + + // Wait for mutation to complete + await flushPromises() + await new Promise((resolve) => setTimeout(resolve, 50)) + + // Verify we had a persisting transaction during the write + expect(hasPersistingTransactionDuringWrite).toBe(true) + + // The CRITICAL assertion: syncedData should have been updated IMMEDIATELY after writeUpsert + // Without the fix, this would fail because commitPendingTransactions() would skip + // processing due to hasPersistingTransaction being true + expect(syncedDataAfterWriteUpsert).toBeDefined() + expect(syncedDataAfterWriteUpsert?.brandName).toBe(`B`) + }) + it(`should not rollback object field updates after server response with refetch: false`, async () => { const queryKey = [`object-field-update-test`]