diff --git a/.changeset/confirm-write-hook.md b/.changeset/confirm-write-hook.md new file mode 100644 index 000000000..e4a9840be --- /dev/null +++ b/.changeset/confirm-write-hook.md @@ -0,0 +1,11 @@ +--- +"@tanstack/offline-transactions": minor +--- + +Add an opt-in `OfflineConfig.confirmWrite` hook that holds optimistic state across the post-commit confirmation window **off** the serial drain path. + +Previously the only way to keep a row painted until an async sync stream (e.g. ElectricSQL's `awaitTxId`) echoed the write back was to `await` that confirmation inside the `mutationFn` — which serializes the whole outbox and collapses drain throughput. `confirmWrite` runs after the write commits and its outbox entry is removed, while the library keeps the just-committed mutations' optimistic overlay painted (reusing the same hold primitive as `restoreOptimisticState`) and releases it when the hook settles. The serial chain still serializes the POSTs (preserving create-then-update ordering); only the confirmation moved off it. + +The hook is never expected to roll back — the write is already durably committed, so a rejection just drops the overlay early (a possible brief flicker), never data loss. A `maxConfirmationHolds` cap (default 1000) bounds concurrent holds to avoid O(n²) optimistic recompute on a large, fast drain, and `getActiveConfirmationHoldCount()` exposes the live count for diagnostics. + +As part of this, the `mutationFn`'s return value is now threaded through to the completion promise and to `confirmWrite` (e.g. a server-assigned txid); previously it was awaited and discarded. diff --git a/packages/offline-transactions/src/OfflineExecutor.ts b/packages/offline-transactions/src/OfflineExecutor.ts index a6140cfeb..2ac6680ae 100644 --- a/packages/offline-transactions/src/OfflineExecutor.ts +++ b/packages/offline-transactions/src/OfflineExecutor.ts @@ -587,6 +587,17 @@ export class OfflineExecutor { return this.executor.getRunningCount() } + /** + * Number of optimistic holds currently kept alive by `confirmWrite` (see + * `OfflineConfig.confirmWrite`). Returns 0 when the hook is unused. + */ + getActiveConfirmationHoldCount(): number { + if (!this.executor) { + return 0 + } + return this.executor.getActiveConfirmationHoldCount() + } + getOnlineDetector(): OnlineDetector { return this.onlineDetector } @@ -596,6 +607,12 @@ export class OfflineExecutor { } dispose(): void { + // Drop any optimistic holds still waiting on confirmation so they don't + // outlive the executor. + if (this.executor) { + this.executor.releaseConfirmationHolds() + } + for (const collection of Object.values(this.config.collections)) { collection.deferDataRefresh = null } diff --git a/packages/offline-transactions/src/executor/OptimisticHold.ts b/packages/offline-transactions/src/executor/OptimisticHold.ts new file mode 100644 index 000000000..b69d56e5f --- /dev/null +++ b/packages/offline-transactions/src/executor/OptimisticHold.ts @@ -0,0 +1,98 @@ +import { createTransaction } from '@tanstack/db' +import type { Collection, PendingMutation, Transaction } from '@tanstack/db' + +/** + * A standalone, never-committed transaction whose only job is to keep an + * optimistic overlay painted on the affected collections for a bounded window. + * + * This is the same primitive `restoreOptimisticState` uses to re-show pending + * writes after a reload. It is factored out here so the post-commit + * confirmation window (see `OfflineConfig.confirmWrite`) can reuse it without + * duplicating the `_state` bookkeeping. + */ +export interface OptimisticHold { + /** The underlying hold transaction. Never auto-commits. */ + transaction: Transaction + /** + * Tear the hold down. Idempotent. By default marks the hold `completed` (the + * affected rows then render from synced data); pass `{ rollback: true }` to + * discard the optimistic overlay instead. + */ + release: (options?: { rollback?: boolean }) => void +} + +/** + * Create an optimistic hold for `mutations` and register it on every touched + * collection synchronously (before returning), so the overlay is painted with + * no gap. The returned `release` removes it again. + * + * Mirrors the lifecycle the offline executor already drives for restoration + * transactions: `setState("completed")` + delete + `recomputeOptimisticState` + * on a normal release, or `rollback()` to discard. + */ +export function createOptimisticHold( + mutations: Array, + options: { id?: string } = {}, +): OptimisticHold { + // `autoCommit: false` + an inert mutationFn means it never POSTs or settles on + // its own — the caller drives its lifecycle by hand via `release`. + const transaction = createTransaction({ + ...(options.id === undefined ? {} : { id: options.id }), + autoCommit: false, + mutationFn: async () => {}, + }) + + // It never commits, so `isPersisted` never resolves through the normal flow; + // swallow so a stray rejection on teardown can't surface as an unhandled + // rejection. Mirrors `restoreOptimisticState`. + transaction.isPersisted.promise.catch(() => { + // Intentionally ignored - holds are torn down via `release`, not commit. + }) + + transaction.applyMutations(mutations) + + // Register with each affected collection's state manager. Dedup by collection + // reference (the same collection can be touched by several mutations). + const touchedCollections = new Set>() + for (const mutation of mutations) { + // Defensive check for corrupted deserialized data + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + if (!mutation.collection) { + continue + } + if (touchedCollections.has(mutation.collection)) { + continue + } + touchedCollections.add(mutation.collection) + mutation.collection._state.transactions.set(transaction.id, transaction) + // `recomputeOptimisticState(true)` forces the recompute through even when a + // sync commit is in flight (the "triggered by user action" path), so the + // overlay always applies. + mutation.collection._state.recomputeOptimisticState(true) + } + + let released = false + const release = ({ rollback = false }: { rollback?: boolean } = {}): void => { + if (released) { + return + } + released = true + + if (rollback) { + // `rollback()` removes the transaction from its collections itself. + transaction.rollback() + return + } + + // Mark completed so `recomputeOptimisticState` stops considering it; this + // also splices the tx out of @tanstack/db's global transaction registry, so + // the hold can never leak there. The rows now render from synced data. + transaction.setState(`completed`) + for (const collection of touchedCollections) { + collection._state.transactions.delete(transaction.id) + collection._state.recomputeOptimisticState(false) + } + } + + return { transaction, release } +} diff --git a/packages/offline-transactions/src/executor/TransactionExecutor.ts b/packages/offline-transactions/src/executor/TransactionExecutor.ts index 71328443f..f873f3999 100644 --- a/packages/offline-transactions/src/executor/TransactionExecutor.ts +++ b/packages/offline-transactions/src/executor/TransactionExecutor.ts @@ -1,7 +1,8 @@ -import { createTransaction } from '@tanstack/db' import { DefaultRetryPolicy } from '../retry/RetryPolicy' import { NonRetriableError } from '../types' import { withNestedSpan } from '../telemetry/tracer' +import { createOptimisticHold } from './OptimisticHold' +import type { OptimisticHold } from './OptimisticHold' import type { KeyScheduler } from './KeyScheduler' import type { OutboxManager } from '../outbox/OutboxManager' import type { @@ -9,9 +10,15 @@ import type { OfflineTransaction, TransactionSignaler, } from '../types' +import type { PendingMutation } from '@tanstack/db' const HANDLED_EXECUTION_ERROR = Symbol(`HandledExecutionError`) +// Default safety cap for `OfflineConfig.confirmWrite` holds. See the field's +// docs in types.ts: each hold adds O(transactions) recompute cost, so a large, +// fast drain is bounded to avoid O(n^2) churn. +const DEFAULT_MAX_CONFIRMATION_HOLDS = 1000 + export class TransactionExecutor { private scheduler: KeyScheduler private outbox: OutboxManager @@ -21,6 +28,10 @@ export class TransactionExecutor { private executionPromise: Promise | null = null private offlineExecutor: TransactionSignaler private retryTimer: ReturnType | null = null + // Optimistic holds kept alive across the post-commit confirmation window + // (see `OfflineConfig.confirmWrite`). Tracked so they can all be released on + // `clear()` (logout / outbox clear / company-switch). + private confirmationHolds = new Set() constructor( scheduler: KeyScheduler, @@ -104,7 +115,10 @@ export class TransactionExecutor { await this.outbox.remove(transaction.id) span.setAttribute(`result`, `success`) - this.offlineExecutor.resolveTransaction(transaction.id, result) + // Resolve the waiting transaction and, if `confirmWrite` is set, hold + // its optimistic state until confirmation completes — OFF this serial + // path, so it never blocks the next transaction below. + this.resolveWithOptionalConfirmation(transaction, result) } catch (error) { const err = error instanceof Error ? error : new Error(String(error)) @@ -129,7 +143,9 @@ export class TransactionExecutor { } } - private async runMutationFn(transaction: OfflineTransaction): Promise { + private async runMutationFn( + transaction: OfflineTransaction, + ): Promise { const mutationFn = this.config.mutationFns[transaction.mutationFnName] if (!mutationFn) { @@ -150,12 +166,126 @@ export class TransactionExecutor { metadata: transaction.metadata ?? {}, } - await mutationFn({ + // Return the result so it can be surfaced to the waiting transaction and to + // `confirmWrite` (e.g. a server-assigned txid). Previously this value was + // awaited and discarded. + return await mutationFn({ transaction: transactionWithMutations as any, idempotencyKey: transaction.idempotencyKey, }) } + /** + * Resolve the waiting transaction, then — if `confirmWrite` is configured — + * keep its optimistic state painted until confirmation completes. + * + * The confirmation runs OFF the serial drain path: this method returns + * immediately so the executor can move on to the next transaction. The hold is + * created BEFORE `resolveTransaction` so the optimistic overlay is owned + * continuously (resolveTransaction drops the original/restoration transaction's + * overlay; the hold keeps the rows painted across that boundary, no flicker). + */ + private resolveWithOptionalConfirmation( + transaction: OfflineTransaction, + result: unknown, + ): void { + const confirmWrite = this.config.confirmWrite + + // No hook, or nothing to hold: behave exactly as before the hook existed. + if (!confirmWrite || transaction.mutations.length === 0) { + this.offlineExecutor.resolveTransaction(transaction.id, result) + return + } + + const maxHolds = + this.config.maxConfirmationHolds ?? DEFAULT_MAX_CONFIRMATION_HOLDS + if (this.confirmationHolds.size >= maxHolds) { + // Safety valve: too many concurrent holds. Skip the hold — the optimistic + // overlay drops at resolve as it would without the hook. The write is + // already durably committed, so correctness is unaffected. + this.offlineExecutor.resolveTransaction(transaction.id, result) + return + } + + const hold = this.createConfirmationHold(transaction.mutations) + this.offlineExecutor.resolveTransaction(transaction.id, result) + + if (!hold) { + // Hold creation failed (already logged). The write is committed; just let + // the optimistic state drop as it did before the hook existed. + return + } + + this.runConfirmation(confirmWrite, transaction, result, hold) + } + + // Never throws: a throw here would propagate into the serial drain and make + // the executor treat an already-committed write as failed. + private createConfirmationHold( + mutations: Array, + ): OptimisticHold | null { + try { + const hold = createOptimisticHold(mutations) + this.confirmationHolds.add(hold) + return hold + } catch (error) { + console.warn(`Failed to create confirmation hold:`, error) + return null + } + } + + private runConfirmation( + confirmWrite: NonNullable, + transaction: OfflineTransaction, + result: unknown, + hold: OptimisticHold, + ): void { + const release = (): void => { + this.confirmationHolds.delete(hold) + try { + hold.release() + } catch (error) { + console.warn(`Failed to release confirmation hold:`, error) + } + } + + // Off the serial drain: `confirmWrite` must never block the next + // transaction, and a rejection must never surface as an unhandled rejection + // (the write already committed). Whatever happens, release exactly once. + void Promise.resolve() + .then(() => + confirmWrite({ + transactionId: transaction.id, + mutations: transaction.mutations, + result, + metadata: transaction.metadata, + }), + ) + .catch((error) => { + // The write is durably committed; a failed confirmation only means we + // stop holding the optimistic overlay (a possible brief flicker). + console.warn(`confirmWrite rejected for ${transaction.id}:`, error) + }) + .finally(release) + } + + /** Release every active confirmation hold immediately. */ + releaseConfirmationHolds(): void { + for (const hold of [...this.confirmationHolds]) { + this.confirmationHolds.delete(hold) + try { + hold.release() + } catch (error) { + console.warn(`Failed to release confirmation hold:`, error) + } + } + } + + /** Diagnostics / tests: holds currently keeping optimistic state painted. */ + getActiveConfirmationHoldCount(): number { + return this.confirmationHolds.size + } + private async handleError( transaction: OfflineTransaction, error: Error, @@ -270,47 +400,17 @@ export class TransactionExecutor { } try { - // Create a restoration transaction that holds mutations for optimistic state display. - // It will never commit - the real mutation is handled by the offline executor. - const restorationTx = createTransaction({ + // Hold the mutations for optimistic display while the write is pending. + // It will never commit - the real mutation is handled by the offline + // executor, which tears the hold down via cleanupRestorationTransaction + // (keyed by the offline transaction id) once the write resolves. + const hold = createOptimisticHold(offlineTx.mutations, { id: offlineTx.id, - autoCommit: false, - mutationFn: async () => {}, }) - // Prevent unhandled promise rejection when cleanup calls rollback() - // We don't care about this promise - it's just for holding mutations - restorationTx.isPersisted.promise.catch(() => { - // Intentionally ignored - restoration transactions are cleaned up - // via cleanupRestorationTransaction, not through normal commit flow - }) - - restorationTx.applyMutations(offlineTx.mutations) - - // Register with each affected collection's state manager - const touchedCollections = new Set() - for (const mutation of offlineTx.mutations) { - // Defensive check for corrupted deserialized data - // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition - if (!mutation.collection) { - continue - } - const collectionId = mutation.collection.id - if (touchedCollections.has(collectionId)) { - continue - } - touchedCollections.add(collectionId) - - mutation.collection._state.transactions.set( - restorationTx.id, - restorationTx, - ) - mutation.collection._state.recomputeOptimisticState(true) - } - this.offlineExecutor.registerRestorationTransaction( offlineTx.id, - restorationTx, + hold.transaction, ) } catch (error) { console.warn( @@ -324,6 +424,7 @@ export class TransactionExecutor { clear(): void { this.scheduler.clear() this.clearRetryTimer() + this.releaseConfirmationHolds() } getPendingCount(): number { diff --git a/packages/offline-transactions/src/types.ts b/packages/offline-transactions/src/types.ts index e18a287cb..3f10efe24 100644 --- a/packages/offline-transactions/src/types.ts +++ b/packages/offline-transactions/src/types.ts @@ -89,6 +89,20 @@ export interface StorageDiagnostic { error?: Error } +export interface ConfirmWriteContext { + /** Id of the offline transaction whose write just committed. */ + transactionId: string + /** + * The mutations that were committed. One optimistic overlay is held per + * touched collection until the hook settles. + */ + mutations: Array + /** Whatever the matching mutationFn resolved with (e.g. a server txid). */ + result: unknown + /** The transaction's metadata, if any was supplied when it was created. */ + metadata?: Record +} + export interface OfflineConfig { collections: Record> mutationFns: Record @@ -101,6 +115,33 @@ export interface OfflineConfig { onUnknownMutationFn?: (name: string, tx: OfflineTransaction) => void onLeadershipChange?: (isLeader: boolean) => void onStorageFailure?: (diagnostic: StorageDiagnostic) => void + /** + * Optional post-commit confirmation hook. Runs AFTER a transaction's + * mutationFn resolves and its outbox entry is removed, but OFF the serial + * drain path — it does NOT block the next transaction's mutationFn, so a slow + * confirmation never throttles drain throughput. + * + * While the returned promise is pending, the library keeps the just-committed + * mutations' optimistic state painted (via an internal hold transaction), then + * releases the hold when it settles (resolve OR reject). Use it to wait for an + * asynchronous sync stream to echo the write back — e.g. ElectricSQL's + * `awaitTxId` — so the affected rows don't flicker (disappear then reappear) + * in the gap between server commit and sync. + * + * The hook is never expected to roll back: the write is already durably + * committed server-side, so a rejection only means the optimistic overlay is + * dropped early (a possible brief flicker), never data loss. Implement any + * timeout / verify-by-state logic inside the hook and resolve when done. + */ + confirmWrite?: (context: ConfirmWriteContext) => Promise + /** + * Safety cap on simultaneously-held confirmation holds (see `confirmWrite`). + * Each hold adds one transaction to every touched collection's optimistic + * recompute, which is O(transactions). Beyond the cap the hold is skipped (the + * overlay drops at commit instead) to avoid O(n^2) churn on a large, fast + * drain. Defaults to 1000. + */ + maxConfirmationHolds?: number leaderElection?: LeaderElection /** * Custom online detector implementation. diff --git a/packages/offline-transactions/tests/confirm-write.test.ts b/packages/offline-transactions/tests/confirm-write.test.ts new file mode 100644 index 000000000..1f55da7a4 --- /dev/null +++ b/packages/offline-transactions/tests/confirm-write.test.ts @@ -0,0 +1,190 @@ +import { describe, expect, it } from 'vitest' +import { createTestOfflineEnvironment } from './harness' +import type { ConfirmWriteContext, OfflineConfig } from '../src/types' + +const flushMicrotasks = () => new Promise((resolve) => setTimeout(resolve, 0)) + +function deferred(): { + promise: Promise + resolve: (value: T) => void + reject: (error: unknown) => void +} { + let resolve!: (value: T) => void + let reject!: (error: unknown) => void + const promise = new Promise((res, rej) => { + resolve = res + reject = rej + }) + return { promise, resolve, reject } +} + +/** + * A mutationFn that "commits on the server" but never feeds the row back into + * the collection's synced stream. This reproduces the real-world gap the + * `confirmWrite` hook targets: the write is durable server-side, but the sync + * stream hasn't echoed it back yet — so the optimistic overlay is the ONLY + * thing keeping the row visible. With the hook the row stays until the hook + * settles; without it the row vanishes the instant the transaction resolves. + */ +const committedButNotSynced = async () => ({ txid: 42 }) + +async function insertAndCommit( + env: ReturnType, + id: string, +) { + const offlineTx = env.executor.createOfflineTransaction({ + mutationFnName: env.mutationFnName, + autoCommit: false, + }) + offlineTx.mutate(() => { + env.collection.insert({ + id, + value: id, + completed: false, + updatedAt: new Date(), + }) + }) + await offlineTx.commit() + return offlineTx +} + +describe(`OfflineConfig.confirmWrite`, () => { + it(`holds optimistic state past commit until the hook settles, then releases`, async () => { + const gate = deferred() + const calls: Array = [] + const env = createTestOfflineEnvironment({ + mutationFn: committedButNotSynced, + config: { + confirmWrite: (context) => { + calls.push(context) + return gate.promise + }, + }, + }) + await env.waitForLeader() + + const offlineTx = await insertAndCommit(env, `item-1`) + await flushMicrotasks() + + // The server committed but the sync stream never delivered the row, so the + // ONLY thing keeping it visible is the confirmation hold. It must be there. + expect(env.executor.getActiveConfirmationHoldCount()).toBe(1) + expect(env.collection.get(`item-1`)?.value).toBe(`item-1`) + + // The hook received the committed mutations and the mutationFn's result. + expect(calls).toHaveLength(1) + expect(calls[0]!.transactionId).toBe(offlineTx.id) + expect(calls[0]!.mutations).toHaveLength(1) + expect(calls[0]!.result).toEqual({ txid: 42 }) + + // Settle the hook → hold released. With nothing in the synced stream, the + // optimistic row now drops (in production the sync stream would have it). + gate.resolve() + await flushMicrotasks() + + expect(env.executor.getActiveConfirmationHoldCount()).toBe(0) + expect(env.collection.get(`item-1`)).toBeUndefined() + + env.executor.dispose() + }) + + it(`does not block the serial drain: a hung hook still lets the next write POST`, async () => { + const never = deferred() // confirmWrite that never settles + const env = createTestOfflineEnvironment({ + mutationFn: committedButNotSynced, + config: { + confirmWrite: () => never.promise, + }, + }) + await env.waitForLeader() + + await insertAndCommit(env, `item-1`) + await insertAndCommit(env, `item-2`) + await flushMicrotasks() + + // Both mutationFns ran even though the first hook never settled — the + // confirmation runs OFF the serial path. Pre-fix, awaiting confirmation + // inline would have parked the queue on the first write. + expect(env.mutationCalls).toHaveLength(2) + expect(env.executor.getActiveConfirmationHoldCount()).toBe(2) + + env.executor.dispose() + }) + + it(`releases the hold even when the hook rejects (write is already committed)`, async () => { + const env = createTestOfflineEnvironment({ + mutationFn: committedButNotSynced, + config: { + confirmWrite: () => Promise.reject(new Error(`shape never confirmed`)), + }, + }) + await env.waitForLeader() + + await insertAndCommit(env, `item-1`) + await flushMicrotasks() + + // A rejection is not a rollback: the hold is released, not retried, and the + // drain is unaffected. + expect(env.executor.getActiveConfirmationHoldCount()).toBe(0) + expect(env.collection.get(`item-1`)).toBeUndefined() + + env.executor.dispose() + }) + + it(`without the hook, optimistic state drops at commit (the gap the hook fills)`, async () => { + const env = createTestOfflineEnvironment({ + mutationFn: committedButNotSynced, + }) + await env.waitForLeader() + + await insertAndCommit(env, `item-1`) + await flushMicrotasks() + + expect(env.executor.getActiveConfirmationHoldCount()).toBe(0) + expect(env.collection.get(`item-1`)).toBeUndefined() + + env.executor.dispose() + }) + + it(`skips the hold past maxConfirmationHolds (O(n^2) safety valve)`, async () => { + const gate = deferred() + const config: Partial = { + confirmWrite: () => gate.promise, + maxConfirmationHolds: 0, + } + const env = createTestOfflineEnvironment({ + mutationFn: committedButNotSynced, + config, + }) + await env.waitForLeader() + + await insertAndCommit(env, `item-1`) + await flushMicrotasks() + + // Cap is 0, so no hold is created — the write still succeeds, the overlay + // just drops at commit as it would without the hook. + expect(env.executor.getActiveConfirmationHoldCount()).toBe(0) + expect(env.collection.get(`item-1`)).toBeUndefined() + + gate.resolve() + env.executor.dispose() + }) + + it(`releases all holds on dispose`, async () => { + const never = deferred() + const env = createTestOfflineEnvironment({ + mutationFn: committedButNotSynced, + config: { + confirmWrite: () => never.promise, + }, + }) + await env.waitForLeader() + + await insertAndCommit(env, `item-1`) + await flushMicrotasks() + expect(env.executor.getActiveConfirmationHoldCount()).toBe(1) + + env.executor.dispose() + expect(env.executor.getActiveConfirmationHoldCount()).toBe(0) + }) +}) diff --git a/packages/offline-transactions/tests/offline-e2e.test.ts b/packages/offline-transactions/tests/offline-e2e.test.ts index 981315c26..d65a31e83 100644 --- a/packages/offline-transactions/tests/offline-e2e.test.ts +++ b/packages/offline-transactions/tests/offline-e2e.test.ts @@ -87,7 +87,10 @@ describe(`offline executor end-to-end`, () => { await offlineTx.commit() - await expect(waitPromise).resolves.toBeUndefined() + // waitForTransactionCompletion now resolves with the mutationFn's return + // value (previously this value was awaited and discarded, so it always + // resolved `undefined`). The default test mutationFn returns { ok, mutations }. + await expect(waitPromise).resolves.toMatchObject({ ok: true }) const outboxEntries = await env.executor.peekOutbox() expect(outboxEntries).toEqual([])