Skip to content

feat(etl): add JSONL/NDJSON support to catalog ETL pipeline#2477

Open
andrew-bierman wants to merge 47 commits into
developmentfrom
feat/jsonl-etl-support
Open

feat(etl): add JSONL/NDJSON support to catalog ETL pipeline#2477
andrew-bierman wants to merge 47 commits into
developmentfrom
feat/jsonl-etl-support

Conversation

@andrew-bierman

Copy link
Copy Markdown
Collaborator

Summary

  • Extends the catalog ETL pipeline to support JSONL/NDJSON files alongside existing CSV
  • Adds json-utils.ts with streaming JSON line parser backed by @packrat/guards type guards
  • Updates chunking workflow to auto-detect file format and route to the correct parser
  • Fixes promise capture in CSV pump to prevent silent R2 upload hangs

Test plan

  • Unit tests added for json-utils.ts (streaming parse, malformed line handling, type guards)
  • Existing CSV ETL tests pass unchanged
  • chunk-csv-for-r2 tests updated to cover both format paths
  • JSONL file with valid items ingests correctly end-to-end

🤖 Generated with Claude Code

andrew-bierman and others added 30 commits May 20, 2026 00:47
Mark the 2026-05-19 audit-remediation plan as superseded and replace it
with a Workflows-based plan that natively provides the durable-step +
idempotency + retry + state semantics the prior plan reconstructed
manually on top of Queues + Postgres.

Audit findings about CSV correctness, validator hardening, observability,
retention, and the operational runbook carry into the new plan; the
queue-as-state-machine subplot is dropped. Net unit count drops from
15 to 9.

Also includes the underlying audit (docs/audits/2026-05-16-etl-audit.md)
that grounds both plans.
Validates the integration before committing to the Workflows migration:
R2 byte-range reads, csv-parse inside step.do, Drizzle Neon HTTP query,
durable step.sleep, and step result persistence. Workflow takes
{ objectKey, source } params; trigger via wrangler workflows trigger
spike-etl-workflow ... --env=dev and observe in the dashboard.

Adds:
- packages/api/src/workflows/spike-etl-workflow.ts (the workflow class)
- packages/api/src/index.ts exports SpikeEtlWorkflow
- packages/api/wrangler.jsonc declares the workflows[] binding

Per the plan (docs/plans/2026-05-20-001-fix-etl-pipeline-workflows-migration-plan.md
U1), this is throwaway. Delete the workflow file, the index.ts export,
and the wrangler binding after the GO/NO-GO decision lands U3's
production CatalogEtlWorkflow.
Pivoted from the in-app spike to a standalone worker because the dev
deploy of packrat-api requires Docker (App Container) and Docker is
not installed locally. Standalone worker has zero container surface
and only the bindings the spike actually exercises.

Spike rewritten to use the native R2 binding (env.PACKRAT_SCRAPY_BUCKET)
instead of the AWS S3 client — removes the R2_ACCESS_KEY_ID secret
dependency. Drizzle/Neon validation deferred to U3 (validates on the
production worker that already has NEON_DATABASE_URL).

Result on real prod data (cotopaxi_2026-05-14T16-54-05.csv, 698 KB):
  status=complete, duration=7s
  1-r2-head: size=698620 etag=4397... ok
  2-r2-range-read: 698134 bytes
  3-csv-parse: 100 rows
  4a/4b/4c-sleep: Δ=5043ms (5s sleep + ~40ms wake overhead)
  5-memoize-marker: persisted in instance history

GO. Workflows host R2 + csv-parse + step.sleep + step result persistence
cleanly inside step.do. Proceed to U2 (Drizzle migration 0048).

Adds:
- packages/api/wrangler.spike.jsonc (standalone worker config)
- packages/api/src/spike-entry.ts (thin /trigger endpoint)
- packages/api/src/workflows/spike-etl-workflow.ts rewritten

The standalone worker packrat-etl-spike.orange-frost-d665.workers.dev
should be deleted via `wrangler delete --config=wrangler.spike.jsonc`
after U3 lands the production CatalogEtlWorkflow.
Adds eight columns to etl_jobs for the Workflows-based ETL:
- workflow_instance_id (nullable text) — links the etl_jobs row to its
  Workflows instance for admin dashboards
- verified_at, verified_row_count (nullable) — post-ingestion R2-source
  row-count verification
- total_embedding_failures (integer DEFAULT 0 NOT NULL) — observable
  degradation signal when the embedding service fails inside a chunk
- superseded_by_job_id (FK to etl_jobs.id, ON DELETE SET NULL) +
  superseded_at — preserves the audit trail when an operator triggers
  repair-from-scratch
- source_etag, source_last_modified — captured at job start, compared by
  the repair endpoint to fail closed when the R2 source has been
  overwritten

Constraints + indexes:
- CHECK etl_jobs_no_self_supersede prevents a row from superseding itself
- Index etl_jobs_workflow_instance_id_idx (admin lookups)
- Index etl_jobs_superseded_by_idx (repair-chain lookups)
- UNIQUE catalog_item_etl_jobs_catalog_job_idx (catalog_item_id, etl_job_id)
  so retried chunk upserts can use ON CONFLICT DO NOTHING and not
  accumulate duplicate provenance rows

Also fixes the long-standing stale drizzle.config.ts schema path
(./src/db/schema.ts → ../db/src/schema.ts); the schema was extracted
to @packrat/db in merge b14f4db but the config pointer was not
updated, so db:generate failed before this commit.

The Workflows binding is the source of truth for chunk lifecycle and
retry semantics; the columns above are only DB-side denormalization
for admin queries.

Verification:
- drizzle-kit check: Everything's fine
- scripts/lint/check-drizzle-migrations.ts: Drizzle migration checks passed
- biome lint: clean

Schema smoke test at packages/api/test/db-schema-etl.test.ts asserts the
columns + indexes + CHECK constraint + UNIQUE index against the Docker
Postgres wsproxy. Run via bun test:api once docker-compose.test.yml is up.
Closes audit P1 #3, #4, #5 — the chunk boundary bugs where a CSV row
spanning a 20 MB byte-range chunk would be either dropped, invalidated,
or duplicated. The new helper snaps each chunk's byteEnd to the byte
immediately before a newline by reading a small (64 KiB default) tail
window and locating the last \n. Throws ChunkBoundaryError if the peek
window has no newline so a row wider than 64 KiB fails loudly.

