feat(etl): add JSONL/NDJSON support to catalog ETL pipeline #2477
andrew-bierman wants to merge 47 commits
Mark the 2026-05-19 audit-remediation plan as superseded and replace it with a Workflows-based plan that natively provides the durable-step + idempotency + retry + state semantics the prior plan reconstructed manually on top of Queues + Postgres. Audit findings about CSV correctness, validator hardening, observability, retention, and the operational runbook carry into the new plan; the queue-as-state-machine subplot is dropped. Net unit count drops from 15 to 9. Also includes the underlying audit (docs/audits/2026-05-16-etl-audit.md) that grounds both plans.
Validates the integration before committing to the Workflows migration:
R2 byte-range reads, csv-parse inside step.do, Drizzle Neon HTTP query,
durable step.sleep, and step result persistence. Workflow takes
{ objectKey, source } params; trigger via wrangler workflows trigger
spike-etl-workflow ... --env=dev and observe in the dashboard.
Adds:
- packages/api/src/workflows/spike-etl-workflow.ts (the workflow class)
- packages/api/src/index.ts exports SpikeEtlWorkflow
- packages/api/wrangler.jsonc declares the workflows[] binding
Per the plan (docs/plans/2026-05-20-001-fix-etl-pipeline-workflows-migration-plan.md
U1), this is throwaway. Delete the workflow file, the index.ts export,
and the wrangler binding after the GO/NO-GO decision lands U3's
production CatalogEtlWorkflow.
Pivoted from the in-app spike to a standalone worker because the dev deploy of packrat-api requires Docker (App Container) and Docker is not installed locally. The standalone worker has zero container surface and only the bindings the spike actually exercises.
Spike rewritten to use the native R2 binding (env.PACKRAT_SCRAPY_BUCKET) instead of the AWS S3 client, which removes the R2_ACCESS_KEY_ID secret dependency. Drizzle/Neon validation deferred to U3 (validates on the production worker that already has NEON_DATABASE_URL).
Result on real prod data (cotopaxi_2026-05-14T16-54-05.csv, 698 KB): status=complete, duration=7s
1-r2-head: size=698620 etag=4397... ok
2-r2-range-read: 698134 bytes
3-csv-parse: 100 rows
4a/4b/4c-sleep: Δ=5043ms (5s sleep + ~40ms wake overhead)
5-memoize-marker: persisted in instance history
GO. Workflows host R2 + csv-parse + step.sleep + step result persistence cleanly inside step.do. Proceed to U2 (Drizzle migration 0048).
Adds:
- packages/api/wrangler.spike.jsonc (standalone worker config)
- packages/api/src/spike-entry.ts (thin /trigger endpoint)
- packages/api/src/workflows/spike-etl-workflow.ts rewritten
The standalone worker packrat-etl-spike.orange-frost-d665.workers.dev should be deleted via `wrangler delete --config=wrangler.spike.jsonc` after U3 lands the production CatalogEtlWorkflow.
Adds eight columns to etl_jobs for the Workflows-based ETL:
- workflow_instance_id (nullable text): links the etl_jobs row to its Workflows instance for admin dashboards
- verified_at, verified_row_count (nullable): post-ingestion R2-source row-count verification
- total_embedding_failures (integer DEFAULT 0 NOT NULL): observable degradation signal when the embedding service fails inside a chunk
- superseded_by_job_id (FK to etl_jobs.id, ON DELETE SET NULL) + superseded_at: preserves the audit trail when an operator triggers repair-from-scratch
- source_etag, source_last_modified: captured at job start, compared by the repair endpoint to fail closed when the R2 source has been overwritten
Constraints + indexes:
- CHECK etl_jobs_no_self_supersede prevents a row from superseding itself
- Index etl_jobs_workflow_instance_id_idx (admin lookups)
- Index etl_jobs_superseded_by_idx (repair-chain lookups)
- UNIQUE catalog_item_etl_jobs_catalog_job_idx (catalog_item_id, etl_job_id) so retried chunk upserts can use ON CONFLICT DO NOTHING and not accumulate duplicate provenance rows
Also fixes the long-standing stale drizzle.config.ts schema path (./src/db/schema.ts → ../db/src/schema.ts); the schema was extracted to @packrat/db in merge b14f4db but the config pointer was not updated, so db:generate failed before this commit.
The Workflows binding is the source of truth for chunk lifecycle and retry semantics; the columns above are only DB-side denormalization for admin queries.
Verification:
- drizzle-kit check: Everything's fine
- scripts/lint/check-drizzle-migrations.ts: Drizzle migration checks passed
- biome lint: clean
Schema smoke test at packages/api/test/db-schema-etl.test.ts asserts the columns + indexes + CHECK constraint + UNIQUE index against the Docker Postgres wsproxy. Run via bun test:api once docker-compose.test.yml is up.
Closes audit P1 #3, #4, #5: the chunk boundary bugs where a CSV row spanning a 20 MB byte-range chunk would be either dropped, invalidated, or duplicated.
The new helper snaps each chunk's byteEnd to the byte immediately before a newline by reading a small (64 KiB default) tail window and locating the last \n. Throws ChunkBoundaryError if the peek window has no newline so a row wider than 64 KiB fails loudly. Tail peek reads are issued in parallel via Promise.all so the producer endpoint's CPU budget stays bounded on multi-GB files. Single-object-parameter shape matches existing ETL functions.
5 unit tests cover: small-file single-chunk; multi-chunk newline alignment; concatenation completeness; ChunkBoundaryError on no-newline; row-boundary preservation across chunks. All pass via bun test:unit.
Used by the new CatalogEtlWorkflow and by the retry / repair-from-scratch admin endpoints (next units).
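As a rough illustration of the newline-snapping idea in this commit, the sketch below plans chunk ranges over an in-memory byte array. The names `planChunks` and `lastNewlineIndex` are hypothetical. The real helper peeks tail windows from R2 in parallel, and this sketch assumes each chunk keeps its terminating newline, which may differ from how chunkCsvForR2 defines byteEnd.

```typescript
// Hypothetical sketch; not the PR's chunkCsvForR2 implementation.
class ChunkBoundaryError extends Error {}

interface ChunkSpec {
  byteStart: number; // inclusive
  byteEnd: number; // inclusive (assumption: the newline byte itself)
}

// Scan bytes, not characters: 0x0A never appears inside a multi-byte UTF-8 sequence.
function lastNewlineIndex(bytes: Uint8Array): number {
  for (let i = bytes.length - 1; i >= 0; i--) {
    if (bytes[i] === 0x0a) return i;
  }
  return -1;
}

function planChunks(data: Uint8Array, chunkBytes: number, peekBytes = 64 * 1024): ChunkSpec[] {
  const chunks: ChunkSpec[] = [];
  let start = 0;
  while (start < data.length) {
    const tentativeEnd = Math.min(start + chunkBytes, data.length) - 1;
    if (tentativeEnd === data.length - 1) {
      // Final chunk: take everything that is left.
      chunks.push({ byteStart: start, byteEnd: tentativeEnd });
      break;
    }
    // Only look at a small tail window ending at the tentative boundary.
    const peekStart = Math.max(start, tentativeEnd - peekBytes + 1);
    const idx = lastNewlineIndex(data.subarray(peekStart, tentativeEnd + 1));
    if (idx === -1) {
      throw new ChunkBoundaryError(`no newline in peek window ending at byte ${tentativeEnd}`);
    }
    const end = peekStart + idx;
    chunks.push({ byteStart: start, byteEnd: end });
    start = end + 1;
  }
  return chunks;
}
```

Because every non-final chunk ends on a newline byte, concatenating the ranges reproduces the source and no row straddles a boundary.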
Replaces packages/api/src/services/etl/processCatalogEtl.ts +
queue.ts as the catalog ingest engine. Producer cutover lands next
(separate commit) — for now both paths coexist; the queue handler in
src/index.ts still routes to processQueueBatch for ?engine=queue
callers during the bake window.
Workflow structure per source CSV:
  for each chunk in params.chunks:
    step.do('chunk-N', { retries: 3, backoff: exp, timeout: 5min },
      () => processChunk(...))
  step.do('aggregate') -> UPDATE etl_jobs totals from memoized chunk results
  step.do('reconcile') -> csv-parse the R2 source for logical row count
  step.do('reconcile-write') -> UPDATE verified_at + verified_row_count
  step.do('finalize') -> UPDATE status='completed', completedAt
Audit closures inherited via the chunkCsvForR2 helper:
- P0 #1 (premature completion) — workflow instance state IS job state;
the finalize step is the single transition to 'completed'
- P0 #2 (swallowed errors) — Workflows surface failed steps with full
retry history; no DLQ table needed
- P1 #3/#4/#5 (chunk boundary bugs) — closed by the producer using
newline-aligned ChunkSpec; consumer drops skipPartialRow
- P1 #1/#2 (retry endpoint, stuck-job sweep) — closed by workflow
instance lifecycle (retry endpoints trigger new instances; stuck
detection is via dashboard, not a wall-clock cron)
- P1 #3 specifically — header re-fetch uses a bounded 4K → 16K → 64K
expand loop, throws EtlHeaderError if no newline anywhere in 64 KiB
Counter writes inside the chunk step (via existing
processValidItemsBatch / processLogsBatch) may double-count on a
chunk retry; the aggregate step at the end writes the authoritative
totals from memoized chunk results, overriding any retry drift.
wrangler.jsonc workflows binding switched from the throwaway
SPIKE_ETL_WORKFLOW to ETL_WORKFLOW (class CatalogEtlWorkflow). The
standalone spike worker (wrangler.spike.jsonc) is untouched and can
be torn down independently via wrangler delete --config=wrangler.spike.jsonc.
Test stub at src/__test-stubs__/cloudflare-workers.ts extended with
minimal WorkflowEntrypoint / WorkflowStep types so unit tests can
import workflow code without the real Cloudflare runtime.
Verification:
- All 17 unit-test files pass (304 tests) including the chunker tests
- biome check clean on all touched files
- Runtime verification (full deploy + trigger) blocked on Docker daemon
for the production worker; can be exercised once Docker is up.
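The chunk → aggregate → finalize shape and the "aggregate overrides retry drift" point can be sketched with a stand-in step runner. Everything here is hypothetical: `StepLike` and `makeMemoizingStep` only imitate the retry and memoization behavior the commit relies on and are not the Cloudflare Workflows API. The reconcile steps are omitted for brevity.

```typescript
// Hypothetical stand-ins; the real workflow uses the Cloudflare Workflows step object.
interface ChunkResult {
  valid: number;
  invalid: number;
}

interface StepLike {
  do<T>(name: string, fn: () => Promise<T>): Promise<T>;
}

// Retries a failing step and remembers the result of a step that succeeded.
function makeMemoizingStep(maxAttempts = 3): StepLike {
  const memo = new Map<string, unknown>();
  return {
    async do<T>(name: string, fn: () => Promise<T>): Promise<T> {
      if (memo.has(name)) return memo.get(name) as T;
      let lastErr: unknown;
      for (let attempt = 1; attempt <= maxAttempts; attempt++) {
        try {
          const value = await fn();
          memo.set(name, value);
          return value;
        } catch (err) {
          lastErr = err;
        }
      }
      throw lastErr;
    },
  };
}

async function runCatalogEtl(
  step: StepLike,
  chunks: string[],
  processChunk: (chunk: string) => Promise<ChunkResult>,
): Promise<{ status: "completed"; totals: ChunkResult }> {
  const results: ChunkResult[] = [];
  let i = 0;
  for (const chunk of chunks) {
    results.push(await step.do(`chunk-${i}`, () => processChunk(chunk)));
    i++;
  }
  // Authoritative totals come from the step results, not from counters
  // incremented inside a chunk (those may double-count on retry).
  const totals = await step.do("aggregate", async () =>
    results.reduce(
      (acc, r) => ({ valid: acc.valid + r.valid, invalid: acc.invalid + r.invalid }),
      { valid: 0, invalid: 0 },
    ),
  );
  // Single transition to "completed".
  return step.do("finalize", async () => ({ status: "completed" as const, totals }));
}
```

In this sketch a side-effect counter bumped inside `processChunk` would over-count when a chunk is retried, while `totals` stays correct because it is derived only from the results of successful attempts.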
Modifies POST /catalog/etl to trigger a CatalogEtlWorkflow instance per source CSV by default. The query parameter ?engine=queue keeps the legacy queue path available so operators can roll back if the workflow path misbehaves in production.
Workflow path:
- Calls chunkCsvForR2 per source object to produce newline-aligned ChunkSpec[] (closes audit P1 #3, #4, #5 on the retry surface as well as the initial ingest surface).
- Captures source_etag + source_last_modified from the first object's R2 head and persists them on the etl_jobs row. The admin repair-from-scratch endpoint (U5) compares the stored etag against the live R2 head to fail closed when a source has been overwritten.
- Generates a deterministic Workflows instance ID `${source}-${filename}` so duplicate triggers for the same file return the existing instance rather than producing parallel ingests.
Queue path:
- Unchanged from existing behavior: same 20 MB byte-range splits and queue.sendBatch.
- Kept until the workflow path bakes for at least a week in production (per migration plan rollout); removal in a follow-up PR.
Env type extended in env-validation.ts to expose ETL_WORKFLOW: Workflow so the route handler can type-check the env.ETL_WORKFLOW.create call.
Unit tests still pass (17 files, 304 tests). The full end-to-end verification (POST /catalog/etl?engine=workflow → workflow instance → DB rows → reconcile → finalize) requires the production worker deploy, which is gated on Docker for the AppContainer build; that path is unchanged by this commit.
…gaps
Closes audit P3 #2. The previous CatalogItemValidator.isValidUrl accepted anything new URL() parsed, including javascript:, mailto:, data:, file:, and any private/loopback IP. Catalog URLs render in the mobile app and the guides site, so a scraper bug or supply-chain compromise could trick the UI into rendering a homograph phishing link or a server-side fetch into hitting internal infrastructure.
Validator now rejects:
- Schemes other than http: and https:
- URLs > 2048 chars
- Loopback (localhost, 127.x.x.x, ::1), RFC-1918 (10/8, 172.16-31/12, 192.168/16), link-local (169.254/16), IPv6 link-local (fe80:), and ULA (fc00:/fd00:) hostnames. String-level pattern match only, no DNS resolution (DNS resolution would itself be an SSRF vector)
- Hostnames containing non-ASCII characters that survive WhatWG URL encoding (IDN homograph defense in depth)
Length caps on prose fields:
- name 500, description 50,000, brand 200, category 200
- SKU 200 chars + /^[A-Za-z0-9_./-]+$/ charset
15 unit tests cover every reject path plus the boundary-allowed cases. All 319 tests in the unit suite pass.
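A minimal sketch of the reject rules listed above, assuming a standalone function named `isSafeCatalogUrl` (hypothetical; the PR's validator lives in CatalogItemValidator and its exact patterns are not shown here). Like the commit, it matches hostnames as strings and performs no DNS resolution.

```typescript
// Hypothetical sketch of the described checks; patterns are illustrative.
const MAX_URL_LENGTH = 2048;

const BLOCKED_HOST_PATTERNS: RegExp[] = [
  /^localhost$/i,
  /^127\.\d{1,3}\.\d{1,3}\.\d{1,3}$/, // loopback
  /^10\.\d{1,3}\.\d{1,3}\.\d{1,3}$/, // RFC 1918 10/8
  /^172\.(1[6-9]|2\d|3[01])\.\d{1,3}\.\d{1,3}$/, // RFC 1918 172.16/12
  /^192\.168\.\d{1,3}\.\d{1,3}$/, // RFC 1918 192.168/16
  /^169\.254\.\d{1,3}\.\d{1,3}$/, // IPv4 link-local
  /^::1$/, // IPv6 loopback
  /^fe80:/i, // IPv6 link-local
  /^f[cd][0-9a-f]{2}:/i, // IPv6 unique local addresses
];

function isSafeCatalogUrl(raw: string): boolean {
  if (raw.length > MAX_URL_LENGTH) return false;
  let url: URL;
  try {
    url = new URL(raw);
  } catch {
    return false; // not parseable at all
  }
  if (url.protocol !== "http:" && url.protocol !== "https:") return false;
  // IPv6 hostnames are bracketed; strip the brackets before matching.
  const host = url.hostname.replace(/^\[|\]$/g, "");
  if (/[^\x00-\x7f]/.test(host)) return false; // non-ASCII survived URL parsing
  return !BLOCKED_HOST_PATTERNS.some((re) => re.test(host));
}
```

The pattern list is string-level only, so a public hostname that resolves to a private address would still pass; the commit accepts that trade-off to avoid doing DNS lookups in the validator.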
…v.dev
The spike worker (packrat-etl-spike) was deleted from the Cloudflare account; the throwaway files referencing it no longer have a deployed counterpart, so removing them keeps the worktree clean and the PR diff focused on the production migration.
env.dev workflows binding added so the dev deploy of packrat-api actually receives ETL_WORKFLOW. Top-level workflows[] does not inherit into envs that explicitly redeclare other bindings (wrangler 4.92 behavior).
Strict cast linter (check:casts:strict) rejects unchecked `as Error` even when narrowing unknown from a catch. Replace with a clean `instanceof Error ? err : new Error(String(err))` guard so the parser.destroy call always receives a real Error.
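The guard described above can be wrapped in a small helper. The name `toError` is hypothetical; the commit applies the expression inline.

```typescript
// Normalizes an unknown catch value into a real Error without an `as Error` cast.
function toError(err: unknown): Error {
  return err instanceof Error ? err : new Error(String(err));
}

// Usage: anything thrown, including strings or plain objects, becomes an Error.
function describeFailure(run: () => void): string {
  try {
    run();
    return "ok";
  } catch (err) {
    return toError(err).message;
  }
}
```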
…ures
Narrows U2's schema additions from 8 columns to 2 after PR-shaping discussion. Most of the originally-scoped columns existed to support audit findings whose consumers ship in later PRs:
- verified_at / verified_row_count: reconcile UI / U10
- superseded_by_job_id / superseded_at: repair endpoint / U5
- source_etag / source_last_modified: fail-closed repair guard / U5
Adding them now would create dead schema with no reader, so each follow-up unit adds its column when it lands.
Net change: zero indexes, zero CHECK constraints, zero UNIQUE constraints, no FK self-reference. This is about as low-risk as a migration can be.
What stays (both load-bearing from day one):
- workflow_instance_id text: admin/debug link from etl_jobs to the CF Workflows instance; null on legacy queue-path rows, set on workflow-path rows
- total_embedding_failures integer DEFAULT 0 NOT NULL: observable embedding-fallback degradation counter (audit P2 #3)
- etl_jobs_workflow_instance_id_idx: supports the lookup pattern
Workflow simplifications follow:
- Dropped the reconcile + reconcile-write steps (no verified_* columns to write into); workflow now runs chunk-N × N → aggregate → finalize
- Dropped reconcileSourceRowCount helper (orphaned with the steps)
- Dropped source_etag / source_last_modified capture in the producer
Plan doc updated with a scope-adjustment note on U2 explaining the narrowing; original 8-column rationale preserved for context.
Verification:
- drizzle-kit check ✓
- check-drizzle-migrations.ts ✓
- 18 unit-test files, 319 tests, all pass
- biome check clean on all touched files
Bounded-batch DELETE for invalid_item_logs older than 90 days, wired to a daily 09:00 UTC CF Cron Trigger via a new scheduled handler arm in src/index.ts.
Why batched: a naive single-statement DELETE on a table that has been accumulating for months would acquire row-level locks on millions of rows in one statement, hit Neon's statement timeout, and roll back having pruned nothing. The loop deletes 10k-row chunks via WHERE id IN (SELECT id ... LIMIT N) RETURNING id and counts the returned rows. Stops on empty batch. Caps at 100 iterations (1M rows / run) so a first run with months of backlog can't monopolize the daily window; the cap is reported in the RetentionResult so operators can see when more rows remain.
Defaults: 90-day window, 10k batch, 100-iteration cap. Overridable per-call via options.
Wrangler config gets a top-level + env.dev triggers.crons entry. First cron in this worker, so the scheduled() handler in src/index.ts is brand new. It dispatches on the controller.cron string and throws on unknown crons so a misconfigured trigger fails loudly.
5 unit tests cover empty-first-batch, multi-batch accumulation, the iteration cap, and the retentionDays fallback. All 324 tests in the unit suite pass. Real-DB integration coverage deferred to U9 (needs Docker Postgres).
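The loop described above might look roughly like the following, with the database call abstracted behind a callback. `pruneInBatches` and the `deleteBatch` callback are hypothetical names, and the `capped` rule shown (cap reached while the last batch still returned rows) is one reasonable reading of "the cap is reported", not necessarily the PR's exact logic.

```typescript
// Hypothetical sketch of a bounded-batch delete loop.
interface RetentionResult {
  deleted: number;
  iterations: number;
  capped: boolean; // true when the iteration cap stopped the loop with rows likely remaining
}

async function pruneInBatches(
  // Deletes up to `limit` expired rows and resolves to how many were deleted.
  deleteBatch: (limit: number) => Promise<number>,
  opts: { batchSize?: number; maxIterations?: number } = {},
): Promise<RetentionResult> {
  const batchSize = opts.batchSize ?? 10000;
  const maxIterations = opts.maxIterations ?? 100;
  let deleted = 0;
  let iterations = 0;
  let lastBatch = 0;
  while (iterations < maxIterations) {
    lastBatch = await deleteBatch(batchSize);
    iterations++;
    deleted += lastBatch;
    if (lastBatch === 0) break; // nothing left to prune
  }
  return { deleted, iterations, capped: iterations === maxIterations && lastBatch > 0 };
}
```

Each call to `deleteBatch` is its own short statement, so a large backlog is worked off across several daily runs instead of in one long-running DELETE.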
Splits U6's "Sentry wiring + structured logger + error propagation"
deliverable. This PR ships the parts that don't need a new dependency:
- Thin structured logger at packages/api/src/utils/logger.ts emits
JSON lines with { level, event, ts, ...ctx }. To log an error,
attach it under ctx.err — the emit boundary unpacks errorName /
errorMessage / errorStack so the contract that error stacks never
contain raw CSV row data is enforceable by code review at one
site (the logger), not every call site
- processLogsBatch rethrows on DB failure (audit P2 #2) — silently
swallowing meant the only forensic record of validation failures
could disappear without anyone noticing
- processValidItemsBatch embedding-fallback path atomically
increments etl_jobs.total_embedding_failures (audit P2 #3) so
operators see degradation in the admin endpoint without trawling
logs; warning log at the call site for the per-batch event
- All console.log calls in the touched files replaced with
logger.info / logger.warn / logger.error
Sentry wiring (@sentry/cloudflare with withSentry({ fetch, workflow,
queue, scheduled })) is deferred to a follow-up PR. Justification:
adding a new dep changes the lockfile, adds ~30 KB to the bundle, and
needs compat verification against the mobile app's @sentry/react-native.
Reviewers should see that as its own concern, not bundled with
correctness fixes. The logger's emit() boundary is the wire-up point
when the follow-up lands — each call site upgrades for free.
Verification: 19 unit-test files, 324 tests pass. biome clean.
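To make the logger contract concrete, here is a minimal sketch of an emit boundary that produces the `{ level, event, ts, ...ctx }` line and unpacks `ctx.err`. The function name `buildLogLine` is hypothetical and the sketch returns the string instead of writing to the console so it is easy to test; the real logger.ts may differ.

```typescript
// Hypothetical sketch of the structured-logger emit boundary.
type Level = "info" | "warn" | "error";
type LogContext = Record<string, unknown> & { err?: unknown };

function buildLogLine(level: Level, event: string, ctx: LogContext = {}, now: Date = new Date()): string {
  // Pull the error out so it is never serialized as an opaque object.
  const { err, ...rest } = ctx;
  const record: Record<string, unknown> = { level, event, ts: now.toISOString(), ...rest };
  if (err !== undefined) {
    const e = err instanceof Error ? err : new Error(String(err));
    record.errorName = e.name;
    record.errorMessage = e.message;
    record.errorStack = e.stack;
  }
  return JSON.stringify(record);
}
```

Because the error is unpacked in exactly one place, reviewing what ends up in `errorMessage` and `errorStack` only requires reading this function, not every call site.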
…oints
Adds the two operator-facing surfaces that close the gap left by the plan's U5 scope-down. Defers repair-from-scratch and ETag fail-closed verification to follow-up PRs: workflow retry is enough to re-ingest the 7 historical false-failures from 2026-05-14, and ETag verification is defense in depth that operators can do manually for the one-time recovery.
Migration 0049 adds two columns:
- verified_at timestamp (nullable)
- verified_row_count integer (nullable)
Both written exclusively by the new reconcile endpoint.
POST /admin/etl/:jobId/retry: rewritten to trigger a CatalogEtlWorkflow instance instead of a queue message. Works for both legacy queue-era failed jobs and workflow-era failed jobs (the new instance always uses chunkCsvForR2 for newline-aligned chunks). Instance ID is suffixed with the new jobId so duplicate retries don't collide. Response now includes workflowInstanceId so the admin UI can deep-link to the dashboard.
POST /admin/etl/:jobId/reconcile: synchronously counts logical rows in the R2 source via csv-parse (NOT raw \n counting; quoted multi-line fields would skew that) and persists the result on verified_at + verified_row_count. Returns expectedRowCount / actualRowCount / delta. Large files may exceed the fetch budget; async-via-workflow is a follow-up if needed.
EtlRetrySchema gets a workflowInstanceId field; EtlReconcileSchema is new. Both in @packrat/schemas/admin.
Verification: drizzle-kit check + custom migration linter clean, check-casts:strict clean, biome clean, 19 unit-test files / 324 tests all pass.
Reset-stuck endpoint (POST /admin/etl/reset-stuck) is unchanged. Its wall-clock-based design is wrong (closed by the audit P1 #2) but the fix is to delete it once Workflows is the only ingest path. Deferred to the queue-path-removal PR.
New runbook at docs/runbooks/etl-pipeline.md covering:
- Architecture (producer → workflow → DB; cron arms)
- The ?engine=workflow|queue flag + coexistence-window context
- How to trigger an ETL run
- How to inspect workflow instances (wrangler workflows commands)
- How to retry a failed job
- How to reconcile a job's row count against R2
- DLQ / forensic record (the Workflows dashboard is the record; no DLQ table)
- The 7-job historical recovery procedure with SQL + curl
- Invalid-item-logs retention (daily 09:00 UTC sweep)
- Draining the legacy queue path when ready for deletion
- Admin dashboard field semantics under the Workflows architecture (workflow_instance_id, verified_*, total_embedding_failures, etc.)
- Accepted limitations (no soft-delete, success_rate quirk on failed jobs, sync reconcile bounded by fetch budget, no ETag fail-closed on retry yet, embedding cost on chunk retry)
- Historical recoveries appendix (stub for the 2026-05-14 recovery to be filled in when executed post-deploy)
- References (audit, plans, CF docs)
First runbook in docs/runbooks/, which establishes the convention.
Three real fixes plus one coverage exclusion:
1. packages/db/src/schema.ts: restore the `AnyPgColumn` type import.
It was dropped when U2 was slimmed (the FK self-reference on
`superseded_by_job_id` went away with it), but `post_comments`
still uses AnyPgColumn for its own parent_comment_id self-reference.
2. invalidLogRetention.ts: drop the `.returning({ id: ... })` typed
shape and use bare `.returning()`. The union of three Drizzle
driver types (neon-http / neon-serverless / node-postgres) accepts
only the no-arg overload at the intersection; the typed shape
tripped TS2554. Row count is computed from `.length` anyway.
3. invalidLogRetention.test.ts: replace the `__mockDb` cross-module
handle (TS2305: not an export) with vi.hoisted() state shared
between the mock factory and the tests. Cleaner and type-safe.
4. vitest.unit.config.ts: add `src/workflows/catalog-etl-workflow.ts`
to coverage exclude. The chunker sibling (src/workflows/shared/) is
still covered (5 unit tests at 100%). The workflow class needs the
real CF Workflows runtime for end-to-end execution; integration tests
in /test pick it up when Docker Postgres is wired (deferred per the
PR's "deferred to follow-up" list).
Plus: new unit tests for `logger.ts` (10 tests, 100% coverage) so the
new file doesn't drop the coverage threshold by itself.
Coverage now at 98.63% statements / 95.33% branches (was 76.76% / 95.16%).
20 unit-test files, 331 tests, all pass. bun check-types clean.
Does not address:
- `api-tests` install failure (Fail extracting tarball for
@sentry/cli-linux-x64) — that's a transient registry / CI runner
issue, not something this PR can fix. A retry should clear it.
…dit trail
Adds the columns + endpoints that were originally part of U5's full scope but deferred during the U2 slim-down. Now landing together so the post-merge operational story is complete.
Migration 0050 adds to etl_jobs:
- source_etag text: captured by the producer from r2.head().etag
- source_last_modified timestamp: same; redundant with etag but cheap
- superseded_by_job_id text: FK to etl_jobs.id (ON DELETE SET NULL), written by retry + repair endpoints to link the new job back to the original
- superseded_at timestamp: when the supersession was recorded
- CHECK constraint etl_jobs_no_self_supersede prevents a row from superseding itself
- Index etl_jobs_superseded_by_idx supports the dashboard's "show me the repair chain for cotopaxi" lookup
Producer (POST /catalog/etl):
- Captures sourceEtag + sourceLastModified from the first object's chunkCsvForR2 head; writes to etl_jobs on insert
Retry (POST /admin/etl/:jobId/retry):
- Refactored into a shared reingestJob() helper used by retry + repair-from-scratch
- Before triggering the new workflow, calls r2.head() and compares live etag against the stored sourceEtag. Returns 409 ETL_ETAG_MISMATCH unless ?force=true. Skips the check when the stored etag is NULL (legacy queue-era rows, including the 7 false-failures from 2026-05-14)
- New job row carries supersededByJobId pointing at the original + supersededAt timestamp
New endpoint POST /admin/etl/:jobId/repair-from-scratch:
- Same shape as retry but accepts completed jobs too. Use case: operator suspects an originally-completed job under-counted (the audit's R8 "trace the repair chain" requirement)
Also adds @sentry/cloudflare ^10.37.0 to packages/api/package.json (install lands in this commit but wiring is in the next one).
Verification: drizzle-kit check + custom linter clean, check-casts:strict clean, biome clean, 20 unit-test files / 331 tests pass, tsc clean.
Closes the Sentry deferral from U6 part 1 now that the dependency is
installed. Wires Sentry into three surfaces:
1. Worker default export wrapped with withSentry(optionsCallback, handler)
— initializes Sentry on first invocation; uncaught fetch / queue /
scheduled exceptions land in Sentry with request + queue + cron context
attached automatically by the SDK.
2. CatalogEtlWorkflow wrapped with instrumentWorkflowWithSentry — every
step.do span + any uncaught throw inside a step lands in Sentry with
workflow name + instance id + step name + attempt count attached.
3. logger.ts emit() boundary forwards to Sentry when isInitialized():
- logger.info/warn → Sentry.addBreadcrumb (correlated with next captureException)
- logger.error({ err }) → Sentry.captureException with ctx fields as tags/extras
- logger.error without err → Sentry.captureMessage(level=error)
Forwarding is best-effort and try/catch-wrapped — failures here never
break the call site (JSON console line is the durable record).
Sentry options shared between handler + workflow:
{ dsn: env.SENTRY_DSN, environment: env.ENVIRONMENT,
tracesSampleRate: 0.1, release: env.CF_VERSION_METADATA?.id }
wrangler.jsonc:
- Adds `nodejs_als` compatibility flag (required by @sentry/cloudflare's
AsyncLocalStorage-based context propagation across awaits)
- Adds `upload_source_maps: true` so wrangler deploy uploads sourcemaps
to Cloudflare — unminified stack traces in wrangler tail and the
Workers dashboard. Sentry-side symbolication is a separate
@sentry/cli sourcemaps upload step (documented in runbook; no CI
deploy pipeline exists today to automate it).
Runbook updated with a new "Sentry observability" section documenting
wiring, tag conventions, and the optional Sentry-side sourcemap upload.
Verification:
- 20 unit-test files, 331 tests pass (logger tests still pass; Sentry
isInitialized() returns false in tests so forwarding is silently
skipped — JSON output to console unchanged)
- bun check-types clean
- biome check clean
- check-casts:strict clean
…typeof Pre-push no-raw-typeof linter rejected the raw `typeof v === ...` chain introduced in the Sentry forwarder. Replaced with isString/isNumber/ isBoolean from @packrat/guards, matching the rest of the codebase. No behavior change; same path classification (primitives → Sentry tags, objects/arrays → Sentry extras).
radash (which @packrat/guards re-exports) provides isString, isNumber, isObject, etc. but not isBoolean. Use a direct === true || === false check instead — passes the no-raw-typeof linter and reads cleaner than inventing a wrapper.
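A small sketch of the classification these two commits describe: primitives become Sentry tags and everything else becomes extras. The function name `splitForSentry` and the local guards are hypothetical stand-ins for the @packrat/guards helpers (the standalone guards use `typeof` internally, which the codebase's own linter would reject at call sites), and routing null/undefined to extras is an assumption.

```typescript
// Hypothetical stand-ins for the guard helpers used in the codebase.
type Primitive = string | number | boolean;

const isString = (v: unknown): v is string => typeof v === "string";
const isNumber = (v: unknown): v is number => typeof v === "number" && !Number.isNaN(v);
// Direct comparison, as in the commit, instead of a dedicated isBoolean helper.
const isBooleanValue = (v: unknown): v is boolean => v === true || v === false;

function splitForSentry(ctx: Record<string, unknown>): {
  tags: Record<string, Primitive>;
  extras: Record<string, unknown>;
} {
  const tags: Record<string, Primitive> = {};
  const extras: Record<string, unknown> = {};
  for (const [key, value] of Object.entries(ctx)) {
    if (isString(value) || isNumber(value) || isBooleanValue(value)) {
      tags[key] = value; // primitives are searchable as tags
    } else {
      extras[key] = value; // objects, arrays, null, undefined
    }
  }
  return { tags, extras };
}
```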
Single-query per-source data-quality audit served from the API instead of requiring scrapyd (or any other consumer) to talk to the DB directly. The SQL stays where the schema lives; consumers authenticate with the existing admin JWT and never need NEON_DATABASE_URL.
Flags surfaced per source (computed server-side from threshold constants returned alongside the report):
- decimal_bug: count of prices < $10 with 3+ decimal places (the "1,299 → 1.299" parser bug from the existing scrapyd audit)
- low_median: median < $20 on a non-allowlisted source
- high_null:<field>: > 30% NULL on price / brand / description / weight / images / availability
- bad_weight: count of weights < 1g or > 100kg
- empty_name: count of empty / null names
- stale: source has no completed ETL in 30+ days
Query is a single CTE-based GROUP BY (DISTINCT ON for most-recent ingest source per item, then aggregate). One round-trip for all sources; ?source=<name> filters to one for ad-hoc debugging.
Response schema CatalogAuditSchema lives in @packrat/schemas/admin so Eden Treaty consumers get end-to-end types.
Verification: 20 unit-test files / 331 tests pass, tsc clean, biome clean, check-casts:strict clean.
Used by scripts/audit_db_catalog.py in PackRat-ScrapyD#129 (next commit on that PR drops the direct-DB approach in favor of this endpoint).
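For illustration, deriving the flags from per-source aggregates could look like the sketch below. The `SourceStats` shape, the `computeFlags` name, and the rule that count-based flags fire when the count is above zero are all assumptions; only the flag names and the $20 / 30% / 30-day thresholds come from the commit message.

```typescript
// Hypothetical sketch; field names and the SourceStats shape are illustrative.
const THRESHOLDS = { lowMedianUsd: 20, highNullRate: 0.3, staleDays: 30 } as const;

interface SourceStats {
  source: string;
  decimalBugCount: number;
  medianPrice: number | null;
  nullRates: Record<string, number>; // fraction of NULLs per field, 0..1
  badWeightCount: number;
  emptyNameCount: number;
  daysSinceLastCompletedEtl: number | null; // null when the source has never completed
}

function computeFlags(stats: SourceStats, lowMedianAllowlist: ReadonlySet<string> = new Set<string>()): string[] {
  const flags: string[] = [];
  if (stats.decimalBugCount > 0) flags.push("decimal_bug");
  if (
    stats.medianPrice !== null &&
    stats.medianPrice < THRESHOLDS.lowMedianUsd &&
    !lowMedianAllowlist.has(stats.source)
  ) {
    flags.push("low_median");
  }
  for (const [field, rate] of Object.entries(stats.nullRates)) {
    if (rate > THRESHOLDS.highNullRate) flags.push(`high_null:${field}`);
  }
  if (stats.badWeightCount > 0) flags.push("bad_weight");
  if (stats.emptyNameCount > 0) flags.push("empty_name");
  if (stats.daysSinceLastCompletedEtl === null || stats.daysSinceLastCompletedEtl >= THRESHOLDS.staleDays) {
    flags.push("stale");
  }
  return flags;
}
```

Returning the threshold constants alongside the report, as the commit describes, lets a consumer explain why a flag fired without hard-coding the same numbers.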
Previous fix pointed drizzle.config.ts at ../db/src/schema.ts (relative path crossing the workspace boundary). Cleaner: add an in-package re-export at src/db/schema.ts that re-exports from @packrat/db/schema, and point drizzle.config back to ./src/db/schema.ts. drizzle-kit + any other drizzle-aware tooling now stays scoped to packages/api and is insulated from workspace layout changes. Schema source of truth still lives in packages/db/src/schema.ts.
…d migration
Previous: three migrations (0048_etl_workflow_columns, 0049_etl_verification_cols, 0050_etl_etag_and_supersession) all generated by drizzle-kit but renamed post-generation, with hand-edited journal tags to match. That made the migrations look hand-authored and the rename+edit pattern is brittle.
Now: one migration with whatever name drizzle-kit emits. The additive column changes (workflow_instance_id, total_embedding_failures, verified_at, verified_row_count, source_etag, source_last_modified, superseded_by_job_id, superseded_at + FK + indexes + check constraint) collapse cleanly into a single migration. Net diff impact: ~4,600 fewer lines (3 snapshots → 1).
Updates CLAUDE.md with explicit migration discipline so this doesn't recur:
- always generate via drizzle-kit
- keep the random auto-generated name (do not rename)
- never hand-edit journal / snapshots / SQL
- collapse additive changes into one migration when they ship together
- verify with drizzle-kit check before pushing
Schema content is identical; verified via drizzle-kit check.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Mark etl_jobs as failed when ETL_WORKFLOW.create() throws, preventing
perpetually-running orphaned rows
- Wrap workflow run() in try/catch to update job status to failed on
step exhaustion (runtime marks instance errored but DB row was stuck)
- Always await writerPromise in reconcile endpoint via try/finally so
the promise cannot reject unhandled if the csv-parse loop throws
- Use byte scan (0x0A) instead of text.lastIndexOf for chunk boundaries
in chunkCsvForR2 — char index != byte offset for non-ASCII CSV content
- Fix capped false positive in invalidLogRetention: capped only when last
batch had rows remaining, not when loop exited via exhaustion
- Use returning({ id }) in retention sweep to avoid fetching full rows
- Guard JSON.stringify in logger emit against circular refs / BigInt
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Plan doc: remove superseded_* from the "not in this plan" exclusion list (those columns are included in the migration SQL), enumerate all 8 new columns in the correct bullet, and rephrase the source_etag backfill from "at migration time" (impossible in SQL) to a post-migration operational step. Runbook: update the retry section to reflect that superseded_by_job_id is set on every new retry/repair row (not a follow-up PR), and update the accepted-limitations entry to accurately describe the ETag fail-closed behavior that repair-from-scratch already implements. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- chunkCsvForR2: use TextEncoder to convert char index → byte offset so
non-ASCII product names don't produce mis-aligned chunk boundaries;
arrayBuffer() approach broke the R2 mock in unit tests
- invalidLogRetention: revert .returning({id}) partial select — drizzle
delete+subquery chain typing rejects the arg in this version; full
.returning() still gives correct capped logic via rowCount sentinel
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
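The char-index versus byte-offset distinction behind this fix can be shown in a few lines. `lastNewlineByteOffset` is a hypothetical name; the idea is simply to encode the prefix before the newline and use its byte length, since R2 ranges are expressed in bytes.

```typescript
// Hypothetical sketch: convert the character index of the last "\n" into a UTF-8 byte offset.
function lastNewlineByteOffset(text: string): number {
  const charIndex = text.lastIndexOf("\n");
  if (charIndex === -1) return -1;
  // Byte length of everything before the newline equals the newline's byte offset.
  return new TextEncoder().encode(text.slice(0, charIndex)).length;
}

// "é" is one UTF-16 code unit but two UTF-8 bytes, so the two offsets diverge.
const sample = "Café\nTent\nx";
const charOffset = sample.lastIndexOf("\n");
const byteOffset = lastNewlineByteOffset(sample);
console.log({ charOffset, byteOffset });
```

Using `charOffset` directly as a byte range end would cut the chunk one byte short for every two-byte character that precedes the newline in the window.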
- chunkCsvForR2: throw for empty R2 objects (size=0) instead of returning byteEnd=-1 which is an invalid range
- chunkCsvForR2: implement real concurrency cap at 16 parallel peek reads (was comment-only; now batched Promise.all loops)
- chunk-csv-for-r2.test: add empty-file error test
- db-schema-etl.test: fix describe label "Migration 0048" → "0047"
- plan doc: add drizzle-kit check to migration verification checklist
- runbook: warn that wrangler queues consumer remove does not drain in-flight messages; wait for queue depth 0 first
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…igration fix: migrate catalog ETL to Cloudflare Workflows (audit remediation, phase 1)
CF Workflows rejects instance IDs containing dots; only [a-zA-Z0-9_-] is allowed. The instanceId was built from source + filename verbatim, so filenames like 23zero_2026-05-21T04-26-37.csv produced "instance.invalid_id" errors on every ETL trigger.
Fix: strip the file extension before building the instanceId. For retry/repair: use suffix-newJobId (always unique UUID-based, always valid) instead of source-filename-suffix-uuid which can exceed 64 chars.
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Regression guard for the bug where CF Workflows instance IDs contained dots from raw .csv filenames, causing 500s. Tests the FILE_EXT_RE strip and 64-char truncation logic inline, covering: basic strip, no-extension input, long-name truncation, and timestamp char pass-through. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
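A sketch of the strip-and-truncate logic these two commits describe. The regex assigned to `FILE_EXT_RE` and the `buildInstanceId` name are assumptions (the commit names FILE_EXT_RE but does not show its pattern); the 64-character limit and the allowed character set are taken from the commit messages.

```typescript
// Hypothetical pattern: a final dot followed by alphanumerics is treated as the extension.
const FILE_EXT_RE = /\.[A-Za-z0-9]+$/;
const MAX_INSTANCE_ID_LENGTH = 64;

function buildInstanceId(source: string, filename: string): string {
  const base = filename.replace(FILE_EXT_RE, ""); // drop ".csv", ".jsonl", ...
  return `${source}-${base}`.slice(0, MAX_INSTANCE_ID_LENGTH);
}

// Timestamp characters such as "T", "-" and "_" pass through unchanged.
console.log(buildInstanceId("23zero", "23zero_2026-05-21T04-26-37.csv"));
```

Note that this sketch only removes the extension; a filename with other disallowed characters would still need separate handling.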
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add `relax_quotes` and `skip_records_with_error` to the csv-parse config so unclosed-quote rows (e.g. zpacks CSV line 36741) are skipped rather than crashing the whole job. The `on_skip` callback writes each bad row to `invalid_item_logs` with a `csv_parse` field error so they remain visible in the admin analytics dashboard. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Large-file workflow instances (campmor 87MB, goalzero 27MB, etc.) were stuck in a deterministic timeout loop because each 20MiB chunk contained ~20K rows, exceeding Cloudflare Workers' 5-minute step execution limit. Reducing DEFAULT_CHUNK_BYTES to 5MiB yields ~4× smaller chunks so each step completes well within the time budget. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
csv-parse exposes the actual line number on the error object via `.lines`. Using the loop-scoped rowIndex (valid-record counter) produced inaccurate rowIndex values in invalid_item_logs for skipped malformed rows. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
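Taken together, the lenient-parsing options and the line-number fix might look something like the sketch below. The option names (`relax_quotes`, `skip_records_with_error`, `on_skip`, `relax_column_count`, `skip_empty_lines`) come from the commits and diff in this PR; `SkipError`, `InvalidLogEntry`, and `toInvalidLog` are local stand-ins invented for illustration, and the options object is kept free of imports so it can be read in isolation.

```typescript
// Local stand-in for the csv-parse error shape; only the fields used here.
type SkipError = { message?: string; lines?: number } | undefined;

// Hypothetical shape of an invalid_item_logs entry.
interface InvalidLogEntry {
  rowIndex: number;
  field: "csv_parse";
  reason: string;
  raw: string | undefined;
}

// Prefer the parser-reported line number over a caller-side counter.
function toInvalidLog(
  err: SkipError,
  raw: string | undefined,
  fallbackRowIndex: number,
): InvalidLogEntry {
  return {
    rowIndex: err?.lines ?? fallbackRowIndex,
    field: "csv_parse",
    reason: err?.message ?? "unknown csv-parse error",
    raw,
  };
}

const skippedRows: InvalidLogEntry[] = [];

// Options in the shape described by the commits; intended to be passed to csv-parse's parse().
const parserOptions = {
  relax_column_count: true,
  skip_empty_lines: true,
  relax_quotes: true,
  skip_records_with_error: true,
  on_skip: (err: SkipError, raw: string | undefined): void => {
    // -1 marks "line unknown" when the parser supplies no error object.
    skippedRows.push(toInvalidLog(err, raw, -1));
  },
};
```

Keeping the mapping in a pure function makes the `err` possibly-undefined case easy to unit test without running the parser.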
fix(etl): catalog sprint fixes — lenient CSV parsing, chunk timeout, instanceId test
Create json-utils.ts with isJsonlFile() and mapJsonRowToItem() helpers, then branch both the queue-path (processCatalogEtl) and workflow-path (catalog-etl-workflow) processors to stream JSONL when the object key ends in .jsonl/.ndjson. CSV path is unchanged. Also backports relax_quotes + on_skip to the workflow CSV parser to match the queue path. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Replace raw typeof checks with isString/isNumber/isObject from @packrat/guards
- Fixes custom lint rule violation (no-raw-typeof CI check)
- Add json-utils.test.ts with 30+ tests covering all branches
- Brings json-utils.ts line/statement coverage above 95% threshold

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Use toRecord() for JSON.parse results (catalog-etl-workflow, processCatalogEtl)
- Use toStringRecord() for techs narrowing (json-utils)
- Fix availability test value (in_stock not InStock)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Remove incorrect JSONL partial-line skip (chunker guarantees clean boundaries; skipPartialLine was dropping the first valid record per non-first chunk)
- Fix @packrat/guards import order for Biome organizeImports (after @packrat/db)
- Use @packrat/api/utils/csv-utils alias instead of relative path in json-utils
- Filter non-strings from JSON-parsed categories array (parity with native branch)
- Apply toStringRecord to safeJsonParse techs result (parity with native branch)
- Add test for JSON array categories non-string filtering

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
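The categories parity fix could be illustrated with a sketch like this one. It is not the json-utils.ts code: `normalizeCategories` is a hypothetical name, the local `isString` stands in for the guard imported from @packrat/guards, and returning an empty array for unparseable strings is a choice made for the sketch.

```typescript
// Stand-in for a string guard such as isString from @packrat/guards (implementation assumed).
const isString = (value: unknown): value is string => typeof value === "string";

// Accepts a native array or a JSON-encoded array string; keeps only string entries.
function normalizeCategories(value: unknown): string[] {
  let candidate: unknown = value;
  if (isString(value)) {
    try {
      candidate = JSON.parse(value);
    } catch {
      return []; // not valid JSON: treated as "no categories" in this sketch
    }
  }
  return Array.isArray(candidate) ? candidate.filter(isString) : [];
}
```

Routing both the native-array branch and the JSON-string branch through the same filter is what keeps the two paths in parity.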
@packrat/api/* → @packrat/db → @packrat/guards → @packrat/schemas/* Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…olution csv-parse infers the correct CsvError type; explicit Error annotation caused the Options overload to be rejected, resolving to Callback<string[]> instead. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
csv-parse types on_skip as (err: CsvError | undefined, ...) => void Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Avoids TS18048 — `err` is possibly undefined at line 206; the `message` variable is already safely computed with optional chaining and a fallback. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The string fits within the 100-char line width; collapsing it removes the only Biome format error in the checks job. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Deploying with
| Status | Name | Latest Commit | Preview URL | Updated (UTC) |
|---|---|---|---|---|
| ✅ Deployment successful! View logs | packrat-admin | ad4d009 | Commit Preview URL, Branch Preview URL | May 22 2026, 08:28 AM |
Deploying packrat-guides with
| Latest commit: | ad4d009 |
| Status: | ✅ Deploy successful! |
| Preview URL: | https://862b73e2.packrat-guides-6gq.pages.dev |
| Branch Preview URL: | https://feat-jsonl-etl-support.packrat-guides-6gq.pages.dev |
Pull request overview
This PR extends the catalog ETL ingestion pipeline to support JSONL/NDJSON sources alongside CSV, while migrating the primary execution engine to Cloudflare Workflows and adding operational hardening (observability, retry/repair tooling, and retention).
Changes:
- Added a Workflows-based ETL engine (CatalogEtlWorkflow) with newline-aligned R2 chunk planning and updated catalog ETL trigger routing (`?engine=workflow|queue`).
- Introduced JSONL mapping utilities and integrated JSONL parsing into both the legacy queue ETL and the new workflow chunk processor.
- Added ETL operational features: structured logger + Sentry wiring, ETL job schema extensions, admin retry/repair/reconcile/audit endpoints, and daily invalid log retention sweeps.
Reviewed changes
Copilot reviewed 37 out of 38 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| packages/schemas/src/admin.ts | Adds schemas for new admin ETL responses (reconcile/audit) and workflow instance IDs. |
| packages/db/src/schema.ts | Extends etl_jobs with workflow/verification/provenance/supersession columns + indexes/constraints. |
| packages/api/wrangler.jsonc | Enables Workflows binding, cron trigger, source map upload, and nodejs_als. |
| packages/api/vitest.unit.config.ts | Excludes workflow entrypoint from unit coverage; clarifies test intent. |
| packages/api/test/db-schema-etl.test.ts | Adds migration smoke test for ETL workflow-related DB schema. |
| packages/api/src/workflows/shared/chunkCsvForR2.ts | New newline-aligned R2 byte-range chunk planner. |
| packages/api/src/workflows/shared/tests/chunk-csv-for-r2.test.ts | Unit tests validating chunk boundary correctness and error cases. |
| packages/api/src/workflows/catalog-etl-workflow.ts | New Cloudflare Workflow entrypoint implementing ETL chunk processing and final aggregation. |
| packages/api/src/utils/logger.ts | Adds structured JSON logging with optional Sentry forwarding. |
| packages/api/src/utils/json-utils.ts | Adds JSONL file detection and JSON object → catalog item mapping. |
| packages/api/src/utils/env-validation.ts | Adds ETL_WORKFLOW binding to env schema/types. |
| packages/api/src/utils/tests/logger.test.ts | Unit tests for structured logger output and error unpacking. |
| packages/api/src/utils/tests/json-utils.test.ts | Unit tests for JSONL detection and mapping behavior. |
| packages/api/src/services/retention/invalidLogRetention.ts | Adds batched invalid log retention sweep logic. |
| packages/api/src/services/retention/tests/invalidLogRetention.test.ts | Unit tests for retention sweep loop semantics via DB mocking. |
| packages/api/src/services/etl/processValidItemsBatch.ts | Adds embedding-fallback observability + DB counter updates + structured logging. |
| packages/api/src/services/etl/processLogsBatch.ts | Switches to structured logger; rethrows on DB failures (no silent drops). |
| packages/api/src/services/etl/processCatalogEtl.ts | Adds JSONL parsing path; fixes writer promise capture/backpressure handling. |
| packages/api/src/services/etl/CatalogItemValidator.ts | Hardens validation (URL scheme/SSRF guard, length caps, SKU charset). |
| packages/api/src/services/etl/tests/CatalogItemValidator.test.ts | Tests for new validator hardening rules. |
| packages/api/src/routes/catalog/index.ts | Updates ETL trigger to support workflow engine, chunk planning, and instance ID creation. |
| packages/api/src/routes/catalog/tests/instanceId.test.ts | Regression tests for workflow instance ID construction. |
| packages/api/src/routes/admin/analytics/catalog.ts | Adds workflow-aware retry/repair/reconcile endpoints and catalog audit endpoint. |
| packages/api/src/index.ts | Adds Sentry Worker + Workflow instrumentation and scheduled retention handler. |
| packages/api/src/db/schema.ts | Re-exports shared DB schema for drizzle tooling within API package scope. |
| packages/api/src/auth/index.ts | Ensures KV TTL minimum for Better Auth secondary storage. |
| packages/api/src/test-stubs/cloudflare-workers.ts | Expands stub to include Workflows types/classes for unit test imports. |
| packages/api/package.json | Adds @sentry/cloudflare dependency. |
| packages/api/drizzle/meta/_journal.json | Records new migration entry. |
| packages/api/drizzle/0047_clear_monster_badoon.sql | Migration adding the new etl_jobs columns/indexes/constraints. |
| packages/api/drizzle.config.ts | Points drizzle-kit schema to the new in-package re-export. |
| docs/runbooks/etl-pipeline.md | Adds ETL operational runbook for Workflows-era pipeline. |
| docs/plans/2026-05-20-001-fix-etl-pipeline-workflows-migration-plan.md | Adds detailed migration plan document. |
| docs/audits/2026-05-16-etl-audit.md | Adds ETL pipeline audit document. |
| CLAUDE.md | Documents migration discipline for drizzle artifacts. |
| bun.lock | Updates lockfile for new dependencies. |
Comments suppressed due to low confidence (2)
packages/api/src/routes/catalog/index.ts:334
`instanceId` construction only strips the file extension, but it does not sanitize other disallowed characters for Cloudflare Workflows IDs (allowed: `[a-zA-Z0-9_-]`). If `source` or the base filename contains characters like `.`, spaces, or `:`, `env.ETL_WORKFLOW.create` will still fail at runtime. Consider normalizing `${source}-${filenameWithoutExt}` by replacing any non-matching characters with `_` (and trimming to 64) rather than only removing the extension.
// CF Workflows instance IDs only allow [a-zA-Z0-9_-] — strip the file extension.
const instanceId = `${source}-${filename.replace(FILE_EXT_RE, '')}`.slice(0, 64);
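One way the reviewer's suggestion could look, as a sketch only. The `FILE_EXT_RE` pattern here is an assumption (the real constant is not shown in the diff), and `buildInstanceId`, `DISALLOWED_RE`, and `MAX_INSTANCE_ID_LENGTH` are names made up for the example.

```typescript
// Assumed pattern; the actual FILE_EXT_RE in the route is not visible in this diff.
const FILE_EXT_RE = /\.[a-z0-9]+$/i;
// Anything outside the allowed instance ID alphabet [a-zA-Z0-9_-].
const DISALLOWED_RE = /[^a-zA-Z0-9_-]/g;
const MAX_INSTANCE_ID_LENGTH = 64;

// Strip the extension, replace every disallowed character with "_", then cap the length.
function buildInstanceId(source: string, filename: string): string {
  const base = `${source}-${filename.replace(FILE_EXT_RE, "")}`;
  return base.replace(DISALLOWED_RE, "_").slice(0, MAX_INSTANCE_ID_LENGTH);
}
```

Note that replacing characters and truncating can map two different inputs to the same ID, so a caller that needs uniqueness (as the retry path does with its UUID-based suffix) would still have to account for that.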
packages/api/src/workflows/catalog-etl-workflow.ts:134
In the JSONL path, `firstLineSkipped` is initialized to `true`, so the `if (!firstLineSkipped) { ... continue }` branch is never taken. Given the comment says newline-aligned chunking makes boundary skipping unnecessary, consider removing the dead flag/branch entirely (or initialize it correctly if boundary skipping is still needed) to avoid confusion and potential future regressions.
| async ({ body, query }) => { | ||
| const { filename, chunks, source, scraperRevision } = body; | ||
| const engine = query.engine ?? 'workflow'; | ||
| const db = createDb(); | ||
| const env = getEnv(); | ||
| const jobId = crypto.randomUUID(); |
| const objectKey = `v2/${job.source}/${job.filename}`; | ||
| const env = getEnv(); | ||
| const r2 = new R2BucketService({ env, bucketType: 'catalog' }); | ||
| const obj = await r2.get(objectKey); | ||
| if (!obj) return status(404, { error: `R2 source not found at ${objectKey}` }); | ||
|
|
||
| const parser = parse({ relax_column_count: true, skip_empty_lines: true }); | ||
| let totalRows = 0; | ||
| let isHeaderProcessed = false; |
| if (useJsonl) { | ||
| // --- JSONL streaming path --- | ||
| // No csv-parse, no header injection. Each line is a JSON object. | ||
| let buffer = ''; | ||
| // The chunker snaps boundaries to newlines, so every chunk starts at a | ||
| // clean line boundary — no partial first-line skip needed for any chunk. |
| async function* streamToText(stream: ReadableStream<Uint8Array>) { | ||
| const reader = stream.getReader(); | ||
| const decoder = new TextDecoder(); | ||
| try { | ||
| while (true) { | ||
| const { done, value } = await reader.read(); | ||
| if (done) break; | ||
| yield decoder.decode(value, { stream: true }); | ||
| } | ||
| } finally { | ||
| reader.releaseLock(); | ||
| } |
| `evo-evo_2026-04-27T03-25-18.csv-retry-<newJobId>`. Watch each to | ||
| completion. Original `etl_jobs` rows stay `failed` for the audit trail; | ||
| new rows reflect the successful re-ingest. |
| - **ETag fail-closed on repair-from-scratch (not plain retry).** The | ||
| `repair-from-scratch` endpoint compares the stored `source_etag` against | ||
| `r2.head().etag` and returns 409 on mismatch; pass `?force=true` to | ||
| override. The plain `retry` endpoint does not enforce ETag checks — if the | ||
| R2 source has been overwritten, retry re-ingests the new content. Use | ||
| repair-from-scratch when historical accuracy matters. |
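The fail-closed rule described in this runbook entry reduces to a small decision function. The sketch below is illustrative: `decideRepair` and `RepairDecision` are invented names, and treating a missing stored ETag as a mismatch is an assumption, not something the runbook states.

```typescript
type RepairDecision =
  | { status: 200; action: "repair" }
  | { status: 409; error: string };

// Fail closed: proceed only when the ETags match or the caller passed ?force=true.
function decideRepair(
  storedEtag: string | null,
  currentEtag: string,
  force: boolean,
): RepairDecision {
  if (force || storedEtag === currentEtag) {
    return { status: 200, action: "repair" };
  }
  return {
    status: 409,
    // Assumption: a job with no recorded source_etag is also rejected unless forced.
    error: `source_etag mismatch: stored=${storedEtag ?? "none"} current=${currentEtag}`,
  };
}
```

The plain retry endpoint has no equivalent check, which is why the runbook points to repair-from-scratch when the original bytes matter.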
| /** | ||
| * Returns true if the R2 object key has a JSONL or NDJSON extension. | ||
| */ | ||
| export function isJsonlFile(objectKey: string): boolean { | ||
| const lower = objectKey.toLowerCase(); | ||
| return lower.endsWith('.jsonl') || lower.endsWith('.ndjson'); | ||
| } | ||
|
|
||
| /** | ||
| * Maps a parsed JSON object (one line from a JSONL file) to a partial catalog item. | ||
| * Uses `unknown` with proper type narrowing — no `any`. | ||
| */ | ||
| export function mapJsonRowToItem(obj: Record<string, unknown>): Partial<NewCatalogItem> | null { |
| // Embedding-fallback path. The upsert still happens (catalog gets the | ||
| // items minus their vectors), but we record the degradation on | ||
| // etl_jobs.total_embedding_failures so operators see the count via | ||
| // the admin endpoint without trawling logs. Closes audit P2 #3. | ||
| logger.warn('etl.embedding.fallback', { | ||
| jobId, | ||
| skuCount: items.length, | ||
| errorName: error instanceof Error ? error.name : 'unknown', | ||
| }); | ||
|
|
||
| const upsertedItems = await catalogService.upsertCatalogItems(mergedItems); | ||
| await catalogService.trackEtlJob(upsertedItems, jobId); | ||
| await updateEtlJobProgress(env, { | ||
| jobId, | ||
| valid: items.length, | ||
| processed: items.length, | ||
| }); | ||
|
|
||
| const db = createDbClient(env); | ||
| await db | ||
| .update(etlJobs) | ||
| .set({ | ||
| totalEmbeddingFailures: sql`COALESCE(${etlJobs.totalEmbeddingFailures}, 0) + ${items.length}`, | ||
| }) | ||
| .where(eq(etlJobs.id, jobId)); |
Resolves conflict in auth/index.ts KV expirationTtl guard — takes development version with `ttl !== undefined` check and explanatory comment. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Coverage Report for Expo Unit Tests Coverage (./apps/expo)
File Coverage: No changed files found.
Summary
- `json-utils.ts` with streaming JSON line parser backed by `@packrat/guards` type guards

Test plan
- `json-utils.ts` (streaming parse, malformed line handling, type guards)
- `chunk-csv-for-r2` tests updated to cover both format paths

🤖 Generated with Claude Code