Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .changeset/confirm-write-hook.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
"@tanstack/offline-transactions": minor
---

Add an opt-in `OfflineConfig.confirmWrite` hook that holds optimistic state across the post-commit confirmation window **off** the serial drain path.

Previously the only way to keep a row painted until an async sync stream (e.g. ElectricSQL's `awaitTxId`) echoed the write back was to `await` that confirmation inside the `mutationFn` — which serializes the whole outbox and collapses drain throughput. `confirmWrite` runs after the write commits and its outbox entry is removed, while the library keeps the just-committed mutations' optimistic overlay painted (reusing the same hold primitive as `restoreOptimisticState`) and releases it when the hook settles. The serial chain still serializes the POSTs (preserving create-then-update ordering); only the confirmation moved off it.

The hook is never expected to roll back — the write is already durably committed, so a rejection just drops the overlay early (a possible brief flicker), never data loss. A `maxConfirmationHolds` cap (default 1000) bounds concurrent holds to avoid O(n²) optimistic recompute on a large, fast drain, and `getActiveConfirmationHoldCount()` exposes the live count for diagnostics.

As part of this, the `mutationFn`'s return value is now threaded through to the completion promise and to `confirmWrite` (e.g. a server-assigned txid); previously it was awaited and discarded.
17 changes: 17 additions & 0 deletions packages/offline-transactions/src/OfflineExecutor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,17 @@ export class OfflineExecutor {
return this.executor.getRunningCount()
}

/**
* Number of optimistic holds currently kept alive by `confirmWrite` (see
* `OfflineConfig.confirmWrite`). Returns 0 when the hook is unused.
*/
getActiveConfirmationHoldCount(): number {
if (!this.executor) {
return 0
}
return this.executor.getActiveConfirmationHoldCount()
}

getOnlineDetector(): OnlineDetector {
return this.onlineDetector
}
Expand All @@ -596,6 +607,12 @@ export class OfflineExecutor {
}

dispose(): void {
// Drop any optimistic holds still waiting on confirmation so they don't
// outlive the executor.
if (this.executor) {
this.executor.releaseConfirmationHolds()
}

for (const collection of Object.values(this.config.collections)) {
collection.deferDataRefresh = null
}
Expand Down
98 changes: 98 additions & 0 deletions packages/offline-transactions/src/executor/OptimisticHold.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import { createTransaction } from '@tanstack/db'
import type { Collection, PendingMutation, Transaction } from '@tanstack/db'

/**
* A standalone, never-committed transaction whose only job is to keep an
* optimistic overlay painted on the affected collections for a bounded window.
*
* This is the same primitive `restoreOptimisticState` uses to re-show pending
* writes after a reload. It is factored out here so the post-commit
* confirmation window (see `OfflineConfig.confirmWrite`) can reuse it without
* duplicating the `_state` bookkeeping.
*/
export interface OptimisticHold {
/** The underlying hold transaction. Never auto-commits. */
transaction: Transaction
/**
* Tear the hold down. Idempotent. By default marks the hold `completed` (the
* affected rows then render from synced data); pass `{ rollback: true }` to
* discard the optimistic overlay instead.
*/
release: (options?: { rollback?: boolean }) => void
}

/**
* Create an optimistic hold for `mutations` and register it on every touched
* collection synchronously (before returning), so the overlay is painted with
* no gap. The returned `release` removes it again.
*
* Mirrors the lifecycle the offline executor already drives for restoration
* transactions: `setState("completed")` + delete + `recomputeOptimisticState`
* on a normal release, or `rollback()` to discard.
*/
export function createOptimisticHold(
mutations: Array<PendingMutation>,
options: { id?: string } = {},
): OptimisticHold {
// `autoCommit: false` + an inert mutationFn means it never POSTs or settles on
// its own — the caller drives its lifecycle by hand via `release`.
const transaction = createTransaction({
...(options.id === undefined ? {} : { id: options.id }),
autoCommit: false,
mutationFn: async () => {},
})

// It never commits, so `isPersisted` never resolves through the normal flow;
// swallow so a stray rejection on teardown can't surface as an unhandled
// rejection. Mirrors `restoreOptimisticState`.
transaction.isPersisted.promise.catch(() => {
// Intentionally ignored - holds are torn down via `release`, not commit.
})

transaction.applyMutations(mutations)

// Register with each affected collection's state manager. Dedup by collection
// reference (the same collection can be touched by several mutations).
const touchedCollections = new Set<Collection<any, any, any, any, any>>()
for (const mutation of mutations) {
// Defensive check for corrupted deserialized data
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (!mutation.collection) {
continue
}
if (touchedCollections.has(mutation.collection)) {
continue
}
touchedCollections.add(mutation.collection)
mutation.collection._state.transactions.set(transaction.id, transaction)
// `recomputeOptimisticState(true)` forces the recompute through even when a
// sync commit is in flight (the "triggered by user action" path), so the
// overlay always applies.
mutation.collection._state.recomputeOptimisticState(true)
}

let released = false
const release = ({ rollback = false }: { rollback?: boolean } = {}): void => {
if (released) {
return
}
released = true

if (rollback) {
// `rollback()` removes the transaction from its collections itself.
transaction.rollback()
return
}

// Mark completed so `recomputeOptimisticState` stops considering it; this
// also splices the tx out of @tanstack/db's global transaction registry, so
// the hold can never leak there. The rows now render from synced data.
transaction.setState(`completed`)
for (const collection of touchedCollections) {
collection._state.transactions.delete(transaction.id)
collection._state.recomputeOptimisticState(false)
}
}

return { transaction, release }
}
181 changes: 141 additions & 40 deletions packages/offline-transactions/src/executor/TransactionExecutor.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
import { createTransaction } from '@tanstack/db'
import { DefaultRetryPolicy } from '../retry/RetryPolicy'
import { NonRetriableError } from '../types'
import { withNestedSpan } from '../telemetry/tracer'
import { createOptimisticHold } from './OptimisticHold'
import type { OptimisticHold } from './OptimisticHold'
import type { KeyScheduler } from './KeyScheduler'
import type { OutboxManager } from '../outbox/OutboxManager'
import type {
OfflineConfig,
OfflineTransaction,
TransactionSignaler,
} from '../types'
import type { PendingMutation } from '@tanstack/db'

const HANDLED_EXECUTION_ERROR = Symbol(`HandledExecutionError`)

// Default safety cap for `OfflineConfig.confirmWrite` holds. See the field's
// docs in types.ts: each hold adds O(transactions) recompute cost, so a large,
// fast drain is bounded to avoid O(n^2) churn.
const DEFAULT_MAX_CONFIRMATION_HOLDS = 1000

export class TransactionExecutor {
private scheduler: KeyScheduler
private outbox: OutboxManager
Expand All @@ -21,6 +28,10 @@ export class TransactionExecutor {
private executionPromise: Promise<void> | null = null
private offlineExecutor: TransactionSignaler
private retryTimer: ReturnType<typeof setTimeout> | null = null
// Optimistic holds kept alive across the post-commit confirmation window
// (see `OfflineConfig.confirmWrite`). Tracked so they can all be released on
// `clear()` (logout / outbox clear / company-switch).
private confirmationHolds = new Set<OptimisticHold>()

constructor(
scheduler: KeyScheduler,
Expand Down Expand Up @@ -104,7 +115,10 @@ export class TransactionExecutor {
await this.outbox.remove(transaction.id)

span.setAttribute(`result`, `success`)
this.offlineExecutor.resolveTransaction(transaction.id, result)
// Resolve the waiting transaction and, if `confirmWrite` is set, hold
// its optimistic state until confirmation completes — OFF this serial
// path, so it never blocks the next transaction below.
this.resolveWithOptionalConfirmation(transaction, result)
} catch (error) {
const err =
error instanceof Error ? error : new Error(String(error))
Expand All @@ -129,7 +143,9 @@ export class TransactionExecutor {
}
}

private async runMutationFn(transaction: OfflineTransaction): Promise<void> {
private async runMutationFn(
transaction: OfflineTransaction,
): Promise<unknown> {
const mutationFn = this.config.mutationFns[transaction.mutationFnName]

if (!mutationFn) {
Expand All @@ -150,12 +166,126 @@ export class TransactionExecutor {
metadata: transaction.metadata ?? {},
}

await mutationFn({
// Return the result so it can be surfaced to the waiting transaction and to
// `confirmWrite` (e.g. a server-assigned txid). Previously this value was
// awaited and discarded.
return await mutationFn({
transaction: transactionWithMutations as any,
idempotencyKey: transaction.idempotencyKey,
})
}

/**
* Resolve the waiting transaction, then — if `confirmWrite` is configured —
* keep its optimistic state painted until confirmation completes.
*
* The confirmation runs OFF the serial drain path: this method returns
* immediately so the executor can move on to the next transaction. The hold is
* created BEFORE `resolveTransaction` so the optimistic overlay is owned
* continuously (resolveTransaction drops the original/restoration transaction's
* overlay; the hold keeps the rows painted across that boundary, no flicker).
*/
private resolveWithOptionalConfirmation(
transaction: OfflineTransaction,
result: unknown,
): void {
const confirmWrite = this.config.confirmWrite

// No hook, or nothing to hold: behave exactly as before the hook existed.
if (!confirmWrite || transaction.mutations.length === 0) {
this.offlineExecutor.resolveTransaction(transaction.id, result)
return
}

const maxHolds =
this.config.maxConfirmationHolds ?? DEFAULT_MAX_CONFIRMATION_HOLDS
if (this.confirmationHolds.size >= maxHolds) {
// Safety valve: too many concurrent holds. Skip the hold — the optimistic
// overlay drops at resolve as it would without the hook. The write is
// already durably committed, so correctness is unaffected.
this.offlineExecutor.resolveTransaction(transaction.id, result)
return
}

const hold = this.createConfirmationHold(transaction.mutations)
this.offlineExecutor.resolveTransaction(transaction.id, result)

if (!hold) {
// Hold creation failed (already logged). The write is committed; just let
// the optimistic state drop as it did before the hook existed.
return
}

this.runConfirmation(confirmWrite, transaction, result, hold)
Comment on lines +200 to +219

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

confirmWrite is silently skipped when holds are capped or hold creation fails.

At Line 202 and Line 213, the function returns before invoking confirmWrite. That breaks the post-commit hook contract and makes behavior depend on hold availability instead of hook configuration. Also, Line 201 uses maxConfirmationHolds without normalizing NaN/negative values, which can bypass or disable the cap unexpectedly.

Suggested patch
-    const maxHolds =
-      this.config.maxConfirmationHolds ?? DEFAULT_MAX_CONFIRMATION_HOLDS
-    if (this.confirmationHolds.size >= maxHolds) {
-      // Safety valve: too many concurrent holds. Skip the hold — the optimistic
-      // overlay drops at resolve as it would without the hook. The write is
-      // already durably committed, so correctness is unaffected.
-      this.offlineExecutor.resolveTransaction(transaction.id, result)
-      return
-    }
-
-    const hold = this.createConfirmationHold(transaction.mutations)
+    const configuredMax = this.config.maxConfirmationHolds
+    const maxHolds =
+      Number.isFinite(configuredMax) && (configuredMax as number) >= 0
+        ? Math.floor(configuredMax as number)
+        : DEFAULT_MAX_CONFIRMATION_HOLDS
+    const hold =
+      this.confirmationHolds.size < maxHolds
+        ? this.createConfirmationHold(transaction.mutations)
+        : null
     this.offlineExecutor.resolveTransaction(transaction.id, result)
-
-    if (!hold) {
-      // Hold creation failed (already logged). The write is committed; just let
-      // the optimistic state drop as it did before the hook existed.
-      return
-    }
-
     this.runConfirmation(confirmWrite, transaction, result, hold)
   }
@@
-    hold: OptimisticHold,
+    hold: OptimisticHold | null,
   ): void {
     const release = (): void => {
-      this.confirmationHolds.delete(hold)
-      try {
-        hold.release()
-      } catch (error) {
-        console.warn(`Failed to release confirmation hold:`, error)
+      if (hold) {
+        this.confirmationHolds.delete(hold)
+        try {
+          hold.release()
+        } catch (error) {
+          console.warn(`Failed to release confirmation hold:`, error)
+        }
       }
     }

Also applies to: 237-270

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/offline-transactions/src/executor/TransactionExecutor.ts` around
lines 200 - 219, The confirmWrite callback is being silently skipped due to
early returns at lines 202 and 213 when holds are capped or hold creation fails,
which violates the post-commit hook contract. Restructure the logic in the
method body to ensure confirmWrite is always invoked, regardless of hold
availability. Additionally, the maxConfirmationHolds assignment does not
normalize NaN or negative config values, which can unexpectedly bypass or
disable the cap. Add validation to ensure maxConfirmationHolds is a positive
integer, using something like Math.max to ensure proper normalization before the
size check is performed.

}

// Never throws: a throw here would propagate into the serial drain and make
// the executor treat an already-committed write as failed.
private createConfirmationHold(
mutations: Array<PendingMutation>,
): OptimisticHold | null {
try {
const hold = createOptimisticHold(mutations)
this.confirmationHolds.add(hold)
return hold
} catch (error) {
console.warn(`Failed to create confirmation hold:`, error)
return null
}
}

private runConfirmation(
confirmWrite: NonNullable<OfflineConfig[`confirmWrite`]>,
transaction: OfflineTransaction,
result: unknown,
hold: OptimisticHold,
): void {
const release = (): void => {
this.confirmationHolds.delete(hold)
try {
hold.release()
} catch (error) {
console.warn(`Failed to release confirmation hold:`, error)
}
}

// Off the serial drain: `confirmWrite` must never block the next
// transaction, and a rejection must never surface as an unhandled rejection
// (the write already committed). Whatever happens, release exactly once.
void Promise.resolve()
.then(() =>
confirmWrite({
transactionId: transaction.id,
mutations: transaction.mutations,
result,
metadata: transaction.metadata,
}),
)
.catch((error) => {
// The write is durably committed; a failed confirmation only means we
// stop holding the optimistic overlay (a possible brief flicker).
console.warn(`confirmWrite rejected for ${transaction.id}:`, error)
})
.finally(release)
}

/** Release every active confirmation hold immediately. */
releaseConfirmationHolds(): void {
for (const hold of [...this.confirmationHolds]) {
this.confirmationHolds.delete(hold)
try {
hold.release()
} catch (error) {
console.warn(`Failed to release confirmation hold:`, error)
}
}
}

/** Diagnostics / tests: holds currently keeping optimistic state painted. */
getActiveConfirmationHoldCount(): number {
return this.confirmationHolds.size
}

private async handleError(
transaction: OfflineTransaction,
error: Error,
Expand Down Expand Up @@ -270,47 +400,17 @@ export class TransactionExecutor {
}

try {
// Create a restoration transaction that holds mutations for optimistic state display.
// It will never commit - the real mutation is handled by the offline executor.
const restorationTx = createTransaction({
// Hold the mutations for optimistic display while the write is pending.
// It will never commit - the real mutation is handled by the offline
// executor, which tears the hold down via cleanupRestorationTransaction
// (keyed by the offline transaction id) once the write resolves.
const hold = createOptimisticHold(offlineTx.mutations, {
id: offlineTx.id,
autoCommit: false,
mutationFn: async () => {},
})

// Prevent unhandled promise rejection when cleanup calls rollback()
// We don't care about this promise - it's just for holding mutations
restorationTx.isPersisted.promise.catch(() => {
// Intentionally ignored - restoration transactions are cleaned up
// via cleanupRestorationTransaction, not through normal commit flow
})

restorationTx.applyMutations(offlineTx.mutations)

// Register with each affected collection's state manager
const touchedCollections = new Set<string>()
for (const mutation of offlineTx.mutations) {
// Defensive check for corrupted deserialized data
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (!mutation.collection) {
continue
}
const collectionId = mutation.collection.id
if (touchedCollections.has(collectionId)) {
continue
}
touchedCollections.add(collectionId)

mutation.collection._state.transactions.set(
restorationTx.id,
restorationTx,
)
mutation.collection._state.recomputeOptimisticState(true)
}

this.offlineExecutor.registerRestorationTransaction(
offlineTx.id,
restorationTx,
hold.transaction,
)
} catch (error) {
console.warn(
Expand All @@ -324,6 +424,7 @@ export class TransactionExecutor {
clear(): void {
this.scheduler.clear()
this.clearRetryTimer()
this.releaseConfirmationHolds()
}

getPendingCount(): number {
Expand Down
Loading