Tail peek reads are issued in parallel via Promise.all so the producer
endpoint's CPU budget stays bounded on multi-GB files. Single-object-
parameter shape matches existing ETL functions.

5 unit tests cover: small-file single-chunk; multi-chunk newline
alignment; concatenation completeness; ChunkBoundaryError on no-newline;
row-boundary preservation across chunks. All pass via bun test:unit.

Used by the new CatalogEtlWorkflow and by the retry / repair-from-scratch
admin endpoints (next units).
Replaces packages/api/src/services/etl/processCatalogEtl.ts +
queue.ts as the catalog ingest engine. Producer cutover lands next
(separate commit) — for now both paths coexist; the queue handler in
src/index.ts still routes to processQueueBatch for ?engine=queue
callers during the bake window.

Workflow structure per source CSV:
  for each chunk in params.chunks:
    step.do('chunk-N', { retries: 3, backoff: exp, timeout: 5min },
            () => processChunk(...))
  step.do('aggregate')  -> UPDATE etl_jobs totals from memoized chunk results
  step.do('reconcile')  -> csv-parse the R2 source for logical row count
  step.do('reconcile-write') -> UPDATE verified_at + verified_row_count
  step.do('finalize')   -> UPDATE status='completed', completedAt

Audit closures inherited via the chunkCsvForR2 helper:
- P0 #1 (premature completion) — workflow instance state IS job state;
  the finalize step is the single transition to 'completed'
- P0 #2 (swallowed errors) — Workflows surface failed steps with full
  retry history; no DLQ table needed
- P1 #3/#4/#5 (chunk boundary bugs) — closed by the producer using
  newline-aligned ChunkSpec; consumer drops skipPartialRow
- P1 #1/#2 (retry endpoint, stuck-job sweep) — closed by workflow
  instance lifecycle (retry endpoints trigger new instances; stuck
  detection is via dashboard, not a wall-clock cron)
- P1 #3 specifically — header re-fetch uses a bounded 4K → 16K → 64K
  expand loop, throws EtlHeaderError if no newline anywhere in 64 KiB

Counter writes inside the chunk step (via existing
processValidItemsBatch / processLogsBatch) may double-count on a
chunk retry; the aggregate step at the end writes the authoritative
totals from memoized chunk results, overriding any retry drift.

wrangler.jsonc workflows binding switched from the throwaway
SPIKE_ETL_WORKFLOW to ETL_WORKFLOW (class CatalogEtlWorkflow). The
standalone spike worker (wrangler.spike.jsonc) is untouched and can
be torn down independently via wrangler delete --config=wrangler.spike.jsonc.

Test stub at src/__test-stubs__/cloudflare-workers.ts extended with
minimal WorkflowEntrypoint / WorkflowStep types so unit tests can
import workflow code without the real Cloudflare runtime.

Verification:
- All 17 unit-test files pass (304 tests) including the chunker tests
- biome check clean on all touched files
- Runtime verification (full deploy + trigger) blocked on Docker daemon
  for the production worker; can be exercised once Docker is up.
Modifies POST /catalog/etl to trigger a CatalogEtlWorkflow instance per
source CSV by default. The query parameter ?engine=queue keeps the
legacy queue path available so operators can roll back if the workflow
path misbehaves in production.

Workflow path:
- Calls chunkCsvForR2 per source object to produce newline-aligned
  ChunkSpec[] (closes audit P1 #3, #4, #5 on the retry surface as well
  as the initial ingest surface).
- Captures source_etag + source_last_modified from the first object's
  R2 head and persists them on the etl_jobs row. The admin
  repair-from-scratch endpoint (U5) compares the stored etag against
  the live R2 head to fail closed when a source has been overwritten.
- Generates a deterministic Workflows instance ID `${source}-${filename}`
  so duplicate triggers for the same file return the existing instance
  rather than producing parallel ingests.

Queue path:
- Unchanged from existing behavior — same 20 MB byte-range splits and
  queue.sendBatch.
- Kept until the workflow path bakes for at least a week in production
  (per migration plan rollout); removal in a follow-up PR.

Env type extended in env-validation.ts to expose ETL_WORKFLOW: Workflow
so the route handler can type-check the env.ETL_WORKFLOW.create call.

Unit tests still pass (17 files, 304 tests). The full end-to-end
verification (POST /catalog/etl?engine=workflow → workflow instance →
DB rows → reconcile → finalize) requires the production worker deploy,
which is gated on Docker for the AppContainer build — that path is
unchanged by this commit.
…gaps

Closes audit P3 #2. The previous CatalogItemValidator.isValidUrl
accepted anything new URL() parsed — including javascript:, mailto:,
data:, file:, and any private/loopback IP. Catalog URLs render in the
mobile app and the guides site, so a scraper bug or supply-chain
compromise could trick the UI into rendering a homograph phishing
link or a server-side fetch into hitting internal infrastructure.

Validator now rejects:
- Schemes other than http: and https:
- URLs > 2048 chars
- Loopback (localhost, 127.x.x.x, ::1), RFC-1918 (10/8, 172.16-31/12,
  192.168/16), link-local (169.254/16), IPv6 link-local (fe80:), and
  ULA (fc00:/fd00:) hostnames — string-level pattern match only, no
  DNS resolution (DNS resolution would itself be an SSRF vector)
- Hostnames containing non-ASCII characters that survive WhatWG URL
  encoding (IDN homograph defense in depth)

Length caps on prose fields:
- name 500, description 50,000, brand 200, category 200
- SKU 200 chars + /^[A-Za-z0-9_./-]+$/ charset

15 unit tests cover every reject path plus the boundary-allowed cases.
All 319 tests in the unit suite pass.
…v.dev

The spike worker (packrat-etl-spike) was deleted from the Cloudflare
account; the throwaway files referencing it no longer have a deployed
counterpart, so removing them keeps the worktree clean and the PR
diff focused on the production migration.

