Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/run-ops-split-activation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Enable the dedicated run-ops database split: run records and their related rows are served from a separate database, with cross-database references resolved in application code instead of database foreign keys.
16 changes: 9 additions & 7 deletions apps/webapp/app/v3/services/bulk/performBulkAction.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ export class PerformBulkActionService extends BaseService {
const item = await this._prisma.bulkActionItem.findFirst({
where: { id: bulkActionItemId },
include: {
group: true,
sourceRun: true,
destinationRun: true,
},
Expand All @@ -24,7 +23,7 @@ export class PerformBulkActionService extends BaseService {
return;
}

switch (item.group.type) {
switch (item.type) {
case "REPLAY": {
const service = new ReplayTaskRunService(this._prisma);
const result = await service.call(item.sourceRun, { triggerSource: "dashboard" });
Expand Down Expand Up @@ -57,7 +56,7 @@ export class PerformBulkActionService extends BaseService {
break;
}
default: {
assertNever(item.group.type);
assertNever(item.type);
}
}

Expand Down Expand Up @@ -94,17 +93,20 @@ export class PerformBulkActionService extends BaseService {

public async call(bulkActionGroupId: string) {
const actionGroup = await this._prisma.bulkActionGroup.findFirst({
include: {
items: true,
},
where: { id: bulkActionGroupId },
select: { id: true },
});

if (!actionGroup) {
return;
}

for (const item of actionGroup.items) {
const items = await this._prisma.bulkActionItem.findMany({
where: { groupId: bulkActionGroupId },
select: { id: true },
});

for (const item of items) {
await this.enqueueBulkActionItem(item.id, bulkActionGroupId);
}
}
Expand Down
169 changes: 169 additions & 0 deletions apps/webapp/test/batchTaskRunEnvironmentFkDrop.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
// Proof for dropping the canonical BatchTaskRun -> RuntimeEnvironment FK
// (constraint "BatchTaskRun_runtimeEnvironmentId_fkey", onDelete: Cascade) while keeping the
// runtimeEnvironmentId scalar and its compound @@unique/@@index. BatchTaskRun is run-ops and
// RuntimeEnvironment is control-plane, so the two may live on different servers; create-time
// integrity is preserved app-side via the ControlPlaneResolver's assertEnvExists. Env-delete
// orphan cleanup is handled separately — here the batch row is tolerated.

import { heteroPostgresTest, postgresTest } from "@internal/testcontainers";
import type { PrismaClient } from "@trigger.dev/database";
import { describe, expect, vi } from "vitest";
import { ControlPlaneCache } from "~/v3/runOpsMigration/controlPlaneCache.server";
import {
ControlPlaneReferenceError,
ControlPlaneResolver,
} from "~/v3/runOpsMigration/controlPlaneResolver.server";

// Cross-DB testcontainer spin-up + queries can exceed the 5s default on the first test.
vi.setConfig({ testTimeout: 60_000 });

let seedCounter = 0;

async function seedEnvironment(prisma: PrismaClient) {
const n = seedCounter++;
const organization = await prisma.organization.create({
data: { title: `Org ${n}`, slug: `org-${n}` },
});
const project = await prisma.project.create({
data: {
name: `Project ${n}`,
slug: `project-${n}`,
externalRef: `proj_${n}`,
organizationId: organization.id,
},
});
const environment = await prisma.runtimeEnvironment.create({
data: {
type: "PRODUCTION",
slug: `env-${n}`,
projectId: project.id,
organizationId: organization.id,
apiKey: `tr_prod_${n}`,
pkApiKey: `pk_prod_${n}`,
shortcode: `short_${n}`,
},
});
return { organization, project, environment };
}

let batchCounter = 0;

async function createBatch(prisma: PrismaClient, runtimeEnvironmentId: string) {
const n = batchCounter++;
return prisma.batchTaskRun.create({
data: {
friendlyId: `batch_${n}`,
runtimeEnvironmentId,
runCount: 1,
runIds: [],
batchVersion: "runengine:v2",
},
});
}

// Asserts the post-migration state of BatchTaskRun on a given client: the FK is gone, but the
// scalar and both compound constraints are retained. Shared by the single-version and the
// cross-version suites.
async function assertSchemaState(prisma: PrismaClient) {
const foreignKeys = await prisma.$queryRaw<{ constraint_name: string }[]>`
SELECT constraint_name
FROM information_schema.table_constraints
WHERE table_name = 'BatchTaskRun'
AND constraint_type = 'FOREIGN KEY'
`;
expect(foreignKeys.map((c) => c.constraint_name)).not.toContain(
"BatchTaskRun_runtimeEnvironmentId_fkey"
);

const columns = await prisma.$queryRaw<{ column_name: string }[]>`
SELECT column_name
FROM information_schema.columns
WHERE table_name = 'BatchTaskRun'
AND column_name = 'runtimeEnvironmentId'
`;
expect(columns).toHaveLength(1);

// The @@unique([runtimeEnvironmentId, idempotencyKey]) and
// @@index([runtimeEnvironmentId, id(sort: Desc)]) both survive the FK drop.
const indexes = await prisma.$queryRaw<{ indexdef: string }[]>`
SELECT indexdef FROM pg_indexes WHERE tablename = 'BatchTaskRun'
`;
const defs = indexes.map((i) => i.indexdef);
const hasUnique = defs.some(
(d) => /UNIQUE/i.test(d) && d.includes("runtimeEnvironmentId") && d.includes("idempotencyKey")
);
const hasIndex = defs.some(
(d) => !/UNIQUE/i.test(d) && d.includes("runtimeEnvironmentId") && /\bid\b/.test(d)
);
expect(hasUnique).toBe(true);
expect(hasIndex).toBe(true);
}

// Inserts an env + batch, deletes the env, and asserts the batch survives (cascade gone).
async function assertOrphanTolerated(prisma: PrismaClient) {
const { environment } = await seedEnvironment(prisma);
const batch = await createBatch(prisma, environment.id);

await prisma.runtimeEnvironment.delete({ where: { id: environment.id } });

const survivor = await prisma.batchTaskRun.findFirst({ where: { id: batch.id } });
expect(survivor).not.toBeNull();
expect(survivor?.runtimeEnvironmentId).toBe(environment.id);
}

describe("drop BatchTaskRun -> RuntimeEnvironment FK", () => {
postgresTest("FK constraint absent; scalar + unique + index retained", async ({ prisma }) => {
await assertSchemaState(prisma);
});

postgresTest(
"deleting the env leaves the BatchTaskRun row alive (no cascade; orphan cleanup handled separately)",
async ({ prisma }) => {
await assertOrphanTolerated(prisma);
}
);

postgresTest(
"app-side env validation: assertEnvExists rejects an invalid env and a valid-env create succeeds by scalar",
async ({ prisma }) => {
const { environment } = await seedEnvironment(prisma);

const resolver = new ControlPlaneResolver({
controlPlanePrimary: prisma,
controlPlaneReplica: prisma,
cache: new ControlPlaneCache(),
splitEnabled: () => true,
});

// The exact guard call the create services place before batchTaskRun.create.
await expect(resolver.assertEnvExists("env_does_not_exist")).rejects.toBeInstanceOf(
ControlPlaneReferenceError
);

await expect(resolver.assertEnvExists(environment.id)).resolves.toBeUndefined();

// Once the guard passes, the batch is linked by the runtimeEnvironmentId scalar (no FK).
const batch = await createBatch(prisma, environment.id);
expect(batch.runtimeEnvironmentId).toBe(environment.id);
}
);
});

// Cross-version gate: the migration applies and the post-state is identical across major versions.
describe("drop BatchTaskRun -> RuntimeEnvironment FK — cross-version (legacy + new Postgres)", () => {
heteroPostgresTest(
"migration applies and FK is absent on both the legacy and new databases",
async ({ prisma14, prisma17 }) => {
await assertSchemaState(prisma14);
await assertSchemaState(prisma17);
}
);

heteroPostgresTest(
"env delete leaves the batch orphaned on both the legacy and new databases",
async ({ prisma14, prisma17 }) => {
await assertOrphanTolerated(prisma14);
await assertOrphanTolerated(prisma17);
}
);
});
64 changes: 64 additions & 0 deletions apps/webapp/test/dropTaskRunToTaskRunTagJoin.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Single-version proof for dropping the dead `_TaskRunToTaskRunTag` implicit join.

import { describe, expect } from "vitest";
import { postgresTest } from "@internal/testcontainers";

describe("drop _TaskRunToTaskRunTag implicit join", () => {
postgresTest("runTags scalar round-trips and the join table is gone", async ({ prisma }) => {
const organization = await prisma.organization.create({
data: {
title: "test",
slug: "test",
},
});

const project = await prisma.project.create({
data: {
name: "test",
slug: "test",
organizationId: organization.id,
externalRef: "test",
},
});

const runtimeEnvironment = await prisma.runtimeEnvironment.create({
data: {
slug: "test",
type: "DEVELOPMENT",
projectId: project.id,
organizationId: organization.id,
apiKey: "test",
pkApiKey: "test",
shortcode: "test",
},
});

const taskRun = await prisma.taskRun.create({
data: {
friendlyId: "run_1234",
taskIdentifier: "my-task",
payload: JSON.stringify({ foo: "bar" }),
payloadType: "application/json",
traceId: "1234",
spanId: "1234",
queue: "test",
runtimeEnvironmentId: runtimeEnvironment.id,
projectId: project.id,
organizationId: organization.id,
environmentType: "DEVELOPMENT",
engine: "V2",
runTags: ["alpha", "beta"],
},
});

const readBack = await prisma.taskRun.findFirstOrThrow({
where: { id: taskRun.id },
});
expect(readBack.runTags).toEqual(["alpha", "beta"]);

const result = await prisma.$queryRaw<{ t: string | null }[]>`
SELECT to_regclass('public._TaskRunToTaskRunTag')::text as t
`;
expect(result[0].t).toBeNull();
});
});
Loading
Loading