env.dev workflows binding added so the dev deploy of packrat-api
actually receives ETL_WORKFLOW. Top-level workflows[] does not
inherit into envs that explicitly redeclare other bindings
(wrangler 4.92 behavior).
Strict cast linter (check:casts:strict) rejects unchecked `as Error`
even when narrowing unknown from a catch. Replace with a clean
`instanceof Error ? err : new Error(String(err))` guard so the
parser.destroy call always receives a real Error.
…ures

Narrows U2's schema additions from 8 columns to 2 after PR-shaping
discussion. Most of the originally-scoped columns existed to support
audit findings whose consumers ship in later PRs:

  - verified_at / verified_row_count — reconcile UI / U10
  - superseded_by_job_id / superseded_at — repair endpoint / U5
  - source_etag / source_last_modified — fail-closed repair guard / U5

Adding them now would create dead schema with no reader, so each
follow-up unit adds its column when it lands. Net change: zero indexes,
zero CHECK constraints, zero UNIQUE constraints, no FK self-reference.
This is about as low-risk as a migration can be.

What stays (both load-bearing from day one):
- workflow_instance_id text — admin/debug link from etl_jobs to the
  CF Workflows instance; null on legacy queue-path rows, set on
  workflow-path rows
- total_embedding_failures integer DEFAULT 0 NOT NULL — observable
  embedding-fallback degradation counter (audit P2 #3)
- etl_jobs_workflow_instance_id_idx — supports the lookup pattern

Workflow simplifications follow:
- Dropped the reconcile + reconcile-write steps (no verified_* columns
  to write into); workflow now runs chunk-N × N → aggregate → finalize
- Dropped reconcileSourceRowCount helper (orphaned with the steps)
- Dropped source_etag / source_last_modified capture in the producer

Plan doc updated with a scope-adjustment note on U2 explaining the
narrowing; original 8-column rationale preserved for context.

Verification:
- drizzle-kit check ✓
- check-drizzle-migrations.ts ✓
- 18 unit-test files, 319 tests, all pass
- biome check clean on all touched files
Bounded-batch DELETE for invalid_item_logs older than 90 days, wired
to a daily 09:00 UTC CF Cron Trigger via a new scheduled handler arm
in src/index.ts.

Why batched: a naive single-statement DELETE on a table that has
been accumulating for months would acquire row-level locks on
millions of rows in one statement, hit Neon's statement timeout,
and roll back having pruned nothing. The loop deletes 10k-row
chunks via WHERE id IN (SELECT id ... LIMIT N) RETURNING id and
counts the returned rows. Stops on empty batch. Caps at 100
iterations (1M rows / run) so a first-run with months of backlog
can't monopolize the daily window — the cap is reported in the
RetentionResult so operators can see when more rows remain.

Defaults are sensible: 90-day window, 10k batch, 100-iter cap.
Overridable per-call via options.

Wrangler config gets a top-level + env.dev triggers.crons entry.
First cron in this worker, so the scheduled() handler in src/index.ts
is brand new — dispatches on controller.cron string and throws on
unknown crons so a misconfigured trigger fails loudly.

5 unit tests cover empty-first-batch, multi-batch accumulation, the
iteration cap, and the retentionDays fallback. All 324 tests in
the unit suite pass.

Real-DB integration coverage deferred to U9 (needs Docker Postgres).
Splits U6's "Sentry wiring + structured logger + error propagation"
deliverable. This PR ships the parts that don't need a new dependency:

  - Thin structured logger at packages/api/src/utils/logger.ts emits
    JSON lines with { level, event, ts, ...ctx }. To log an error,
    attach it under ctx.err — the emit boundary unpacks errorName /
    errorMessage / errorStack so the contract that error stacks never
    contain raw CSV row data is enforceable by code review at one
    site (the logger), not every call site
  - processLogsBatch rethrows on DB failure (audit P2 #2) — silently
    swallowing meant the only forensic record of validation failures
    could disappear without anyone noticing
  - processValidItemsBatch embedding-fallback path atomically
    increments etl_jobs.total_embedding_failures (audit P2 #3) so
    operators see degradation in the admin endpoint without trawling
    logs; warning log at the call site for the per-batch event
  - All console.log calls in the touched files replaced with
    logger.info / logger.warn / logger.error

Sentry wiring (@sentry/cloudflare with withSentry({ fetch, workflow,
queue, scheduled })) is deferred to a follow-up PR. Justification:
adding a new dep changes the lockfile, adds ~30 KB to the bundle, and
needs compat verification against the mobile app's @sentry/react-native.
Reviewers should see that as its own concern, not bundled with
correctness fixes. The logger's emit() boundary is the wire-up point
when the follow-up lands — each call site upgrades for free.

Verification: 19 unit-test files, 324 tests pass. biome clean.
…oints

Adds the two operator-facing surfaces that close the gap left by the
plan's U5 scope-down. Defers repair-from-scratch and ETag fail-closed
verification to follow-up PRs — workflow retry is enough to re-ingest
the 7 historical false-failures from 2026-05-14, and ETag verification
is defense in depth that operators can do manually for the one-time
recovery.

Migration 0049 adds two columns:
  - verified_at timestamp (nullable)
  - verified_row_count integer (nullable)

Both written exclusively by the new reconcile endpoint.

POST /admin/etl/:jobId/retry — rewritten to trigger a CatalogEtlWorkflow
instance instead of a queue message. Works for both legacy queue-era
failed jobs and workflow-era failed jobs (the new instance always uses
chunkCsvForR2 for newline-aligned chunks). Instance ID is suffixed with
the new jobId so duplicate retries don't collide. Response now includes
workflowInstanceId so the admin UI can deep-link to the dashboard.

POST /admin/etl/:jobId/reconcile — synchronously counts logical rows
in the R2 source via csv-parse (NOT raw \n counting; quoted multi-line
fields would skew that) and persists the result on verified_at +
verified_row_count. Returns expectedRowCount / actualRowCount / delta.
Large files may exceed the fetch budget — async-via-workflow is a
follow-up if needed.

EtlRetrySchema gets a workflowInstanceId field; EtlReconcileSchema is
new. Both in @packrat/schemas/admin.

Verification: drizzle-kit check + custom migration linter clean,
check-casts:strict clean, biome clean, 19 unit-test files / 324 tests
all pass.

Reset-stuck endpoint (POST /admin/etl/reset-stuck) is unchanged — its
wall-clock-based design is wrong (closed by the audit P1 #2) but the
fix is to delete it once Workflows is the only ingest path. Deferred
to the queue-path-removal PR.
New runbook at docs/runbooks/etl-pipeline.md covering:

- Architecture (producer → workflow → DB; cron arms)
- The ?engine=workflow|queue flag + coexistence-window context
- How to trigger an ETL run
- How to inspect workflow instances (wrangler workflows commands)
- How to retry a failed job
- How to reconcile a job's row count against R2
- DLQ / forensic record (the Workflows dashboard is the record;
  no DLQ table)
- The 7-job historical recovery procedure with SQL + curl
- Invalid-item-logs retention (daily 09:00 UTC sweep)
- Draining the legacy queue path when ready for deletion
- Admin dashboard field semantics under the Workflows architecture
  (workflow_instance_id, verified_*, total_embedding_failures, etc.)
- Accepted limitations (no soft-delete, success_rate quirk on failed
  jobs, sync reconcile bounded by fetch budget, no ETag fail-closed
  on retry yet, embedding cost on chunk retry)
- Historical recoveries appendix (stub for the 2026-05-14 recovery
  to be filled in when executed post-deploy)
- References (audit, plans, CF docs)

First runbook in docs/runbooks/ — establishes the convention.
Three real fixes plus one coverage exclusion:

1. packages/db/src/schema.ts: restore the `AnyPgColumn` type import.
   It was dropped when U2 was slimmed (the FK self-reference on
   `superseded_by_job_id` went away with it), but `post_comments`
   still uses AnyPgColumn for its own parent_comment_id self-reference.

2. invalidLogRetention.ts: drop the `.returning({ id: ... })` typed
   shape and use bare `.returning()`. The union of three Drizzle
   driver types (neon-http / neon-serverless / node-postgres) accepts
   only the no-arg overload at the intersection; the typed shape
   tripped TS2554. Row count is computed from `.length` anyway.

3. invalidLogRetention.test.ts: replace the `__mockDb` cross-module
   handle (TS2305: not an export) with vi.hoisted() state shared
   between the mock factory and the tests. Cleaner and type-safe.

4. vitest.unit.config.ts: add `src/workflows/catalog-etl-workflow.ts`
   to coverage exclude. The chunker sibling (src/workflows/shared/) is
   still covered (5 unit tests at 100%). The workflow class needs the
   real CF Workflows runtime for end-to-end execution; integration tests
   in /test pick it up when Docker Postgres is wired (deferred per the
   PR's "deferred to follow-up" list).

Plus: new unit tests for `logger.ts` (10 tests, 100% coverage) so the
new file doesn't drop the coverage threshold by itself.

Coverage now at 98.63% statements / 95.33% branches (was 76.76% / 95.16%).
20 unit-test files, 331 tests, all pass. bun check-types clean.

Does not address:
- `api-tests` install failure (Fail extracting tarball for
  @sentry/cli-linux-x64) — that's a transient registry / CI runner
  issue, not something this PR can fix. A retry should clear it.
…dit trail

Adds the columns + endpoints that were originally part of U5's full
scope but deferred during the U2 slim-down. Now landing together so
the post-merge operational story is complete.

Migration 0050 adds to etl_jobs:
- source_etag text — captured by the producer from r2.head().etag
- source_last_modified timestamp — same; redundant with etag but cheap
- superseded_by_job_id text — FK to etl_jobs.id (ON DELETE SET NULL),
  written by retry + repair endpoints to link the new job back to the
  original
- superseded_at timestamp — when the supersession was recorded
- CHECK constraint etl_jobs_no_self_supersede prevents a row from
  superseding itself
- Index etl_jobs_superseded_by_idx supports the dashboard's
  "show me the repair chain for cotopaxi" lookup

Producer (POST /catalog/etl):
- Captures sourceEtag + sourceLastModified from the first object's
  chunkCsvForR2 head; writes to etl_jobs on insert

Retry (POST /admin/etl/:jobId/retry):
- Refactored into a shared reingestJob() helper used by retry +
  repair-from-scratch
- Before triggering the new workflow, calls r2.head() and compares
  live etag against the stored sourceEtag — returns 409
  ETL_ETAG_MISMATCH unless ?force=true. Skips the check when the
  stored etag is NULL (legacy queue-era rows, including the 7
  false-failures from 2026-05-14)
- New job row carries supersededByJobId pointing at the original
  + supersededAt timestamp

New endpoint POST /admin/etl/:jobId/repair-from-scratch:
- Same shape as retry but accepts completed jobs too. Use case:
  operator suspects an originally-completed job under-counted (the
  audit's R8 "trace the repair chain" requirement)

Also adds @sentry/cloudflare ^10.37.0 to packages/api/package.json
(install lands in this commit but wiring is in the next one).

Verification: drizzle-kit check + custom linter clean,
check-casts:strict clean, biome clean, 20 unit-test files / 331
tests pass, tsc clean.
Closes the Sentry deferral from U6 part 1 now that the dependency is
installed. Wires Sentry into three surfaces:

1. Worker default export wrapped with withSentry(optionsCallback, handler)
   — initializes Sentry on first invocation; uncaught fetch / queue /
   scheduled exceptions land in Sentry with request + queue + cron context
   attached automatically by the SDK.

2. CatalogEtlWorkflow wrapped with instrumentWorkflowWithSentry — every
   step.do span + any uncaught throw inside a step lands in Sentry with
   workflow name + instance id + step name + attempt count attached.

3. logger.ts emit() boundary forwards to Sentry when isInitialized():
   - logger.info/warn → Sentry.addBreadcrumb (correlated with next captureException)
   - logger.error({ err }) → Sentry.captureException with ctx fields as tags/extras
   - logger.error without err → Sentry.captureMessage(level=error)
   Forwarding is best-effort and try/catch-wrapped — failures here never
   break the call site (JSON console line is the durable record).

Sentry options shared between handler + workflow:
   { dsn: env.SENTRY_DSN, environment: env.ENVIRONMENT,
     tracesSampleRate: 0.1, release: env.CF_VERSION_METADATA?.id }

wrangler.jsonc:
- Adds `nodejs_als` compatibility flag (required by @sentry/cloudflare's
  AsyncLocalStorage-based context propagation across awaits)
- Adds `upload_source_maps: true` so wrangler deploy uploads sourcemaps
  to Cloudflare — unminified stack traces in wrangler tail and the
  Workers dashboard. Sentry-side symbolication is a separate
  @sentry/cli sourcemaps upload step (documented in runbook; no CI
  deploy pipeline exists today to automate it).

Runbook updated with a new "Sentry observability" section documenting
wiring, tag conventions, and the optional Sentry-side sourcemap upload.

Verification:
- 20 unit-test files, 331 tests pass (logger tests still pass; Sentry
  isInitialized() returns false in tests so forwarding is silently
  skipped — JSON output to console unchanged)
- bun check-types clean
- biome check clean
- check-casts:strict clean
…typeof

Pre-push no-raw-typeof linter rejected the raw `typeof v === ...` chain
introduced in the Sentry forwarder. Replaced with isString/isNumber/
isBoolean from @packrat/guards, matching the rest of the codebase.

No behavior change; same path classification (primitives → Sentry tags,
objects/arrays → Sentry extras).
radash (which @packrat/guards re-exports) provides isString, isNumber,
isObject, etc. but not isBoolean. Use a direct === true || === false
check instead — passes the no-raw-typeof linter and reads cleaner than
inventing a wrapper.
Single-query per-source data-quality audit served from the API instead of
requiring scrapyd (or any other consumer) to talk to the DB directly. The
SQL stays where the schema lives; consumers authenticate with the existing
admin JWT and never need NEON_DATABASE_URL.

Flags surfaced per source (computed server-side from threshold constants
returned alongside the report):
- decimal_bug: count of prices < $10 with 3+ decimal places (the
  "1,299 → 1.299" parser bug from the existing scrapyd audit)
- low_median: median < $20 on a non-allowlisted source
- high_null:<field>: > 30% NULL on price / brand / description / weight /
  images / availability
- bad_weight: count of weights < 1g or > 100kg
- empty_name: count of empty / null names
- stale: source has no completed ETL in 30+ days

Query is a single CTE-based GROUP BY (DISTINCT ON for most-recent
ingest source per item, then aggregate). One round-trip for all
sources; ?source=<name> filters to one for ad-hoc debugging.

Response schema CatalogAuditSchema lives in @packrat/schemas/admin so
Eden Treaty consumers get end-to-end types.

Verification: 20 unit-test files / 331 tests pass, tsc clean, biome
clean, check-casts:strict clean.

Used by scripts/audit_db_catalog.py in PackRat-ScrapyD#129 (next commit
on that PR drops the direct-DB approach in favor of this endpoint).
Previous fix pointed drizzle.config.ts at ../db/src/schema.ts (relative
path crossing the workspace boundary). Cleaner: add an in-package
re-export at src/db/schema.ts that re-exports from @packrat/db/schema,
and point drizzle.config back to ./src/db/schema.ts.

drizzle-kit + any other drizzle-aware tooling now stays scoped to
packages/api and is insulated from workspace layout changes. Schema
source of truth still lives in packages/db/src/schema.ts.
…d migration

Previous: three migrations (0048_etl_workflow_columns, 0049_etl_verification_cols,
0050_etl_etag_and_supersession) all generated by drizzle-kit but renamed
post-generation, with hand-edited journal tags to match. That made the
migrations look hand-authored and the rename+edit pattern is brittle.

Now: one migration with whatever name drizzle-kit emits — the additive
column changes (workflow_instance_id, total_embedding_failures, verified_at,
verified_row_count, source_etag, source_last_modified, superseded_by_job_id,
superseded_at + FK + indexes + check constraint) collapse cleanly into a
single migration. Net diff impact: ~4,600 fewer lines (3 snapshots → 1).

Updates CLAUDE.md with explicit migration discipline so this doesn't recur:
- always generate via drizzle-kit
- keep the random auto-generated name (do not rename)
- never hand-edit journal / snapshots / SQL
- collapse additive changes into one migration when they ship together
- verify with drizzle-kit check before pushing

Schema content is identical; verified via drizzle-kit check.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Mark etl_jobs as failed when ETL_WORKFLOW.create() throws, preventing
  perpetually-running orphaned rows
- Wrap workflow run() in try/catch to update job status to failed on
  step exhaustion (runtime marks instance errored but DB row was stuck)
- Always await writerPromise in reconcile endpoint via try/finally so
  the promise cannot reject unhandled if the csv-parse loop throws
- Use byte scan (0x0A) instead of text.lastIndexOf for chunk boundaries
  in chunkCsvForR2 — char index != byte offset for non-ASCII CSV content
- Fix capped false positive in invalidLogRetention: capped only when last
  batch had rows remaining, not when loop exited via exhaustion
- Use returning({ id }) in retention sweep to avoid fetching full rows
- Guard JSON.stringify in logger emit against circular refs / BigInt

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Plan doc: remove superseded_* from the "not in this plan" exclusion list
(those columns are included in the migration SQL), enumerate all 8 new
columns in the correct bullet, and rephrase the source_etag backfill from
"at migration time" (impossible in SQL) to a post-migration operational step.

Runbook: update the retry section to reflect that superseded_by_job_id is
set on every new retry/repair row (not a follow-up PR), and update the
accepted-limitations entry to accurately describe the ETag fail-closed
behavior that repair-from-scratch already implements.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- chunkCsvForR2: use TextEncoder to convert char index → byte offset so
  non-ASCII product names don't produce mis-aligned chunk boundaries;
  arrayBuffer() approach broke the R2 mock in unit tests
- invalidLogRetention: revert .returning({id}) partial select — drizzle
  delete+subquery chain typing rejects the arg in this version; full
  .returning() still gives correct capped logic via rowCount sentinel

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- chunkCsvForR2: throw for empty R2 objects (size=0) instead of
  returning byteEnd=-1 which is an invalid range
- chunkCsvForR2: implement real concurrency cap at 16 parallel peek
  reads (was comment-only; now batched Promise.all loops)
- chunk-csv-for-r2.test: add empty-file error test
- db-schema-etl.test: fix describe label "Migration 0048" → "0047"
- plan doc: add drizzle-kit check to migration verification checklist
- runbook: warn that wrangler queues consumer remove does not drain
  in-flight messages — wait for queue depth 0 first

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…igration

fix: migrate catalog ETL to Cloudflare Workflows (audit remediation, phase 1)
CF Workflows rejects instance IDs containing dots — only [a-zA-Z0-9_-]
is allowed. The instanceId was built from source + filename verbatim,
so filenames like 23zero_2026-05-21T04-26-37.csv produced
"instance.invalid_id" errors on every ETL trigger.

Fix: strip the file extension before building the instanceId. For
retry/repair: use suffix-newJobId (always unique UUID-based, always
valid) instead of source-filename-suffix-uuid which can exceed 64 chars.

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Regression guard for the bug where CF Workflows instance IDs contained
dots from raw .csv filenames, causing 500s. Tests the FILE_EXT_RE strip
and 64-char truncation logic inline, covering: basic strip, no-extension
input, long-name truncation, and timestamp char pass-through.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
andrew-bierman and others added 16 commits May 21, 2026 01:23
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add `relax_quotes` and `skip_records_with_error` to the csv-parse config so
unclosed-quote rows (e.g. zpacks CSV line 36741) are skipped rather than
crashing the whole job. The `on_skip` callback writes each bad row to
`invalid_item_logs` with a `csv_parse` field error so they remain visible
in the admin analytics dashboard.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Large-file workflow instances (campmor 87MB, goalzero 27MB, etc.) were
stuck in a deterministic timeout loop because each 20MiB chunk contained
~20K rows, exceeding Cloudflare Workers' 5-minute step execution limit.
Reducing DEFAULT_CHUNK_BYTES to 5MiB yields ~4× smaller chunks so each
step completes well within the time budget.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
csv-parse exposes the actual line number on the error object via `.lines`.
Using the loop-scoped rowIndex (valid-record counter) produced inaccurate
rowIndex values in invalid_item_logs for skipped malformed rows.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
fix(etl): catalog sprint fixes — lenient CSV parsing, chunk timeout, instanceId test
Create json-utils.ts with isJsonlFile() and mapJsonRowToItem() helpers,
then branch both the queue-path (processCatalogEtl) and workflow-path
(catalog-etl-workflow) processors to stream JSONL when the object key
ends in .jsonl/.ndjson. CSV path is unchanged. Also backports
relax_quotes + on_skip to the workflow CSV parser to match the queue path.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Replace raw typeof checks with isString/isNumber/isObject from @packrat/guards
- Fixes custom lint rule violation (no-raw-typeof CI check)
- Add json-utils.test.ts with 30+ tests covering all branches
- Brings json-utils.ts line/statement coverage above 95% threshold

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Use toRecord() for JSON.parse results (catalog-etl-workflow, processCatalogEtl)
- Use toStringRecord() for techs narrowing (json-utils)
- Fix availability test value (in_stock not InStock)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Remove incorrect JSONL partial-line skip (chunker guarantees clean boundaries;
  skipPartialLine was dropping the first valid record per non-first chunk)
- Fix @packrat/guards import order for Biome organizeImports (after @packrat/db)
- Use @packrat/api/utils/csv-utils alias instead of relative path in json-utils
- Filter non-strings from JSON-parsed categories array (parity with native branch)
- Apply toStringRecord to safeJsonParse techs result (parity with native branch)
- Add test for JSON array categories non-string filtering

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@packrat/api/* → @packrat/db → @packrat/guards → @packrat/schemas/*

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…olution

csv-parse infers the correct CsvError type; explicit Error annotation caused
the Options overload to be rejected, resolving to Callback<string[]> instead.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
csv-parse types on_skip as (err: CsvError | undefined, ...) => void

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Avoids TS18048 — `err` is possibly undefined at line 206; the `message`
variable is already safely computed with optional chaining and a fallback.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The string fits within the 100-char line width; collapsing it removes
the only Biome format error in the checks job.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings May 22, 2026 07:53
@coderabbitai

coderabbitai Bot commented May 22, 2026

Copy link
Copy Markdown
Contributor

Warning

Rate limit exceeded

@andrew-bierman has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 2 minutes and 42 seconds before requesting another review.

You’ve run out of usage credits. Purchase more in the billing tab.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: 21a24069-2e1b-4cbd-8d07-c1c58f25faef

📥 Commits

Reviewing files that changed from the base of the PR and between 6676446 and ad4d009.

⛔ Files ignored due to path filters (1)
  • bun.lock is excluded by !**/*.lock, !bun.lock
📒 Files selected for processing (36)
  • CLAUDE.md
  • docs/audits/2026-05-16-etl-audit.md
  • docs/plans/2026-05-19-001-fix-etl-pipeline-audit-remediation-plan.md
  • docs/plans/2026-05-20-001-fix-etl-pipeline-workflows-migration-plan.md
  • docs/runbooks/etl-pipeline.md
  • packages/api/drizzle.config.ts
  • packages/api/drizzle/0047_clear_monster_badoon.sql
  • packages/api/drizzle/meta/0047_snapshot.json
  • packages/api/drizzle/meta/_journal.json
  • packages/api/package.json
  • packages/api/src/__test-stubs__/cloudflare-workers.ts
  • packages/api/src/db/schema.ts
  • packages/api/src/index.ts
  • packages/api/src/routes/admin/analytics/catalog.ts
  • packages/api/src/routes/catalog/__tests__/instanceId.test.ts
  • packages/api/src/routes/catalog/index.ts
  • packages/api/src/services/etl/CatalogItemValidator.ts
  • packages/api/src/services/etl/__tests__/CatalogItemValidator.test.ts
  • packages/api/src/services/etl/processCatalogEtl.ts
  • packages/api/src/services/etl/processLogsBatch.ts
  • packages/api/src/services/etl/processValidItemsBatch.ts
  • packages/api/src/services/retention/__tests__/invalidLogRetention.test.ts
  • packages/api/src/services/retention/invalidLogRetention.ts
  • packages/api/src/utils/__tests__/json-utils.test.ts
  • packages/api/src/utils/__tests__/logger.test.ts
  • packages/api/src/utils/env-validation.ts
  • packages/api/src/utils/json-utils.ts
  • packages/api/src/utils/logger.ts
  • packages/api/src/workflows/catalog-etl-workflow.ts
  • packages/api/src/workflows/shared/__tests__/chunk-csv-for-r2.test.ts
  • packages/api/src/workflows/shared/chunkCsvForR2.ts
  • packages/api/test/db-schema-etl.test.ts
  • packages/api/vitest.unit.config.ts
  • packages/api/wrangler.jsonc
  • packages/db/src/schema.ts
  • packages/schemas/src/admin.ts
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/jsonl-etl-support

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@cloudflare-workers-and-pages

cloudflare-workers-and-pages Bot commented May 22, 2026

Copy link
Copy Markdown
Contributor

Deploying with  Cloudflare Workers  Cloudflare Workers

The latest updates on your project. Learn more about integrating Git with Workers.

Status Name Latest Commit Preview URL Updated (UTC)
✅ Deployment successful!
View logs
packrat-admin ad4d009 Commit Preview URL

Branch Preview URL
May 22 2026, 08:28 AM

@cloudflare-workers-and-pages

cloudflare-workers-and-pages Bot commented May 22, 2026

Copy link
Copy Markdown
Contributor

Deploying packrat-landing with  Cloudflare Pages  Cloudflare Pages

Latest commit: ad4d009
Status:🚫  Build failed.

View logs

@cloudflare-workers-and-pages

cloudflare-workers-and-pages Bot commented May 22, 2026

Copy link
Copy Markdown
Contributor

Deploying packrat-guides with  Cloudflare Pages  Cloudflare Pages

Latest commit: ad4d009
Status: ✅  Deploy successful!
Preview URL: https://862b73e2.packrat-guides-6gq.pages.dev
Branch Preview URL: https://feat-jsonl-etl-support.packrat-guides-6gq.pages.dev

View logs

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR extends the catalog ETL ingestion pipeline to support JSONL/NDJSON sources alongside CSV, while migrating the primary execution engine to Cloudflare Workflows and adding operational hardening (observability, retry/repair tooling, and retention).

Changes:

  • Added a Workflows-based ETL engine (CatalogEtlWorkflow) with newline-aligned R2 chunk planning and updated catalog ETL trigger routing (?engine=workflow|queue).
  • Introduced JSONL mapping utilities and integrated JSONL parsing into both the legacy queue ETL and the new workflow chunk processor.
  • Added ETL operational features: structured logger + Sentry wiring, ETL job schema extensions, admin retry/repair/reconcile/audit endpoints, and daily invalid log retention sweeps.

Reviewed changes

Copilot reviewed 37 out of 38 changed files in this pull request and generated 8 comments.

Show a summary per file
File Description
packages/schemas/src/admin.ts Adds schemas for new admin ETL responses (reconcile/audit) and workflow instance IDs.
packages/db/src/schema.ts Extends etl_jobs with workflow/verification/provenance/supersession columns + indexes/constraints.
packages/api/wrangler.jsonc Enables Workflows binding, cron trigger, source map upload, and nodejs_als.
packages/api/vitest.unit.config.ts Excludes workflow entrypoint from unit coverage; clarifies test intent.
packages/api/test/db-schema-etl.test.ts Adds migration smoke test for ETL workflow-related DB schema.
packages/api/src/workflows/shared/chunkCsvForR2.ts New newline-aligned R2 byte-range chunk planner.
packages/api/src/workflows/shared/tests/chunk-csv-for-r2.test.ts Unit tests validating chunk boundary correctness and error cases.
packages/api/src/workflows/catalog-etl-workflow.ts New Cloudflare Workflow entrypoint implementing ETL chunk processing and final aggregation.
packages/api/src/utils/logger.ts Adds structured JSON logging with optional Sentry forwarding.
packages/api/src/utils/json-utils.ts Adds JSONL file detection and JSON object → catalog item mapping.
packages/api/src/utils/env-validation.ts Adds ETL_WORKFLOW binding to env schema/types.
packages/api/src/utils/tests/logger.test.ts Unit tests for structured logger output and error unpacking.
packages/api/src/utils/tests/json-utils.test.ts Unit tests for JSONL detection and mapping behavior.
packages/api/src/services/retention/invalidLogRetention.ts Adds batched invalid log retention sweep logic.
packages/api/src/services/retention/tests/invalidLogRetention.test.ts Unit tests for retention sweep loop semantics via DB mocking.
packages/api/src/services/etl/processValidItemsBatch.ts Adds embedding-fallback observability + DB counter updates + structured logging.
packages/api/src/services/etl/processLogsBatch.ts Switches to structured logger; rethrows on DB failures (no silent drops).
packages/api/src/services/etl/processCatalogEtl.ts Adds JSONL parsing path; fixes writer promise capture/backpressure handling.
packages/api/src/services/etl/CatalogItemValidator.ts Hardens validation (URL scheme/SSRF guard, length caps, SKU charset).
packages/api/src/services/etl/tests/CatalogItemValidator.test.ts Tests for new validator hardening rules.
packages/api/src/routes/catalog/index.ts Updates ETL trigger to support workflow engine, chunk planning, and instance ID creation.
packages/api/src/routes/catalog/tests/instanceId.test.ts Regression tests for workflow instance ID construction.
packages/api/src/routes/admin/analytics/catalog.ts Adds workflow-aware retry/repair/reconcile endpoints and catalog audit endpoint.
packages/api/src/index.ts Adds Sentry Worker + Workflow instrumentation and scheduled retention handler.
packages/api/src/db/schema.ts Re-exports shared DB schema for drizzle tooling within API package scope.
packages/api/src/auth/index.ts Ensures KV TTL minimum for Better Auth secondary storage.
packages/api/src/test-stubs/cloudflare-workers.ts Expands stub to include Workflows types/classes for unit test imports.
packages/api/package.json Adds @sentry/cloudflare dependency.
packages/api/drizzle/meta/_journal.json Records new migration entry.
packages/api/drizzle/0047_clear_monster_badoon.sql Migration adding the new etl_jobs columns/indexes/constraints.
packages/api/drizzle.config.ts Points drizzle-kit schema to the new in-package re-export.
docs/runbooks/etl-pipeline.md Adds ETL operational runbook for Workflows-era pipeline.
docs/plans/2026-05-20-001-fix-etl-pipeline-workflows-migration-plan.md Adds detailed migration plan document.
docs/audits/2026-05-16-etl-audit.md Adds ETL pipeline audit document.
CLAUDE.md Documents migration discipline for drizzle artifacts.
bun.lock Updates lockfile for new dependencies.
Comments suppressed due to low confidence (2)

packages/api/src/routes/catalog/index.ts:334

  • instanceId construction only strips the file extension, but it does not sanitize other disallowed characters for Cloudflare Workflows IDs (allowed: [a-zA-Z0-9_-]). If source or the base filename contains characters like ., spaces, or :, env.ETL_WORKFLOW.create will still fail at runtime. Consider normalizing ${source}-${filenameWithoutExt} by replacing any non-matching characters with _ (and trimming to 64) rather than only removing the extension.
      // CF Workflows instance IDs only allow [a-zA-Z0-9_-] — strip the file extension.
      const instanceId = `${source}-${filename.replace(FILE_EXT_RE, '')}`.slice(0, 64);

packages/api/src/workflows/catalog-etl-workflow.ts:134

  • In the JSONL path, firstLineSkipped is initialized to true, so the if (!firstLineSkipped) { ... continue } branch is never taken. Given the comment says newline-aligned chunking makes boundary skipping unnecessary, consider removing the dead flag/branch entirely (or initialize it correctly if boundary skipping is still needed) to avoid confusion and potential future regressions.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +241 to +246
async ({ body, query }) => {
const { filename, chunks, source, scraperRevision } = body;
const engine = query.engine ?? 'workflow';
const db = createDb();
const env = getEnv();
const jobId = crypto.randomUUID();
Comment on lines +648 to +656
const objectKey = `v2/${job.source}/${job.filename}`;
const env = getEnv();
const r2 = new R2BucketService({ env, bucketType: 'catalog' });
const obj = await r2.get(objectKey);
if (!obj) return status(404, { error: `R2 source not found at ${objectKey}` });

const parser = parse({ relax_column_count: true, skip_empty_lines: true });
let totalRows = 0;
let isHeaderProcessed = false;
Comment on lines +85 to +90
if (useJsonl) {
// --- JSONL streaming path ---
// No csv-parse, no header injection. Each line is a JSON object.
let buffer = '';
// The chunker snaps boundaries to newlines, so every chunk starts at a
// clean line boundary — no partial first-line skip needed for any chunk.
Comment on lines +60 to +71
async function* streamToText(stream: ReadableStream<Uint8Array>) {
const reader = stream.getReader();
const decoder = new TextDecoder();
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
yield decoder.decode(value, { stream: true });
}
} finally {
reader.releaseLock();
}
Comment on lines +186 to +188
`evo-evo_2026-04-27T03-25-18.csv-retry-<newJobId>`. Watch each to
completion. Original `etl_jobs` rows stay `failed` for the audit trail;
new rows reflect the successful re-ingest.
Comment on lines +289 to +294
- **ETag fail-closed on repair-from-scratch (not plain retry).** The
`repair-from-scratch` endpoint compares the stored `source_etag` against
`r2.head().etag` and returns 409 on mismatch; pass `?force=true` to
override. The plain `retry` endpoint does not enforce ETag checks — if the
R2 source has been overwritten, retry re-ingests the new content. Use
repair-from-scratch when historical accuracy matters.
Comment on lines +9 to +21
/**
* Returns true if the R2 object key has a JSONL or NDJSON extension.
*/
export function isJsonlFile(objectKey: string): boolean {
const lower = objectKey.toLowerCase();
return lower.endsWith('.jsonl') || lower.endsWith('.ndjson');
}

/**
* Maps a parsed JSON object (one line from a JSONL file) to a partial catalog item.
* Uses `unknown` with proper type narrowing — no `any`.
*/
export function mapJsonRowToItem(obj: Record<string, unknown>): Partial<NewCatalogItem> | null {
Comment on lines +56 to +80
// Embedding-fallback path. The upsert still happens (catalog gets the
// items minus their vectors), but we record the degradation on
// etl_jobs.total_embedding_failures so operators see the count via
// the admin endpoint without trawling logs. Closes audit P2 #3.
logger.warn('etl.embedding.fallback', {
jobId,
skuCount: items.length,
errorName: error instanceof Error ? error.name : 'unknown',
});

const upsertedItems = await catalogService.upsertCatalogItems(mergedItems);
await catalogService.trackEtlJob(upsertedItems, jobId);
await updateEtlJobProgress(env, {
jobId,
valid: items.length,
processed: items.length,
});

const db = createDbClient(env);
await db
.update(etlJobs)
.set({
totalEmbeddingFailures: sql`COALESCE(${etlJobs.totalEmbeddingFailures}, 0) + ${items.length}`,
})
.where(eq(etlJobs.id, jobId));
Resolves conflict in auth/index.ts KV expirationTtl guard — takes
development version with `ttl !== undefined` check and explanatory comment.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@github-actions github-actions Bot added documentation Improvements or additions to documentation dependencies Pull requests that update a dependency file api database labels May 22, 2026
@github-actions

Copy link
Copy Markdown
Contributor

Coverage Report for Expo Unit Tests Coverage (./apps/expo)

Status Category Percentage Covered / Total
🔵 Lines 97.56% (🎯 95%) 560 / 574
🔵 Statements 97.56% (🎯 95%) 560 / 574
🔵 Functions 100% (🎯 97%) 51 / 51
🔵 Branches 95% (🎯 92%) 190 / 200
File CoverageNo changed files found.
Generated in workflow #1449 for commit ad4d009 by the Vitest Coverage Report Action

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

api database dependencies Pull requests that update a dependency file documentation Improvements or additions to documentation

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants