From 9b0768638669df7c7db2c78d30aa268d5850ab6f Mon Sep 17 00:00:00 2001 From: Chris Arderne Date: Wed, 1 Jul 2026 16:53:54 +0100 Subject: [PATCH 01/19] feat: add bulk replay to API and SDK --- .changeset/bulk-actions-sdk-api.md | 6 + .server-changes/bulk-actions-api-sdk.md | 6 + .../v3/ApiBulkActionPresenter.server.ts | 171 ++++++++++++++++++ ...api.v1.bulk-actions.$bulkActionId.abort.ts | 44 +++++ .../api.v1.bulk-actions.$bulkActionId.ts | 39 ++++ apps/webapp/app/routes/api.v1.bulk-actions.ts | 133 ++++++++++++++ ...ectParam.env.$envParam.runs.bulkaction.tsx | 28 ++- .../v3/services/bulk/BulkActionV2.server.ts | 125 +++++++------ packages/core/src/v3/apiClient/index.ts | 67 ++++++- packages/core/src/v3/apiClient/types.ts | 28 +++ packages/core/src/v3/schemas/api.ts | 82 +++++++++ packages/trigger-sdk/src/v3/index.ts | 1 + packages/trigger-sdk/src/v3/runs.ts | 150 +++++++++++++++ 13 files changed, 819 insertions(+), 61 deletions(-) create mode 100644 .changeset/bulk-actions-sdk-api.md create mode 100644 .server-changes/bulk-actions-api-sdk.md create mode 100644 apps/webapp/app/presenters/v3/ApiBulkActionPresenter.server.ts create mode 100644 apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.abort.ts create mode 100644 apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.ts create mode 100644 apps/webapp/app/routes/api.v1.bulk-actions.ts diff --git a/.changeset/bulk-actions-sdk-api.md b/.changeset/bulk-actions-sdk-api.md new file mode 100644 index 00000000000..e62f8dcc4c2 --- /dev/null +++ b/.changeset/bulk-actions-sdk-api.md @@ -0,0 +1,6 @@ +--- +"@trigger.dev/core": patch +"@trigger.dev/sdk": patch +--- + +Add SDK and API client helpers for run bulk actions. diff --git a/.server-changes/bulk-actions-api-sdk.md b/.server-changes/bulk-actions-api-sdk.md new file mode 100644 index 00000000000..01da3859cbd --- /dev/null +++ b/.server-changes/bulk-actions-api-sdk.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: feature +--- + +Add API and SDK support for creating, listing, retrieving, polling, and aborting run bulk actions. diff --git a/apps/webapp/app/presenters/v3/ApiBulkActionPresenter.server.ts b/apps/webapp/app/presenters/v3/ApiBulkActionPresenter.server.ts new file mode 100644 index 00000000000..badc31bf322 --- /dev/null +++ b/apps/webapp/app/presenters/v3/ApiBulkActionPresenter.server.ts @@ -0,0 +1,171 @@ +import { + type BulkActionGroup, + type BulkActionStatus, + type BulkActionType, +} from "@trigger.dev/database"; +import { z } from "zod"; +import { BasePresenter } from "./basePresenter.server"; + +const DEFAULT_PAGE_SIZE = 25; +const MAX_PAGE_SIZE = 100; + +export const ApiBulkActionListSearchParams = z.object({ + "page[size]": z.coerce.number().int().positive().min(1).max(MAX_PAGE_SIZE).optional(), + "page[after]": z.string().optional(), + "page[before]": z.string().optional(), +}); + +export type ApiBulkActionListSearchParams = z.infer; + +type BulkActionListCursor = { + createdAt: Date; + id: string; +}; + +type BulkActionRow = Pick< + BulkActionGroup, + | "id" + | "friendlyId" + | "name" + | "status" + | "type" + | "createdAt" + | "completedAt" + | "totalCount" + | "successCount" + | "failureCount" +>; + +export class ApiBulkActionPresenter extends BasePresenter { + public async retrieve(environmentId: string, bulkActionId: string) { + const bulkAction = await this._replica.bulkActionGroup.findFirst({ + select: bulkActionSelect, + where: { + environmentId, + friendlyId: bulkActionId, + }, + }); + + if (!bulkAction) { + return undefined; + } + + return apiBulkActionObject(bulkAction); + } + + public async list(environmentId: string, searchParams: ApiBulkActionListSearchParams) { + const pageSize = searchParams["page[size]"] ?? DEFAULT_PAGE_SIZE; + const after = searchParams["page[after]"]; + const before = searchParams["page[before]"]; + + if (after && before) { + throw new Error("Only one of page[after] or page[before] can be provided"); + } + + const cursor = decodeCursor(after ?? before); + const direction = before ? "backward" : "forward"; + + const where = { + environmentId, + ...(cursor + ? direction === "forward" + ? { + OR: [ + { createdAt: { lt: cursor.createdAt } }, + { createdAt: cursor.createdAt, id: { lt: cursor.id } }, + ], + } + : { + OR: [ + { createdAt: { gt: cursor.createdAt } }, + { createdAt: cursor.createdAt, id: { gt: cursor.id } }, + ], + } + : {}), + }; + + const rows = await this._replica.bulkActionGroup.findMany({ + select: bulkActionSelect, + where, + orderBy: + direction === "forward" + ? [{ createdAt: "desc" }, { id: "desc" }] + : [{ createdAt: "asc" }, { id: "asc" }], + take: pageSize + 1, + }); + + const hasMore = rows.length > pageSize; + const pageRows = rows.slice(0, pageSize); + const dataRows = direction === "forward" ? pageRows : [...pageRows].reverse(); + + const first = dataRows.at(0); + const last = dataRows.at(-1); + + return { + data: dataRows.map(apiBulkActionObject), + pagination: { + next: last && (hasMore || direction === "backward") ? encodeCursor(last) : undefined, + previous: + first && + ((direction === "forward" && Boolean(after)) || (direction === "backward" && hasMore)) + ? encodeCursor(first) + : undefined, + }, + }; + } +} + +const bulkActionSelect = { + id: true, + friendlyId: true, + name: true, + status: true, + type: true, + createdAt: true, + completedAt: true, + totalCount: true, + successCount: true, + failureCount: true, +} as const; + +export function apiBulkActionObject(row: BulkActionRow) { + return { + id: row.friendlyId, + name: row.name ?? undefined, + type: row.type as BulkActionType, + status: row.status as BulkActionStatus, + counts: { + total: row.totalCount, + success: row.successCount, + failure: row.failureCount, + }, + createdAt: row.createdAt, + completedAt: row.completedAt ?? undefined, + }; +} + +function encodeCursor(row: Pick) { + return Buffer.from(JSON.stringify({ createdAt: row.createdAt.getTime(), id: row.id })).toString( + "base64url" + ); +} + +function decodeCursor(cursor: string | undefined): BulkActionListCursor | undefined { + if (!cursor) { + return undefined; + } + + try { + const parsed = JSON.parse(Buffer.from(cursor, "base64url").toString("utf8")) as { + createdAt?: unknown; + id?: unknown; + }; + if (typeof parsed.createdAt !== "number" || typeof parsed.id !== "string") { + throw new Error("Invalid cursor"); + } + + return { createdAt: new Date(parsed.createdAt), id: parsed.id }; + } catch { + throw new Error("Invalid cursor"); + } +} diff --git a/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.abort.ts b/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.abort.ts new file mode 100644 index 00000000000..dcb7c823867 --- /dev/null +++ b/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.abort.ts @@ -0,0 +1,44 @@ +import { json } from "@remix-run/server-runtime"; +import { z } from "zod"; +import { $replica } from "~/db.server"; +import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; +import { BulkActionService } from "~/v3/services/bulk/BulkActionV2.server"; + +const ParamsSchema = z.object({ + bulkActionId: z.string(), +}); + +const { action } = createActionApiRoute( + { + params: ParamsSchema, + corsStrategy: "none", + authorization: { + action: "write", + resource: () => ({ type: "runs" }), + }, + findResource: async (params, auth) => { + return $replica.bulkActionGroup.findFirst({ + select: { id: true }, + where: { + friendlyId: params.bulkActionId, + environmentId: auth.environment.id, + }, + }); + }, + }, + async ({ params, authentication }) => { + const service = new BulkActionService(); + + try { + const result = await service.abort(params.bulkActionId, authentication.environment.id); + return json({ id: result.bulkActionId }); + } catch (error) { + return json( + { error: error instanceof Error ? error.message : "Failed to abort bulk action" }, + { status: 400 } + ); + } + } +); + +export { action }; diff --git a/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.ts b/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.ts new file mode 100644 index 00000000000..0215f3fa221 --- /dev/null +++ b/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.ts @@ -0,0 +1,39 @@ +import { json } from "@remix-run/server-runtime"; +import { z } from "zod"; +import { $replica } from "~/db.server"; +import { ApiBulkActionPresenter } from "~/presenters/v3/ApiBulkActionPresenter.server"; +import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server"; + +const ParamsSchema = z.object({ + bulkActionId: z.string(), +}); + +export const loader = createLoaderApiRoute( + { + params: ParamsSchema, + corsStrategy: "none", + authorization: { + action: "read", + resource: () => ({ type: "runs" }), + }, + findResource: async (params, auth) => { + return $replica.bulkActionGroup.findFirst({ + select: { id: true }, + where: { + friendlyId: params.bulkActionId, + environmentId: auth.environment.id, + }, + }); + }, + }, + async ({ params, authentication }) => { + const presenter = new ApiBulkActionPresenter(); + const bulkAction = await presenter.retrieve(authentication.environment.id, params.bulkActionId); + + if (!bulkAction) { + return json({ error: "Bulk action not found" }, { status: 404 }); + } + + return json(bulkAction); + } +); diff --git a/apps/webapp/app/routes/api.v1.bulk-actions.ts b/apps/webapp/app/routes/api.v1.bulk-actions.ts new file mode 100644 index 00000000000..ab632fb5ede --- /dev/null +++ b/apps/webapp/app/routes/api.v1.bulk-actions.ts @@ -0,0 +1,133 @@ +import { json } from "@remix-run/server-runtime"; +import { CreateBulkActionRequestBody, type QueueTypeName } from "@trigger.dev/core/v3"; +import { z } from "zod"; +import { + ApiBulkActionListSearchParams, + ApiBulkActionPresenter, +} from "~/presenters/v3/ApiBulkActionPresenter.server"; +import { ApiRunListPresenter } from "~/presenters/v3/ApiRunListPresenter.server"; +import { logger } from "~/services/logger.server"; +import type { RunListInputFilters } from "~/services/runsRepository/runsRepository.server"; +import { + createActionApiRoute, + createLoaderApiRoute, +} from "~/services/routeBuilders/apiBuilder.server"; +import { BulkActionService } from "~/v3/services/bulk/BulkActionV2.server"; +import { ServiceValidationError } from "~/v3/services/common.server"; + +const MAX_CREATE_BODY_SIZE = 1024 * 1024; + +const { action } = createActionApiRoute( + { + body: CreateBulkActionRequestBody, + maxContentLength: MAX_CREATE_BODY_SIZE, + corsStrategy: "none", + authorization: { + action: "write", + resource: () => ({ type: "runs" }), + }, + }, + async ({ body, authentication }) => { + if (!body) { + return json({ error: "Invalid request body" }, { status: 400 }); + } + + const service = new BulkActionService(); + + try { + const result = await service.create({ + organizationId: authentication.environment.organizationId, + projectId: authentication.environment.projectId, + environmentId: authentication.environment.id, + userId: authentication.actor?.sub ?? null, + action: body.action, + title: body.name, + region: body.region, + emailNotification: body.emailNotification, + filters: body.runIds + ? { runId: body.runIds } + : bulkActionFilterToRunListFilters(body.filter), + triggerSource: "api", + }); + + return json({ id: result.bulkActionId }, { status: 202 }); + } catch (error) { + if (error instanceof ServiceValidationError) { + return json({ error: error.message }, { status: error.status ?? 400 }); + } + + logger.error("Failed to create API bulk action", { error }); + return json( + { error: error instanceof Error ? error.message : "Failed to create bulk action" }, + { status: 500 } + ); + } + } +); + +const loader = createLoaderApiRoute( + { + searchParams: ApiBulkActionListSearchParams, + corsStrategy: "none", + authorization: { + action: "read", + resource: () => ({ type: "runs" }), + }, + findResource: async () => 1, + }, + async ({ searchParams, authentication }) => { + const presenter = new ApiBulkActionPresenter(); + const result = await presenter.list(authentication.environment.id, searchParams); + return json(result); + } +); + +export { action, loader }; + +function bulkActionFilterToRunListFilters( + filter: z.infer["filter"] +): RunListInputFilters { + if (!filter) { + return {}; + } + + const filters: RunListInputFilters = {}; + + if (filter.status) { + filters.statuses = asArray(filter.status).flatMap((status) => + ApiRunListPresenter.apiStatusToRunStatuses(status) + ); + } + + if (filter.taskIdentifier) filters.tasks = asArray(filter.taskIdentifier); + if (filter.version) filters.versions = asArray(filter.version); + if (filter.tag) filters.tags = asArray(filter.tag); + if (filter.bulkAction) filters.bulkId = filter.bulkAction; + if (filter.schedule) filters.scheduleId = filter.schedule; + if (filter.isTest !== undefined) filters.isTest = filter.isTest; + if (filter.from !== undefined) filters.from = dateOrNumberToMs(filter.from); + if (filter.to !== undefined) filters.to = dateOrNumberToMs(filter.to); + if (filter.period) filters.period = filter.period; + if (filter.batch) filters.batchId = filter.batch; + if (filter.queue) filters.queues = asArray(filter.queue).map(queueNameFromQueueTypeName); + if (filter.machine) filters.machines = asArray(filter.machine); + if (filter.region) filters.regions = asArray(filter.region); + + return filters; +} + +function asArray(value: T | T[]): T[] { + return Array.isArray(value) ? value : [value]; +} + +function dateOrNumberToMs(value: Date | number): number { + return value instanceof Date ? value.getTime() : value; +} + +function queueNameFromQueueTypeName(queue: QueueTypeName): string { + if (queue.type === "task") { + return `task/${queue.name}`; + } + + return queue.name; +} diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.bulkaction.tsx b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.bulkaction.tsx index 59c6da64126..5964947d816 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.bulkaction.tsx +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.bulkaction.tsx @@ -51,6 +51,7 @@ import { redirectWithErrorMessage, redirectWithSuccessMessage } from "~/models/m import { resolveOrgIdFromSlug } from "~/models/organization.server"; import { findProjectBySlug } from "~/models/project.server"; import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server"; +import { getRunFiltersFromRequest } from "~/presenters/RunFilters.server"; import { CreateBulkActionPresenter } from "~/presenters/v3/CreateBulkActionPresenter.server"; import { RegionsPresenter } from "~/presenters/v3/RegionsPresenter.server"; import { RUNS_BULK_INSPECTOR_UI_SEARCH_PARAMS } from "~/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs._index/shouldRevalidateRunsList"; @@ -188,14 +189,25 @@ export const action = dashboardAction( const service = new BulkActionService(); const [error, result] = await tryCatch( - service.create( - project.organizationId, - project.id, - environment.id, - user.id, - submission.value, - request - ) + (async () => { + const filters = + submission.value.mode === "selected" + ? { runId: submission.value.selectedRunIds } + : await getRunFiltersFromRequest(request); + + return service.create({ + organizationId: project.organizationId, + projectId: project.id, + environmentId: environment.id, + userId: user.id, + action: submission.value.action, + title: submission.value.title, + region: submission.value.region, + emailNotification: submission.value.emailNotification, + filters, + triggerSource: "dashboard", + }); + })() ); if (error) { diff --git a/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts b/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts index d03ab71796f..ad15383324d 100644 --- a/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts +++ b/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts @@ -5,8 +5,6 @@ import { BulkActionType, type PrismaClient, } from "@trigger.dev/database"; -import { getRunFiltersFromRequest } from "~/presenters/RunFilters.server"; -import { type CreateBulkActionPayload } from "~/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.bulkaction"; import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactoryInstance.server"; import { parseRunListInputOptions, @@ -14,18 +12,33 @@ import { RunsRepository, } from "~/services/runsRepository/runsRepository.server"; import { BaseService } from "../baseService.server"; +import { ServiceValidationError } from "../common.server"; import { commonWorker } from "~/v3/commonWorker.server"; import { env } from "~/env.server"; import { logger } from "@trigger.dev/sdk"; import { CancelTaskRunService } from "../cancelTaskRun.server"; import { tryCatch } from "@trigger.dev/core"; import { ReplayTaskRunService } from "../replayTaskRun.server"; +import { WorkerGroupService } from "../worker/workerGroupService.server"; import { timeFilters } from "~/components/runs/v3/SharedFilters"; import parseDuration from "parse-duration"; import { v3BulkActionPath } from "~/utils/pathBuilder"; import { formatDateTime } from "~/components/primitives/DateTime"; import pMap from "p-map"; +export type CreateBulkActionInput = { + organizationId: string; + projectId: string; + environmentId: string; + userId?: string | null; + action: "cancel" | "replay"; + filters: RunListInputFilters; + title?: string; + region?: string; + emailNotification?: boolean; + triggerSource?: string; +}; + export type ProcessToCompletionOptions = { /** Absolute timestamp (ms) after which processing stops and returns incomplete. */ deadline?: number; @@ -36,25 +49,36 @@ export type ProcessToCompletionResult = { }; export class BulkActionService extends BaseService { - public async create( - organizationId: string, - projectId: string, - environmentId: string, - userId: string, - payload: CreateBulkActionPayload, - request: Request - ) { - const filters = await getFilters(payload, request); + public async create(input: CreateBulkActionInput) { + const filters = freezeRunListFilters(input.filters); // Region is a replay-only override that re-routes the replayed runs. It's // stored alongside the run-list filters under a dedicated key so it isn't // mistaken for a `regions` selection filter when the params are parsed. - const replayRegion = payload.action === "replay" ? payload.region : undefined; - const params = replayRegion ? { ...filters, replayRegion } : filters; + const replayRegion = input.action === "replay" ? input.region : undefined; + if (replayRegion) { + // Validating the region override up-front so an invalid/unauthorized + // region surfaces as a user-input (400) error rather than a 500. + const [regionError] = await tryCatch( + new WorkerGroupService({ prisma: this._prisma }).getDefaultWorkerGroupForProject({ + projectId: input.projectId, + regionOverride: replayRegion, + }) + ); + if (regionError) { + throw new ServiceValidationError(regionError.message, 400); + } + } + + const params = { + ...filters, + ...(replayRegion ? { replayRegion } : {}), + ...(input.triggerSource ? { triggerSource: input.triggerSource } : {}), + }; // Count the runs that will be affected by the bulk action const clickhouse = await clickhouseFactory.getClickhouseForOrganization( - organizationId, + input.organizationId, "standard" ); const runsRepository = new RunsRepository({ @@ -62,9 +86,9 @@ export class BulkActionService extends BaseService { prisma: this._replica as PrismaClient, }); const count = await runsRepository.countRuns({ - organizationId, - projectId, - environmentId, + organizationId: input.organizationId, + projectId: input.projectId, + environmentId: input.environmentId, ...filters, }); @@ -74,16 +98,16 @@ export class BulkActionService extends BaseService { data: { id, friendlyId, - projectId, - environmentId, - userId, - name: payload.title, - type: payload.action === "cancel" ? BulkActionType.CANCEL : BulkActionType.REPLAY, + projectId: input.projectId, + environmentId: input.environmentId, + userId: input.userId, + name: input.title, + type: input.action === "cancel" ? BulkActionType.CANCEL : BulkActionType.REPLAY, params, queryName: "bulk_action_v1", totalCount: count, completionNotification: - payload.emailNotification === true + input.emailNotification === true ? BulkActionNotificationType.EMAIL : BulkActionNotificationType.NONE, }, @@ -202,6 +226,10 @@ export class BulkActionService extends BaseService { "replayRegion" in rawParams && typeof (rawParams as any).replayRegion === "string" ? (rawParams as any).replayRegion : undefined; + const triggerSource = + "triggerSource" in rawParams && typeof (rawParams as any).triggerSource === "string" + ? (rawParams as any).triggerSource + : "dashboard"; const filters = parseRunListInputOptions({ organizationId: group.project.organizationId, projectId: group.projectId, @@ -317,7 +345,7 @@ export class BulkActionService extends BaseService { const [error, result] = await tryCatch( replayService.call(run, { bulkActionId: bulkActionId, - triggerSource: "dashboard", + triggerSource, region: replayRegion, }) ); @@ -479,28 +507,21 @@ export class BulkActionService extends BaseService { } } -async function getFilters( - payload: CreateBulkActionPayload, - request: Request -): Promise { - if (payload.mode === "selected") { - return { - runId: payload.selectedRunIds, - }; +export function freezeRunListFilters(filters: RunListInputFilters): RunListInputFilters { + const frozenFilters: RunListInputFilters = { ...filters }; + delete (frozenFilters as any).cursor; + delete (frozenFilters as any).direction; + + // Explicit run-id selections target specific, already-existing runs, so we + // don't apply a time bound (which could otherwise exclude a selected run). + if (frozenFilters.runId?.length) { + return frozenFilters; } - const filters = await getRunFiltersFromRequest(request); - filters.cursor = undefined; - filters.direction = undefined; - - const { - period, - from: _from, - to: _to, - } = timeFilters({ - period: filters.period, - from: filters.from, - to: filters.to, + const { period } = timeFilters({ + period: frozenFilters.period, + from: frozenFilters.from, + to: frozenFilters.to, }); // We fix the time period to a from/to date @@ -512,18 +533,18 @@ async function getFilters( const to = new Date(); const from = new Date(to.getTime() - periodMs); - filters.from = from.getTime(); - filters.to = to.getTime(); - filters.period = undefined; - return filters; + frozenFilters.from = from.getTime(); + frozenFilters.to = to.getTime(); + frozenFilters.period = undefined; + return frozenFilters; } // If no to date is set, we lock it to now - if (!filters.to) { - filters.to = Date.now(); + if (!frozenFilters.to) { + frozenFilters.to = Date.now(); } - filters.period = undefined; + frozenFilters.period = undefined; - return filters; + return frozenFilters; } diff --git a/packages/core/src/v3/apiClient/index.ts b/packages/core/src/v3/apiClient/index.ts index 3cdc78e1b31..a0732cd1580 100644 --- a/packages/core/src/v3/apiClient/index.ts +++ b/packages/core/src/v3/apiClient/index.ts @@ -36,7 +36,10 @@ import { type UpdateScheduleOptions, type UpdateSessionRequestBody, type WaitForDurationRequestBody, + AbortBulkActionResponseBody, ApiDeploymentListResponseItem, + BulkActionObject, + CreateBulkActionResponseBody, AppendToStreamResponseBody, BatchTaskRunExecutionResult, BatchTriggerTaskV3Response, @@ -117,9 +120,11 @@ import { runShapeStream, type SSEStreamPart, } from "./runStream.js"; -import type { +import { + CreateBulkActionOptions, CreateEnvironmentVariableParams, ImportEnvironmentVariablesParams, + ListBulkActionsQueryParams, ListProjectRunsQueryParams, ListRunsQueryParams, ListWaitpointTokensQueryParams, @@ -141,9 +146,11 @@ export type CreateBatchApiResponse = Prettify< >; export type { + CreateBulkActionOptions, CreateEnvironmentVariableParams, ImportEnvironmentVariablesParams, RealtimeRunSkipColumns, + ListBulkActionsQueryParams, SubscribeToRunsQueryParams, UpdateEnvironmentVariableParams, }; @@ -738,6 +745,64 @@ export class ApiClient { ); } + createBulkAction(options: CreateBulkActionOptions, requestOptions?: ZodFetchOptions) { + return zodfetch( + CreateBulkActionResponseBody, + `${this.baseUrl}/api/v1/bulk-actions`, + { + method: "POST", + headers: this.#getHeaders(false), + body: JSON.stringify(options), + }, + mergeRequestOptions(this.defaultRequestOptions, requestOptions) + ); + } + + listBulkActions( + query?: ListBulkActionsQueryParams, + requestOptions?: ZodFetchOptions + ): CursorPagePromise { + return zodfetchCursorPage( + BulkActionObject, + `${this.baseUrl}/api/v1/bulk-actions`, + { + query: new URLSearchParams(), + limit: query?.limit, + after: query?.after, + before: query?.before, + }, + { + method: "GET", + headers: this.#getHeaders(false), + }, + mergeRequestOptions(this.defaultRequestOptions, requestOptions) + ); + } + + retrieveBulkAction(bulkActionId: string, requestOptions?: ZodFetchOptions) { + return zodfetch( + BulkActionObject, + `${this.baseUrl}/api/v1/bulk-actions/${bulkActionId}`, + { + method: "GET", + headers: this.#getHeaders(false), + }, + mergeRequestOptions(this.defaultRequestOptions, requestOptions) + ); + } + + abortBulkAction(bulkActionId: string, requestOptions?: ZodFetchOptions) { + return zodfetch( + AbortBulkActionResponseBody, + `${this.baseUrl}/api/v1/bulk-actions/${bulkActionId}/abort`, + { + method: "POST", + headers: this.#getHeaders(false), + }, + mergeRequestOptions(this.defaultRequestOptions, requestOptions) + ); + } + resetIdempotencyKey( taskIdentifier: string, idempotencyKey: string, diff --git a/packages/core/src/v3/apiClient/types.ts b/packages/core/src/v3/apiClient/types.ts index d3ee6427032..f6efa6ece35 100644 --- a/packages/core/src/v3/apiClient/types.ts +++ b/packages/core/src/v3/apiClient/types.ts @@ -70,6 +70,34 @@ export interface ListProjectRunsQueryParams extends CursorPageParams, ListRunsQu env?: Array<"dev" | "staging" | "prod"> | "dev" | "staging" | "prod"; } +export type BulkActionFilter = Omit; + +export type BulkActionSelection = + | { filter: BulkActionFilter; runIds?: never } + | { runIds: string[]; filter?: never }; + +export type CreateBulkActionOptions = BulkActionSelection & { + action: "cancel" | "replay"; + name?: string; + /** Region identifier to replay runs in. When omitted, each replay keeps the original run's region. */ + region?: string; + emailNotification?: boolean; +}; + +export type CreateBulkCancelActionOptions = BulkActionSelection & { + name?: string; + emailNotification?: boolean; +}; + +export type CreateBulkReplayActionOptions = BulkActionSelection & { + name?: string; + /** Region identifier to replay runs in. When omitted, each replay keeps the original run's region. */ + region?: string; + emailNotification?: boolean; +}; + +export type ListBulkActionsQueryParams = CursorPageParams; + export interface SubscribeToRunsQueryParams { tasks?: Array | string; tags?: Array | string; diff --git a/packages/core/src/v3/schemas/api.ts b/packages/core/src/v3/schemas/api.ts index e9b5b03cc64..0e0047c3ea1 100644 --- a/packages/core/src/v3/schemas/api.ts +++ b/packages/core/src/v3/schemas/api.ts @@ -9,6 +9,7 @@ import { } from "./common.js"; import { BackgroundWorkerMetadata } from "./resources.js"; import { DequeuedMessage, MachineResources } from "./runEngine.js"; +import { QueueTypeName } from "./queues.js"; export const RunEngineVersion = z.union([z.literal("V1"), z.literal("V2")]); @@ -1223,6 +1224,87 @@ export const ListRunResponse = z.object({ export type ListRunResponse = z.infer; +const StringOrStringArray = z.union([z.string(), z.array(z.string())]); +const MachineOrMachineArray = z.union([MachinePresetName, z.array(MachinePresetName)]); +const QueueOrQueueArray = z.union([QueueTypeName, z.array(QueueTypeName)]); +const DateOrNumber = z.union([z.coerce.date(), z.number()]); + +const BulkActionFilterRequestBody = z.object({ + status: z.union([RunStatus, z.array(RunStatus)]).optional(), + taskIdentifier: StringOrStringArray.optional(), + version: StringOrStringArray.optional(), + from: DateOrNumber.optional(), + to: DateOrNumber.optional(), + period: z.string().optional(), + bulkAction: z.string().optional(), + tag: StringOrStringArray.optional(), + schedule: z.string().optional(), + isTest: z.boolean().optional(), + batch: z.string().optional(), + queue: QueueOrQueueArray.optional(), + machine: MachineOrMachineArray.optional(), + region: StringOrStringArray.optional(), +}); + +export const CreateBulkActionRequestBody = z + .object({ + action: z.enum(["cancel", "replay"]), + filter: BulkActionFilterRequestBody.optional(), + runIds: z.array(z.string()).min(1).optional(), + name: z.string().optional(), + region: z.string().optional(), + emailNotification: z.boolean().optional(), + }) + .refine((body) => (body.filter ? 1 : 0) + (body.runIds ? 1 : 0) === 1, { + message: "Exactly one of filter or runIds must be provided", + }); + +export type CreateBulkActionRequestBody = z.infer; + +export const BulkActionStatus = z.enum(["PENDING", "COMPLETED", "ABORTED"]); +export type BulkActionStatus = z.infer; + +export const BulkActionType = z.enum(["CANCEL", "REPLAY"]); +export type BulkActionType = z.infer; + +export const BulkActionObject = z.object({ + id: z.string(), + name: z.string().optional(), + type: BulkActionType, + status: BulkActionStatus, + counts: z.object({ + total: z.number(), + success: z.number(), + failure: z.number(), + }), + createdAt: z.coerce.date(), + completedAt: z.coerce.date().optional(), +}); + +export type BulkActionObject = z.infer; + +export const CreateBulkActionResponseBody = z.object({ + id: z.string(), +}); + +export type CreateBulkActionResponseBody = z.infer; + +export const AbortBulkActionResponseBody = z.object({ + id: z.string(), +}); + +export type AbortBulkActionResponseBody = z.infer; + +export const ListBulkActionsResponseBody = z.object({ + data: z.array(BulkActionObject), + pagination: z.object({ + next: z.string().optional(), + previous: z.string().optional(), + }), +}); + +export type ListBulkActionsResponseBody = z.infer; + export const CreateEnvironmentVariableRequestBody = z.object({ name: z.string(), value: z.string(), diff --git a/packages/trigger-sdk/src/v3/index.ts b/packages/trigger-sdk/src/v3/index.ts index f993105f0bd..4bdb582d7ae 100644 --- a/packages/trigger-sdk/src/v3/index.ts +++ b/packages/trigger-sdk/src/v3/index.ts @@ -55,6 +55,7 @@ export { type AnyRealtimeRun, type RetrieveRunResult, type AnyRetrieveRunResult, + type BulkAction, } from "./runs.js"; export * as schedules from "./schedules/index.js"; export { diff --git a/packages/trigger-sdk/src/v3/runs.ts b/packages/trigger-sdk/src/v3/runs.ts index 1ac9582df20..4a2808b583a 100644 --- a/packages/trigger-sdk/src/v3/runs.ts +++ b/packages/trigger-sdk/src/v3/runs.ts @@ -2,7 +2,10 @@ import type { AnyRetrieveRunResult, AnyRunShape, ApiRequestOptions, + CreateBulkCancelActionOptions, + CreateBulkReplayActionOptions, InferRunTypes, + ListBulkActionsQueryParams, ListProjectRunsQueryParams, ListRunsQueryParams, RescheduleRunRequestBody, @@ -16,7 +19,12 @@ import type { AsyncIterableStream, ApiPromise, RealtimeRunSkipColumns, +} from "@trigger.dev/core/v3"; +import { + AbortBulkActionResponseBody, + BulkActionObject, CanceledRunResponse, + CreateBulkActionResponseBody, CursorPagePromise, ListRunResponseItem, ReplayRunResponse, @@ -49,6 +57,14 @@ export const runs = { retrieve: retrieveRun, list: listRuns, reschedule: rescheduleRun, + bulk: { + cancel: bulkCancelRuns, + replay: bulkReplayRuns, + retrieve: retrieveBulkAction, + abort: abortBulkAction, + list: listBulkActions, + poll: pollBulkAction, + }, poll, subscribeToRun, subscribeToRunsWithTag, @@ -57,6 +73,7 @@ export const runs = { }; export type ListRunsItem = ListRunResponseItem; +export type BulkAction = BulkActionObject; function listRuns( projectRef: string, @@ -278,6 +295,139 @@ function cancelRun( return apiClient.cancelRun(runId, $requestOptions); } +function bulkCancelRuns( + options: CreateBulkCancelActionOptions, + requestOptions?: ApiRequestOptions +): ApiPromise { + const apiClient = apiClientManager.clientOrThrow(); + + const $requestOptions = mergeRequestOptions( + { + tracer, + name: "runs.bulk.cancel()", + icon: "runs", + attributes: { + ...flattenAttributes(options as Record, "bulkAction"), + }, + }, + requestOptions + ); + + return apiClient.createBulkAction({ ...options, action: "cancel" }, $requestOptions); +} + +function bulkReplayRuns( + options: CreateBulkReplayActionOptions, + requestOptions?: ApiRequestOptions +): ApiPromise { + const apiClient = apiClientManager.clientOrThrow(); + + const $requestOptions = mergeRequestOptions( + { + tracer, + name: "runs.bulk.replay()", + icon: "runs", + attributes: { + ...flattenAttributes(options as Record, "bulkAction"), + }, + }, + requestOptions + ); + + return apiClient.createBulkAction({ ...options, action: "replay" }, $requestOptions); +} + +function retrieveBulkAction( + bulkActionId: string, + requestOptions?: ApiRequestOptions +): ApiPromise { + const apiClient = apiClientManager.clientOrThrow(); + + const $requestOptions = mergeRequestOptions( + { + tracer, + name: "runs.bulk.retrieve()", + icon: "runs", + attributes: { + bulkActionId, + ...accessoryAttributes({ + items: [{ text: bulkActionId, variant: "normal" }], + style: "codepath", + }), + }, + }, + requestOptions + ); + + return apiClient.retrieveBulkAction(bulkActionId, $requestOptions); +} + +function abortBulkAction( + bulkActionId: string, + requestOptions?: ApiRequestOptions +): ApiPromise { + const apiClient = apiClientManager.clientOrThrow(); + + const $requestOptions = mergeRequestOptions( + { + tracer, + name: "runs.bulk.abort()", + icon: "runs", + attributes: { + bulkActionId, + ...accessoryAttributes({ + items: [{ text: bulkActionId, variant: "normal" }], + style: "codepath", + }), + }, + }, + requestOptions + ); + + return apiClient.abortBulkAction(bulkActionId, $requestOptions); +} + +function listBulkActions( + params?: ListBulkActionsQueryParams, + requestOptions?: ApiRequestOptions +): CursorPagePromise { + const apiClient = apiClientManager.clientOrThrow(); + + const $requestOptions = mergeRequestOptions( + { + tracer, + name: "runs.bulk.list()", + icon: "runs", + attributes: { + ...flattenAttributes(params as Record, "queryParams"), + }, + }, + requestOptions + ); + + return apiClient.listBulkActions(params, $requestOptions); +} + +async function pollBulkAction( + bulkActionId: string, + options?: { pollIntervalMs?: number }, + requestOptions?: ApiRequestOptions +): Promise { + let attempts = 0; + + while (attempts++ < MAX_POLL_ATTEMPTS) { + const bulkAction = await retrieveBulkAction(bulkActionId, requestOptions); + + if (bulkAction.status !== "PENDING") { + return bulkAction; + } + + await new Promise((resolve) => setTimeout(resolve, options?.pollIntervalMs ?? 1000)); + } + + throw new Error(`Bulk action ${bulkActionId} did not finish after ${MAX_POLL_ATTEMPTS} attempts`); +} + function rescheduleRun( runId: string, body: RescheduleRunRequestBody, From 4a4d68002eaf460fa879be68dac512cf897ebc55 Mon Sep 17 00:00:00 2001 From: Chris Arderne Date: Wed, 1 Jul 2026 18:07:27 +0100 Subject: [PATCH 02/19] some reads from primary to avoid read-your-write lag --- .../app/presenters/v3/ApiBulkActionPresenter.server.ts | 3 ++- .../app/routes/api.v1.bulk-actions.$bulkActionId.abort.ts | 5 +++-- apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.ts | 5 +++-- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/apps/webapp/app/presenters/v3/ApiBulkActionPresenter.server.ts b/apps/webapp/app/presenters/v3/ApiBulkActionPresenter.server.ts index badc31bf322..24820d3261c 100644 --- a/apps/webapp/app/presenters/v3/ApiBulkActionPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/ApiBulkActionPresenter.server.ts @@ -38,7 +38,8 @@ type BulkActionRow = Pick< export class ApiBulkActionPresenter extends BasePresenter { public async retrieve(environmentId: string, bulkActionId: string) { - const bulkAction = await this._replica.bulkActionGroup.findFirst({ + // Read from primary so create -> retrieve/poll doesn't 404 on replica lag. + const bulkAction = await this._prisma.bulkActionGroup.findFirst({ select: bulkActionSelect, where: { environmentId, diff --git a/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.abort.ts b/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.abort.ts index dcb7c823867..e435e959706 100644 --- a/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.abort.ts +++ b/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.abort.ts @@ -1,6 +1,6 @@ import { json } from "@remix-run/server-runtime"; import { z } from "zod"; -import { $replica } from "~/db.server"; +import { prisma } from "~/db.server"; import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; import { BulkActionService } from "~/v3/services/bulk/BulkActionV2.server"; @@ -17,7 +17,8 @@ const { action } = createActionApiRoute( resource: () => ({ type: "runs" }), }, findResource: async (params, auth) => { - return $replica.bulkActionGroup.findFirst({ + // Read from primary so create -> abort doesn't 404 on replica lag. + return prisma.bulkActionGroup.findFirst({ select: { id: true }, where: { friendlyId: params.bulkActionId, diff --git a/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.ts b/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.ts index 0215f3fa221..6c9aa45196c 100644 --- a/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.ts +++ b/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.ts @@ -1,6 +1,6 @@ import { json } from "@remix-run/server-runtime"; import { z } from "zod"; -import { $replica } from "~/db.server"; +import { prisma } from "~/db.server"; import { ApiBulkActionPresenter } from "~/presenters/v3/ApiBulkActionPresenter.server"; import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server"; @@ -17,7 +17,8 @@ export const loader = createLoaderApiRoute( resource: () => ({ type: "runs" }), }, findResource: async (params, auth) => { - return $replica.bulkActionGroup.findFirst({ + // Read from primary so create -> retrieve/poll doesn't 404 on replica lag. + return prisma.bulkActionGroup.findFirst({ select: { id: true }, where: { friendlyId: params.bulkActionId, From e206ae84879254938d07db4c6e719a31fd511740 Mon Sep 17 00:00:00 2001 From: Chris Arderne Date: Wed, 1 Jul 2026 19:02:39 +0100 Subject: [PATCH 03/19] improve errors --- .../api.v1.bulk-actions.$bulkActionId.abort.ts | 12 ++++++++---- apps/webapp/app/routes/api.v1.bulk-actions.ts | 5 +---- .../app/v3/services/bulk/BulkActionV2.server.ts | 6 +++--- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.abort.ts b/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.abort.ts index e435e959706..818a867cb0b 100644 --- a/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.abort.ts +++ b/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.abort.ts @@ -1,8 +1,10 @@ import { json } from "@remix-run/server-runtime"; import { z } from "zod"; import { prisma } from "~/db.server"; +import { logger } from "~/services/logger.server"; import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; import { BulkActionService } from "~/v3/services/bulk/BulkActionV2.server"; +import { ServiceValidationError } from "~/v3/services/common.server"; const ParamsSchema = z.object({ bulkActionId: z.string(), @@ -34,10 +36,12 @@ const { action } = createActionApiRoute( const result = await service.abort(params.bulkActionId, authentication.environment.id); return json({ id: result.bulkActionId }); } catch (error) { - return json( - { error: error instanceof Error ? error.message : "Failed to abort bulk action" }, - { status: 400 } - ); + if (error instanceof ServiceValidationError) { + return json({ error: error.message }, { status: error.status ?? 400 }); + } + + logger.error("Failed to abort API bulk action", { error }); + return json({ error: "Failed to abort bulk action" }, { status: 500 }); } } ); diff --git a/apps/webapp/app/routes/api.v1.bulk-actions.ts b/apps/webapp/app/routes/api.v1.bulk-actions.ts index ab632fb5ede..192bf1bd8f7 100644 --- a/apps/webapp/app/routes/api.v1.bulk-actions.ts +++ b/apps/webapp/app/routes/api.v1.bulk-actions.ts @@ -57,10 +57,7 @@ const { action } = createActionApiRoute( } logger.error("Failed to create API bulk action", { error }); - return json( - { error: error instanceof Error ? error.message : "Failed to create bulk action" }, - { status: 500 } - ); + return json({ error: "Failed to create bulk action" }, { status: 500 }); } } ); diff --git a/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts b/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts index ad15383324d..7c1adfd3957 100644 --- a/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts +++ b/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts @@ -482,15 +482,15 @@ export class BulkActionService extends BaseService { }); if (!group) { - throw new Error(`Bulk action not found: ${friendlyId}`); + throw new ServiceValidationError("Bulk action not found", 404); } if (group.status === BulkActionStatus.COMPLETED) { - throw new Error(`Bulk action group already completed: ${friendlyId}`); + throw new ServiceValidationError("Bulk action is already completed", 409); } if (group.status === BulkActionStatus.ABORTED) { - throw new Error(`Bulk action group already aborted: ${friendlyId}`); + throw new ServiceValidationError("Bulk action is already aborted", 409); } //ack the job (this doesn't guarantee it won't run again) From 58c0dc3b2c7dd45107a1ccba10b6b7843a28d3bf Mon Sep 17 00:00:00 2001 From: Chris Arderne Date: Wed, 1 Jul 2026 19:24:44 +0100 Subject: [PATCH 04/19] add tests --- .../test/bulk-actions-api.e2e.full.test.ts | 230 ++++++++++++++++++ .../core/src/v3/apiClient/bulkActions.test.ts | 184 ++++++++++++++ packages/trigger-sdk/src/v3/runs-bulk.test.ts | 183 ++++++++++++++ 3 files changed, 597 insertions(+) create mode 100644 apps/webapp/test/bulk-actions-api.e2e.full.test.ts create mode 100644 packages/core/src/v3/apiClient/bulkActions.test.ts create mode 100644 packages/trigger-sdk/src/v3/runs-bulk.test.ts diff --git a/apps/webapp/test/bulk-actions-api.e2e.full.test.ts b/apps/webapp/test/bulk-actions-api.e2e.full.test.ts new file mode 100644 index 00000000000..0470172984f --- /dev/null +++ b/apps/webapp/test/bulk-actions-api.e2e.full.test.ts @@ -0,0 +1,230 @@ +import { randomBytes } from "node:crypto"; +import { + BulkActionStatus, + BulkActionType, + type PrismaClient, + type Project, + type RuntimeEnvironment, +} from "@trigger.dev/database"; +import { describe, expect, it } from "vitest"; +import { getTestServer } from "./helpers/sharedTestServer"; +import { seedTestEnvironment } from "./helpers/seedTestEnvironment"; + +describe("Bulk actions API", () => { + it("lists bulk actions with cursor pagination", async () => { + const server = getTestServer(); + const { apiKey, project, environment } = await seedTestEnvironment(server.prisma); + + const oldest = await seedBulkAction(server.prisma, project, environment, { + name: "Oldest", + createdAt: new Date("2026-07-01T10:00:00.000Z"), + }); + const middle = await seedBulkAction(server.prisma, project, environment, { + name: "Middle", + createdAt: new Date("2026-07-01T10:01:00.000Z"), + }); + const latest = await seedBulkAction(server.prisma, project, environment, { + name: "Latest", + createdAt: new Date("2026-07-01T10:02:00.000Z"), + }); + + const firstResponse = await server.webapp.fetch("/api/v1/bulk-actions?page[size]=2", { + headers: authHeaders(apiKey), + }); + expect(firstResponse.status).toBe(200); + const firstPage = await firstResponse.json(); + expect(firstPage.data.map((item: { id: string }) => item.id)).toEqual([ + latest.friendlyId, + middle.friendlyId, + ]); + expect(firstPage.pagination.next).toEqual(expect.any(String)); + expect(firstPage.pagination.previous).toBeUndefined(); + + const secondResponse = await server.webapp.fetch( + `/api/v1/bulk-actions?page[size]=2&page[after]=${encodeURIComponent( + firstPage.pagination.next + )}`, + { headers: authHeaders(apiKey) } + ); + expect(secondResponse.status).toBe(200); + const secondPage = await secondResponse.json(); + expect(secondPage.data.map((item: { id: string }) => item.id)).toEqual([oldest.friendlyId]); + expect(secondPage.pagination.next).toBeUndefined(); + expect(secondPage.pagination.previous).toEqual(expect.any(String)); + + const previousResponse = await server.webapp.fetch( + `/api/v1/bulk-actions?page[size]=2&page[before]=${encodeURIComponent( + secondPage.pagination.previous + )}`, + { headers: authHeaders(apiKey) } + ); + expect(previousResponse.status).toBe(200); + const previousPage = await previousResponse.json(); + expect(previousPage.data.map((item: { id: string }) => item.id)).toEqual([ + latest.friendlyId, + middle.friendlyId, + ]); + }); + + it("retrieves a bulk action in the authenticated environment", async () => { + const server = getTestServer(); + const { apiKey, project, environment } = await seedTestEnvironment(server.prisma); + const bulkAction = await seedBulkAction(server.prisma, project, environment, { + name: "Retrieve me", + type: BulkActionType.REPLAY, + status: BulkActionStatus.COMPLETED, + totalCount: 4, + successCount: 3, + failureCount: 1, + completedAt: new Date("2026-07-01T10:05:00.000Z"), + }); + + const response = await server.webapp.fetch(`/api/v1/bulk-actions/${bulkAction.friendlyId}`, { + headers: authHeaders(apiKey), + }); + + expect(response.status).toBe(200); + const body = await response.json(); + expect(body).toMatchObject({ + id: bulkAction.friendlyId, + name: "Retrieve me", + type: "REPLAY", + status: "COMPLETED", + counts: { total: 4, success: 3, failure: 1 }, + }); + expect(body.createdAt).toEqual(expect.any(String)); + expect(body.completedAt).toEqual("2026-07-01T10:05:00.000Z"); + }); + + it("does not retrieve bulk actions from another environment", async () => { + const server = getTestServer(); + const a = await seedTestEnvironment(server.prisma); + const b = await seedTestEnvironment(server.prisma); + const bulkAction = await seedBulkAction(server.prisma, a.project, a.environment, { + name: "Other environment", + }); + + const response = await server.webapp.fetch(`/api/v1/bulk-actions/${bulkAction.friendlyId}`, { + headers: authHeaders(b.apiKey), + }); + + expect(response.status).toBe(404); + }); + + it("aborts a pending bulk action", async () => { + const server = getTestServer(); + const { apiKey, project, environment } = await seedTestEnvironment(server.prisma); + const bulkAction = await seedBulkAction(server.prisma, project, environment, { + status: BulkActionStatus.PENDING, + }); + + const response = await server.webapp.fetch( + `/api/v1/bulk-actions/${bulkAction.friendlyId}/abort`, + { method: "POST", headers: authHeaders(apiKey) } + ); + + expect(response.status).toBe(200); + await expect(response.json()).resolves.toEqual({ id: bulkAction.friendlyId }); + + const updated = await server.prisma.bulkActionGroup.findUniqueOrThrow({ + where: { id: bulkAction.id }, + select: { status: true }, + }); + expect(updated.status).toBe(BulkActionStatus.ABORTED); + }); + + it("returns a safe validation error when aborting a completed bulk action", async () => { + const server = getTestServer(); + const { apiKey, project, environment } = await seedTestEnvironment(server.prisma); + const bulkAction = await seedBulkAction(server.prisma, project, environment, { + status: BulkActionStatus.COMPLETED, + completedAt: new Date("2026-07-01T10:05:00.000Z"), + }); + + const response = await server.webapp.fetch( + `/api/v1/bulk-actions/${bulkAction.friendlyId}/abort`, + { method: "POST", headers: authHeaders(apiKey) } + ); + + expect(response.status).toBe(409); + const body = await response.json(); + expect(body).toEqual({ error: "Bulk action is already completed" }); + expect(JSON.stringify(body)).not.toContain(bulkAction.friendlyId); + }); + + it("rejects create requests with both filter and runIds", async () => { + const server = getTestServer(); + const { apiKey } = await seedTestEnvironment(server.prisma); + + const response = await server.webapp.fetch("/api/v1/bulk-actions", { + method: "POST", + headers: authHeaders(apiKey), + body: JSON.stringify({ action: "cancel", filter: {}, runIds: ["run_123"] }), + }); + + expect(response.status).toBe(400); + const body = await response.json(); + expect(body.error).toContain("Exactly one of filter or runIds must be provided"); + }); + + it("returns a generic error for unexpected create failures", async () => { + const server = getTestServer(); + const { apiKey } = await seedTestEnvironment(server.prisma); + + const response = await server.webapp.fetch("/api/v1/bulk-actions", { + method: "POST", + headers: authHeaders(apiKey), + body: JSON.stringify({ action: "cancel", filter: {}, name: "No ClickHouse in this suite" }), + }); + + expect(response.status).toBe(500); + await expect(response.json()).resolves.toEqual({ error: "Failed to create bulk action" }); + }); +}); + +function authHeaders(apiKey: string) { + return { + Authorization: `Bearer ${apiKey}`, + "Content-Type": "application/json", + }; +} + +async function seedBulkAction( + prisma: PrismaClient, + project: Pick, + environment: Pick, + overrides: { + name?: string; + type?: BulkActionType; + status?: BulkActionStatus; + createdAt?: Date; + completedAt?: Date; + totalCount?: number; + successCount?: number; + failureCount?: number; + } = {} +) { + return prisma.bulkActionGroup.create({ + data: { + friendlyId: `bulk_${randomHex(16)}`, + projectId: project.id, + environmentId: environment.id, + name: overrides.name ?? "Test bulk action", + type: overrides.type ?? BulkActionType.CANCEL, + status: overrides.status ?? BulkActionStatus.PENDING, + queryName: "bulk_action_v1", + params: {}, + totalCount: overrides.totalCount ?? 1, + successCount: overrides.successCount ?? 0, + failureCount: overrides.failureCount ?? 0, + createdAt: overrides.createdAt, + completedAt: overrides.completedAt, + }, + }); +} + +function randomHex(length: number) { + return randomBytes(Math.ceil(length / 2)) + .toString("hex") + .slice(0, length); +} diff --git a/packages/core/src/v3/apiClient/bulkActions.test.ts b/packages/core/src/v3/apiClient/bulkActions.test.ts new file mode 100644 index 00000000000..f764e53ad23 --- /dev/null +++ b/packages/core/src/v3/apiClient/bulkActions.test.ts @@ -0,0 +1,184 @@ +import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import { createServer, type IncomingMessage, type Server, type ServerResponse } from "node:http"; +import type { AddressInfo } from "node:net"; +import { ApiClient } from "./index.js"; + +type ReceivedRequest = { + method: string; + url: string; + headers: IncomingMessage["headers"]; + body: string; +}; + +type RequestHandler = (request: ReceivedRequest, response: ServerResponse) => void | Promise; + +describe("ApiClient bulk actions", () => { + let server: Server; + let baseUrl: string; + let receivedRequests: ReceivedRequest[] = []; + let requestHandler: RequestHandler | undefined; + + beforeEach(async () => { + receivedRequests = []; + requestHandler = undefined; + + server = createServer((req, res) => { + const chunks: Buffer[] = []; + req.on("data", (chunk) => chunks.push(chunk)); + req.on("end", async () => { + const received = { + method: req.method ?? "", + url: req.url ?? "", + headers: req.headers, + body: Buffer.concat(chunks).toString(), + } satisfies ReceivedRequest; + receivedRequests.push(received); + + try { + if (requestHandler) { + await requestHandler(received, res); + } else { + json(res, { error: "No handler" }, 500); + } + } catch (error) { + json(res, { error: error instanceof Error ? error.message : String(error) }, 500); + } + }); + }); + + await new Promise((resolve) => { + server.listen(0, "127.0.0.1", () => { + const address = server.address() as AddressInfo; + baseUrl = `http://127.0.0.1:${address.port}`; + resolve(); + }); + }); + }); + + afterEach(async () => { + await new Promise((resolve) => server.close(() => resolve())); + }); + + it("posts the exact create bulk action request body", async () => { + requestHandler = (_request, response) => json(response, { id: "bulk_created" }); + + const client = new ApiClient(baseUrl, "tr_test_key"); + const result = await client.createBulkAction({ + action: "replay", + filter: { status: ["FAILED"], taskIdentifier: "my-task" }, + name: "Replay failures", + region: "eu_1", + emailNotification: true, + }); + + expect(result).toEqual({ id: "bulk_created" }); + expect(receivedRequests).toHaveLength(1); + expect(receivedRequests[0]?.method).toBe("POST"); + expect(receivedRequests[0]?.url).toBe("/api/v1/bulk-actions"); + expect(receivedRequests[0]?.headers.authorization).toBe("Bearer tr_test_key"); + expect(JSON.parse(receivedRequests[0]?.body ?? "{}")).toEqual({ + action: "replay", + filter: { status: ["FAILED"], taskIdentifier: "my-task" }, + name: "Replay failures", + region: "eu_1", + emailNotification: true, + }); + }); + + it("lists bulk actions with cursor pagination params and parses dates", async () => { + const createdAt = "2026-07-01T10:00:00.000Z"; + const completedAt = "2026-07-01T10:05:00.000Z"; + requestHandler = (_request, response) => + json(response, { + data: [ + { + id: "bulk_listed", + name: "Cancel queued runs", + type: "CANCEL", + status: "COMPLETED", + counts: { total: 3, success: 2, failure: 1 }, + createdAt, + completedAt, + }, + ], + pagination: { next: "cursor_next", previous: "cursor_previous" }, + }); + + const client = new ApiClient(baseUrl, "tr_test_key"); + const page = await client.listBulkActions({ limit: 2, after: "cursor_after" }); + + expect(receivedRequests[0]?.method).toBe("GET"); + const url = new URL(receivedRequests[0]?.url ?? "", baseUrl); + expect(url.pathname).toBe("/api/v1/bulk-actions"); + expect(url.searchParams.get("page[size]")).toBe("2"); + expect(url.searchParams.get("page[after]")).toBe("cursor_after"); + expect(page.pagination).toEqual({ next: "cursor_next", previous: "cursor_previous" }); + expect(page.data[0]?.createdAt).toEqual(new Date(createdAt)); + expect(page.data[0]?.completedAt).toEqual(new Date(completedAt)); + }); + + it("auto-paginates bulk action lists", async () => { + requestHandler = (request, response) => { + const url = new URL(request.url, baseUrl); + if (!url.searchParams.has("page[after]")) { + return json(response, { + data: [bulkActionObject("bulk_first")], + pagination: { next: "cursor_next" }, + }); + } + + expect(url.searchParams.get("page[after]")).toBe("cursor_next"); + return json(response, { + data: [bulkActionObject("bulk_second")], + pagination: {}, + }); + }; + + const client = new ApiClient(baseUrl, "tr_test_key"); + const ids: string[] = []; + + for await (const bulkAction of client.listBulkActions({ limit: 1 })) { + ids.push(bulkAction.id); + } + + expect(ids).toEqual(["bulk_first", "bulk_second"]); + expect(receivedRequests).toHaveLength(2); + }); + + it("retrieves a bulk action by id", async () => { + requestHandler = (_request, response) => json(response, bulkActionObject("bulk_retrieve")); + + const client = new ApiClient(baseUrl, "tr_test_key"); + const bulkAction = await client.retrieveBulkAction("bulk_retrieve"); + + expect(receivedRequests[0]?.method).toBe("GET"); + expect(receivedRequests[0]?.url).toBe("/api/v1/bulk-actions/bulk_retrieve"); + expect(bulkAction.id).toBe("bulk_retrieve"); + }); + + it("aborts a bulk action by id", async () => { + requestHandler = (_request, response) => json(response, { id: "bulk_abort" }); + + const client = new ApiClient(baseUrl, "tr_test_key"); + const result = await client.abortBulkAction("bulk_abort"); + + expect(receivedRequests[0]?.method).toBe("POST"); + expect(receivedRequests[0]?.url).toBe("/api/v1/bulk-actions/bulk_abort/abort"); + expect(result).toEqual({ id: "bulk_abort" }); + }); +}); + +function json(response: ServerResponse, body: unknown, status = 200) { + response.writeHead(status, { "content-type": "application/json" }); + response.end(JSON.stringify(body)); +} + +function bulkActionObject(id: string) { + return { + id, + type: "REPLAY", + status: "PENDING", + counts: { total: 1, success: 0, failure: 0 }, + createdAt: "2026-07-01T10:00:00.000Z", + }; +} diff --git a/packages/trigger-sdk/src/v3/runs-bulk.test.ts b/packages/trigger-sdk/src/v3/runs-bulk.test.ts new file mode 100644 index 00000000000..0a28c9c1b7b --- /dev/null +++ b/packages/trigger-sdk/src/v3/runs-bulk.test.ts @@ -0,0 +1,183 @@ +import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import { createServer, type IncomingMessage, type Server, type ServerResponse } from "node:http"; +import type { AddressInfo } from "node:net"; +import { apiClientManager } from "@trigger.dev/core/v3"; +import { runs } from "./runs.js"; + +type ReceivedRequest = { + method: string; + url: string; + headers: IncomingMessage["headers"]; + body: string; +}; + +type RequestHandler = (request: ReceivedRequest, response: ServerResponse) => void | Promise; + +describe("runs.bulk", () => { + let server: Server; + let baseUrl: string; + let receivedRequests: ReceivedRequest[] = []; + let requestHandler: RequestHandler | undefined; + + beforeEach(async () => { + receivedRequests = []; + requestHandler = undefined; + + server = createServer((req, res) => { + const chunks: Buffer[] = []; + req.on("data", (chunk) => chunks.push(chunk)); + req.on("end", async () => { + const received = { + method: req.method ?? "", + url: req.url ?? "", + headers: req.headers, + body: Buffer.concat(chunks).toString(), + } satisfies ReceivedRequest; + receivedRequests.push(received); + + try { + if (requestHandler) { + await requestHandler(received, res); + } else { + json(res, { error: "No handler" }, 500); + } + } catch (error) { + json(res, { error: error instanceof Error ? error.message : String(error) }, 500); + } + }); + }); + + await new Promise((resolve) => { + server.listen(0, "127.0.0.1", () => { + const address = server.address() as AddressInfo; + baseUrl = `http://127.0.0.1:${address.port}`; + resolve(); + }); + }); + }); + + afterEach(async () => { + apiClientManager.disable(); + await new Promise((resolve) => server.close(() => resolve())); + }); + + it("creates a cancel bulk action", async () => { + requestHandler = (_request, response) => json(response, { id: "bulk_cancel" }); + + const result = await withApiClient(() => + runs.bulk.cancel({ runIds: ["run_1", "run_2"], name: "Cancel selected" }) + ); + + expect(result).toEqual({ id: "bulk_cancel" }); + expect(receivedRequests[0]?.method).toBe("POST"); + expect(receivedRequests[0]?.url).toBe("/api/v1/bulk-actions"); + expect(JSON.parse(receivedRequests[0]?.body ?? "{}")).toEqual({ + action: "cancel", + runIds: ["run_1", "run_2"], + name: "Cancel selected", + }); + }); + + it("creates a replay bulk action", async () => { + requestHandler = (_request, response) => json(response, { id: "bulk_replay" }); + + const result = await withApiClient(() => + runs.bulk.replay({ + filter: { status: "FAILED", taskIdentifier: ["task-a", "task-b"] }, + name: "Replay failed tasks", + region: "eu_1", + emailNotification: true, + }) + ); + + expect(result).toEqual({ id: "bulk_replay" }); + expect(receivedRequests[0]?.method).toBe("POST"); + expect(receivedRequests[0]?.url).toBe("/api/v1/bulk-actions"); + expect(JSON.parse(receivedRequests[0]?.body ?? "{}")).toEqual({ + action: "replay", + filter: { status: "FAILED", taskIdentifier: ["task-a", "task-b"] }, + name: "Replay failed tasks", + region: "eu_1", + emailNotification: true, + }); + }); + + it("retrieves and aborts bulk actions", async () => { + requestHandler = (request, response) => { + if (request.method === "GET") { + return json(response, bulkActionObject("bulk_read", "PENDING")); + } + + return json(response, { id: "bulk_read" }); + }; + + const retrieved = await withApiClient(() => runs.bulk.retrieve("bulk_read")); + const aborted = await withApiClient(() => runs.bulk.abort("bulk_read")); + + expect(retrieved.id).toBe("bulk_read"); + expect(retrieved.createdAt).toEqual(new Date("2026-07-01T10:00:00.000Z")); + expect(aborted).toEqual({ id: "bulk_read" }); + expect(receivedRequests.map((request) => `${request.method} ${request.url}`)).toEqual([ + "GET /api/v1/bulk-actions/bulk_read", + "POST /api/v1/bulk-actions/bulk_read/abort", + ]); + }); + + it("lists bulk actions", async () => { + requestHandler = (_request, response) => + json(response, { + data: [bulkActionObject("bulk_listed", "COMPLETED")], + pagination: { next: "cursor_next" }, + }); + + const page = await withApiClient(() => runs.bulk.list({ limit: 1, before: "cursor_before" })); + + const url = new URL(receivedRequests[0]?.url ?? "", baseUrl); + expect(receivedRequests[0]?.method).toBe("GET"); + expect(url.pathname).toBe("/api/v1/bulk-actions"); + expect(url.searchParams.get("page[size]")).toBe("1"); + expect(url.searchParams.get("page[before]")).toBe("cursor_before"); + expect(page.data[0]?.id).toBe("bulk_listed"); + expect(page.pagination.next).toBe("cursor_next"); + }); + + it("polls until the bulk action finishes", async () => { + requestHandler = (_request, response) => { + const status = receivedRequests.length === 1 ? "PENDING" : "COMPLETED"; + return json(response, bulkActionObject("bulk_poll", status)); + }; + + const bulkAction = await withApiClient(() => + runs.bulk.poll("bulk_poll", { pollIntervalMs: 1 }) + ); + + expect(bulkAction.status).toBe("COMPLETED"); + expect(receivedRequests.map((request) => request.url)).toEqual([ + "/api/v1/bulk-actions/bulk_poll", + "/api/v1/bulk-actions/bulk_poll", + ]); + }); + + function withApiClient(fn: () => Promise) { + return apiClientManager.runWithConfig( + { baseURL: baseUrl, accessToken: "tr_test_key" }, + async () => fn() + ); + } +}); + +function json(response: ServerResponse, body: unknown, status = 200) { + response.writeHead(status, { "content-type": "application/json" }); + response.end(JSON.stringify(body)); +} + +function bulkActionObject(id: string, status: "PENDING" | "COMPLETED" | "ABORTED") { + return { + id, + type: "REPLAY", + status, + counts: { total: 2, success: status === "COMPLETED" ? 2 : 0, failure: 0 }, + createdAt: "2026-07-01T10:00:00.000Z", + completedAt: status === "COMPLETED" ? "2026-07-01T10:05:00.000Z" : undefined, + }; +} From 6cffef8ae0dfd66d0a7a7a0983197f7921bc34f1 Mon Sep 17 00:00:00 2001 From: Chris Arderne Date: Thu, 2 Jul 2026 09:30:53 +0100 Subject: [PATCH 05/19] improve bulk action service --- .../app/v3/services/bulk/BulkActionV2.server.ts | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts b/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts index 7c1adfd3957..bcfd9e868ae 100644 --- a/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts +++ b/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts @@ -482,15 +482,15 @@ export class BulkActionService extends BaseService { }); if (!group) { - throw new ServiceValidationError("Bulk action not found", 404); + throw new ServiceValidationError(`Bulk action not found: ${friendlyId}`, 404); } if (group.status === BulkActionStatus.COMPLETED) { - throw new ServiceValidationError("Bulk action is already completed", 409); + throw new ServiceValidationError(`Bulk action group already completed: ${friendlyId}`, 409); } if (group.status === BulkActionStatus.ABORTED) { - throw new ServiceValidationError("Bulk action is already aborted", 409); + throw new ServiceValidationError(`Bulk action group already aborted: ${friendlyId}`, 409); } //ack the job (this doesn't guarantee it won't run again) @@ -508,9 +508,10 @@ export class BulkActionService extends BaseService { } export function freezeRunListFilters(filters: RunListInputFilters): RunListInputFilters { - const frozenFilters: RunListInputFilters = { ...filters }; - delete (frozenFilters as any).cursor; - delete (frozenFilters as any).direction; + const { cursor: _cursor, direction: _direction, ...frozenFilters } = filters as RunListInputFilters & { + cursor?: string; + direction?: "forward" | "backward"; + }; // Explicit run-id selections target specific, already-existing runs, so we // don't apply a time bound (which could otherwise exclude a selected run). From 5a198836bf03d3c8f35590a370a2f9a17e10ca21 Mon Sep 17 00:00:00 2001 From: Chris Arderne Date: Thu, 2 Jul 2026 11:44:42 +0100 Subject: [PATCH 06/19] improve api schema and types --- apps/webapp/app/routes/api.v1.bulk-actions.ts | 3 +- .../v3/services/bulk/BulkActionV2.server.ts | 17 ++++++----- .../core/src/v3/apiClient/bulkActions.test.ts | 6 ++-- packages/core/src/v3/apiClient/types.ts | 29 ++++++++++--------- packages/core/src/v3/schemas/api.ts | 26 ++++++++++++----- packages/trigger-sdk/src/v3/runs-bulk.test.ts | 6 ++-- 6 files changed, 48 insertions(+), 39 deletions(-) diff --git a/apps/webapp/app/routes/api.v1.bulk-actions.ts b/apps/webapp/app/routes/api.v1.bulk-actions.ts index 192bf1bd8f7..9918302c658 100644 --- a/apps/webapp/app/routes/api.v1.bulk-actions.ts +++ b/apps/webapp/app/routes/api.v1.bulk-actions.ts @@ -42,8 +42,7 @@ const { action } = createActionApiRoute( userId: authentication.actor?.sub ?? null, action: body.action, title: body.name, - region: body.region, - emailNotification: body.emailNotification, + region: body.targetRegion, filters: body.runIds ? { runId: body.runIds } : bulkActionFilterToRunListFilters(body.filter), diff --git a/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts b/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts index bcfd9e868ae..1e01a378686 100644 --- a/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts +++ b/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts @@ -50,6 +50,7 @@ export type ProcessToCompletionResult = { export class BulkActionService extends BaseService { public async create(input: CreateBulkActionInput) { + const { organizationId, projectId, environmentId, userId } = input; const filters = freezeRunListFilters(input.filters); // Region is a replay-only override that re-routes the replayed runs. It's @@ -61,7 +62,7 @@ export class BulkActionService extends BaseService { // region surfaces as a user-input (400) error rather than a 500. const [regionError] = await tryCatch( new WorkerGroupService({ prisma: this._prisma }).getDefaultWorkerGroupForProject({ - projectId: input.projectId, + projectId, regionOverride: replayRegion, }) ); @@ -78,7 +79,7 @@ export class BulkActionService extends BaseService { // Count the runs that will be affected by the bulk action const clickhouse = await clickhouseFactory.getClickhouseForOrganization( - input.organizationId, + organizationId, "standard" ); const runsRepository = new RunsRepository({ @@ -86,9 +87,9 @@ export class BulkActionService extends BaseService { prisma: this._replica as PrismaClient, }); const count = await runsRepository.countRuns({ - organizationId: input.organizationId, - projectId: input.projectId, - environmentId: input.environmentId, + organizationId, + projectId, + environmentId, ...filters, }); @@ -98,9 +99,9 @@ export class BulkActionService extends BaseService { data: { id, friendlyId, - projectId: input.projectId, - environmentId: input.environmentId, - userId: input.userId, + projectId, + environmentId, + userId, name: input.title, type: input.action === "cancel" ? BulkActionType.CANCEL : BulkActionType.REPLAY, params, diff --git a/packages/core/src/v3/apiClient/bulkActions.test.ts b/packages/core/src/v3/apiClient/bulkActions.test.ts index f764e53ad23..dc2207aca10 100644 --- a/packages/core/src/v3/apiClient/bulkActions.test.ts +++ b/packages/core/src/v3/apiClient/bulkActions.test.ts @@ -67,8 +67,7 @@ describe("ApiClient bulk actions", () => { action: "replay", filter: { status: ["FAILED"], taskIdentifier: "my-task" }, name: "Replay failures", - region: "eu_1", - emailNotification: true, + targetRegion: "eu_1", }); expect(result).toEqual({ id: "bulk_created" }); @@ -80,8 +79,7 @@ describe("ApiClient bulk actions", () => { action: "replay", filter: { status: ["FAILED"], taskIdentifier: "my-task" }, name: "Replay failures", - region: "eu_1", - emailNotification: true, + targetRegion: "eu_1", }); }); diff --git a/packages/core/src/v3/apiClient/types.ts b/packages/core/src/v3/apiClient/types.ts index f6efa6ece35..ee67515e5fb 100644 --- a/packages/core/src/v3/apiClient/types.ts +++ b/packages/core/src/v3/apiClient/types.ts @@ -70,32 +70,35 @@ export interface ListProjectRunsQueryParams extends CursorPageParams, ListRunsQu env?: Array<"dev" | "staging" | "prod"> | "dev" | "staging" | "prod"; } +/** Same filters as runs.list(), excluding pagination. */ export type BulkActionFilter = Omit; export type BulkActionSelection = | { filter: BulkActionFilter; runIds?: never } | { runIds: string[]; filter?: never }; -export type CreateBulkActionOptions = BulkActionSelection & { - action: "cancel" | "replay"; +type BaseBulkActionOptions = BulkActionSelection & { name?: string; - /** Region identifier to replay runs in. When omitted, each replay keeps the original run's region. */ - region?: string; - emailNotification?: boolean; }; -export type CreateBulkCancelActionOptions = BulkActionSelection & { - name?: string; - emailNotification?: boolean; +type TargetRegionOption = { + /** Region identifier to replay runs in. When omitted, each replay keeps the original run's region. */ + targetRegion?: string; }; -export type CreateBulkReplayActionOptions = BulkActionSelection & { - name?: string; - /** Region identifier to replay runs in. When omitted, each replay keeps the original run's region. */ - region?: string; - emailNotification?: boolean; +export type CreateBulkActionOptions = + | (BaseBulkActionOptions & { + action: "cancel"; + targetRegion?: never; + }) + | (BaseBulkActionOptions & { action: "replay" } & TargetRegionOption); + +export type CreateBulkCancelActionOptions = BaseBulkActionOptions & { + targetRegion?: never; }; +export type CreateBulkReplayActionOptions = BaseBulkActionOptions & TargetRegionOption; + export type ListBulkActionsQueryParams = CursorPageParams; export interface SubscribeToRunsQueryParams { diff --git a/packages/core/src/v3/schemas/api.ts b/packages/core/src/v3/schemas/api.ts index 0e0047c3ea1..c6bf6fc597c 100644 --- a/packages/core/src/v3/schemas/api.ts +++ b/packages/core/src/v3/schemas/api.ts @@ -1246,15 +1246,25 @@ const BulkActionFilterRequestBody = z.object({ region: StringOrStringArray.optional(), }); +const BulkActionSelectionRequestBody = { + filter: BulkActionFilterRequestBody.optional(), + runIds: z.array(z.string()).min(1).optional(), + name: z.string().optional(), +}; + export const CreateBulkActionRequestBody = z - .object({ - action: z.enum(["cancel", "replay"]), - filter: BulkActionFilterRequestBody.optional(), - runIds: z.array(z.string()).min(1).optional(), - name: z.string().optional(), - region: z.string().optional(), - emailNotification: z.boolean().optional(), - }) + .discriminatedUnion("action", [ + z.object({ + action: z.literal("cancel"), + targetRegion: z.never().optional(), + ...BulkActionSelectionRequestBody, + }), + z.object({ + action: z.literal("replay"), + targetRegion: z.string().optional(), + ...BulkActionSelectionRequestBody, + }), + ]) .refine((body) => (body.filter ? 1 : 0) + (body.runIds ? 1 : 0) === 1, { message: "Exactly one of filter or runIds must be provided", }); diff --git a/packages/trigger-sdk/src/v3/runs-bulk.test.ts b/packages/trigger-sdk/src/v3/runs-bulk.test.ts index 0a28c9c1b7b..c31743eddc3 100644 --- a/packages/trigger-sdk/src/v3/runs-bulk.test.ts +++ b/packages/trigger-sdk/src/v3/runs-bulk.test.ts @@ -85,8 +85,7 @@ describe("runs.bulk", () => { runs.bulk.replay({ filter: { status: "FAILED", taskIdentifier: ["task-a", "task-b"] }, name: "Replay failed tasks", - region: "eu_1", - emailNotification: true, + targetRegion: "eu_1", }) ); @@ -97,8 +96,7 @@ describe("runs.bulk", () => { action: "replay", filter: { status: "FAILED", taskIdentifier: ["task-a", "task-b"] }, name: "Replay failed tasks", - region: "eu_1", - emailNotification: true, + targetRegion: "eu_1", }); }); From 2360008a36e48e6dc08540c6feb2ad93c9456400 Mon Sep 17 00:00:00 2001 From: Chris Arderne Date: Thu, 2 Jul 2026 11:50:19 +0100 Subject: [PATCH 07/19] add docs --- docs/docs.json | 1 + docs/management/runs/bulk-actions.mdx | 165 ++++++++++++++++++++++++++ 2 files changed, 166 insertions(+) create mode 100644 docs/management/runs/bulk-actions.mdx diff --git a/docs/docs.json b/docs/docs.json index f373503049c..0cb85c81380 100644 --- a/docs/docs.json +++ b/docs/docs.json @@ -333,6 +333,7 @@ "management/runs/retrieve", "management/runs/replay", "management/runs/cancel", + "management/runs/bulk-actions", "management/runs/reschedule", "management/runs/update-metadata", "management/runs/add-tags", diff --git a/docs/management/runs/bulk-actions.mdx b/docs/management/runs/bulk-actions.mdx new file mode 100644 index 00000000000..8bcd07a934b --- /dev/null +++ b/docs/management/runs/bulk-actions.mdx @@ -0,0 +1,165 @@ +--- +title: "Bulk actions" +description: "Cancel or replay many runs from the SDK using run IDs or the same filters as runs.list()." +--- + +**Bulk actions let you cancel or replay many runs asynchronously from the SDK by selecting runs with run IDs or `runs.list()` filters.** + +A bulk action returns a handle immediately. Use the handle to retrieve progress, poll until completion, list previous actions, or abort pending work. + +## Create a bulk replay + +Use `runs.bulk.replay()` to replay every run that matches a filter. + +```ts Your backend code +import { runs } from "@trigger.dev/sdk"; + +const action = await runs.bulk.replay({ + filter: { + status: "FAILED", + taskIdentifier: "sync-customer", + period: "24h", + }, + name: "Replay failed customer syncs", + targetRegion: "eu-central-1", +}); + +const completed = await runs.bulk.poll(action.id); +console.log(completed.status, completed.counts); +``` + +`filter` accepts the same filters as [`runs.list()`](/management/runs/list), excluding pagination fields. Relative time filters such as `period` are resolved when the bulk action is created, so later batches process the same fixed time range. + + + Selects runs using the same filter shape as `runs.list()`, excluding `limit`, `after`, and `before`. + + + + Selects specific run IDs. Provide either `filter` or `runIds`, not both. + + + + A name for the bulk action. + + + + Replays matching runs in a specific region. When omitted, each replay keeps the original run's region. This option is only available for `runs.bulk.replay()`. + + +## Create a bulk cancel + +Use `runs.bulk.cancel()` to cancel every run that matches a filter, or specific run IDs. + +```ts Your backend code +import { runs } from "@trigger.dev/sdk"; + +const action = await runs.bulk.cancel({ + runIds: ["run_1234", "run_5678"], + name: "Cancel selected runs", +}); + +console.log(action.id); +``` + +Only runs that are still cancelable when the action reaches them are canceled. Runs that have already reached a final state count as failures in the bulk action summary. + +## Retrieve progress + +Use `runs.bulk.retrieve()` to read the current status and aggregate counts. + +```ts Your backend code +import { runs } from "@trigger.dev/sdk"; + +const action = await runs.bulk.retrieve("bulk_1234"); + +console.log(action.status); +console.log(action.counts.total, action.counts.success, action.counts.failure); +``` + +The returned bulk action object has these fields: + + + The bulk action ID, starting with `bulk_`. + + + + The action being performed. + + + + The current bulk action status. + + + + Aggregate processing counts. + + + + The number of runs selected when the bulk action was created. + + + The number of runs processed successfully. + + + The number of runs that could not be processed. + + + + + + The date and time the bulk action was created. + + + + The date and time the bulk action completed. + + +## Poll for completion + +Use `runs.bulk.poll()` to wait until the bulk action leaves the `PENDING` state. + +```ts Your backend code +import { runs } from "@trigger.dev/sdk"; + +const completed = await runs.bulk.poll("bulk_1234", { + pollIntervalMs: 2_000, +}); + +console.log(completed.status); +``` + +## Abort a bulk action + +Use `runs.bulk.abort()` to stop future batches from being processed. + +```ts Your backend code +import { runs } from "@trigger.dev/sdk"; + +await runs.bulk.abort("bulk_1234"); +``` + +Abort is best effort. Runs already being processed in the current batch may still finish. + +## List bulk actions + +Use `runs.bulk.list()` to page through previous bulk actions in the current environment. + +```ts Your backend code +import { runs } from "@trigger.dev/sdk"; + +const page = await runs.bulk.list({ limit: 25 }); + +for (const action of page.data) { + console.log(action.id, action.status); +} +``` + +List results support the same auto-pagination helpers as other management API list methods: + +```ts Your backend code +import { runs } from "@trigger.dev/sdk"; + +for await (const action of runs.bulk.list({ limit: 25 })) { + console.log(action.id, action.status); +} +``` From 88295bde19abdc3f12a270f0431c702732f061f0 Mon Sep 17 00:00:00 2001 From: Chris Arderne Date: Thu, 2 Jul 2026 11:50:35 +0100 Subject: [PATCH 08/19] format --- apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts b/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts index 1e01a378686..51859eeab4b 100644 --- a/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts +++ b/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts @@ -509,7 +509,11 @@ export class BulkActionService extends BaseService { } export function freezeRunListFilters(filters: RunListInputFilters): RunListInputFilters { - const { cursor: _cursor, direction: _direction, ...frozenFilters } = filters as RunListInputFilters & { + const { + cursor: _cursor, + direction: _direction, + ...frozenFilters + } = filters as RunListInputFilters & { cursor?: string; direction?: "forward" | "backward"; }; From 06ba4fe34f7404348fb0ecb2d71b96fae604f918 Mon Sep 17 00:00:00 2001 From: Chris Arderne Date: Thu, 2 Jul 2026 12:01:06 +0100 Subject: [PATCH 09/19] format, lint --- apps/webapp/app/routes/api.v1.bulk-actions.ts | 2 +- packages/core/src/v3/apiClient/index.ts | 2 +- packages/trigger-sdk/src/v3/runs.ts | 2 -- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/apps/webapp/app/routes/api.v1.bulk-actions.ts b/apps/webapp/app/routes/api.v1.bulk-actions.ts index 9918302c658..16b8b46acba 100644 --- a/apps/webapp/app/routes/api.v1.bulk-actions.ts +++ b/apps/webapp/app/routes/api.v1.bulk-actions.ts @@ -1,6 +1,6 @@ import { json } from "@remix-run/server-runtime"; import { CreateBulkActionRequestBody, type QueueTypeName } from "@trigger.dev/core/v3"; -import { z } from "zod"; +import type { z } from "zod"; import { ApiBulkActionListSearchParams, ApiBulkActionPresenter, diff --git a/packages/core/src/v3/apiClient/index.ts b/packages/core/src/v3/apiClient/index.ts index a0732cd1580..deda5d01f9a 100644 --- a/packages/core/src/v3/apiClient/index.ts +++ b/packages/core/src/v3/apiClient/index.ts @@ -120,7 +120,7 @@ import { runShapeStream, type SSEStreamPart, } from "./runStream.js"; -import { +import type { CreateBulkActionOptions, CreateEnvironmentVariableParams, ImportEnvironmentVariablesParams, diff --git a/packages/trigger-sdk/src/v3/runs.ts b/packages/trigger-sdk/src/v3/runs.ts index 4a2808b583a..3bd2a9ea7f8 100644 --- a/packages/trigger-sdk/src/v3/runs.ts +++ b/packages/trigger-sdk/src/v3/runs.ts @@ -19,8 +19,6 @@ import type { AsyncIterableStream, ApiPromise, RealtimeRunSkipColumns, -} from "@trigger.dev/core/v3"; -import { AbortBulkActionResponseBody, BulkActionObject, CanceledRunResponse, From 9f3389b519164e358d4ef89e078567a53723b173 Mon Sep 17 00:00:00 2001 From: Chris Arderne Date: Thu, 2 Jul 2026 12:20:41 +0100 Subject: [PATCH 10/19] fix e2e test --- apps/webapp/test/bulk-actions-api.e2e.full.test.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/webapp/test/bulk-actions-api.e2e.full.test.ts b/apps/webapp/test/bulk-actions-api.e2e.full.test.ts index 0470172984f..a479131e1e0 100644 --- a/apps/webapp/test/bulk-actions-api.e2e.full.test.ts +++ b/apps/webapp/test/bulk-actions-api.e2e.full.test.ts @@ -148,8 +148,8 @@ describe("Bulk actions API", () => { expect(response.status).toBe(409); const body = await response.json(); - expect(body).toEqual({ error: "Bulk action is already completed" }); - expect(JSON.stringify(body)).not.toContain(bulkAction.friendlyId); + expect(body.error).toEqual(expect.any(String)); + expect(body.error).toContain(bulkAction.friendlyId); }); it("rejects create requests with both filter and runIds", async () => { From b6a957b1f7e75d974548d3526d6515efda5ac619 Mon Sep 17 00:00:00 2001 From: Chris Arderne Date: Thu, 2 Jul 2026 15:06:56 +0100 Subject: [PATCH 11/19] disallow empty filter --- .../test/bulk-actions-api.e2e.full.test.ts | 23 +++++++- docs/management/runs/bulk-actions.mdx | 2 +- packages/core/src/v3/apiClient/types.ts | 7 ++- packages/core/src/v3/schemas/api.ts | 53 +++++++++++++------ 4 files changed, 65 insertions(+), 20 deletions(-) diff --git a/apps/webapp/test/bulk-actions-api.e2e.full.test.ts b/apps/webapp/test/bulk-actions-api.e2e.full.test.ts index a479131e1e0..43be105962a 100644 --- a/apps/webapp/test/bulk-actions-api.e2e.full.test.ts +++ b/apps/webapp/test/bulk-actions-api.e2e.full.test.ts @@ -159,7 +159,7 @@ describe("Bulk actions API", () => { const response = await server.webapp.fetch("/api/v1/bulk-actions", { method: "POST", headers: authHeaders(apiKey), - body: JSON.stringify({ action: "cancel", filter: {}, runIds: ["run_123"] }), + body: JSON.stringify({ action: "cancel", filter: { status: "FAILED" }, runIds: ["run_123"] }), }); expect(response.status).toBe(400); @@ -167,6 +167,21 @@ describe("Bulk actions API", () => { expect(body.error).toContain("Exactly one of filter or runIds must be provided"); }); + it("rejects create requests with an empty filter", async () => { + const server = getTestServer(); + const { apiKey } = await seedTestEnvironment(server.prisma); + + const response = await server.webapp.fetch("/api/v1/bulk-actions", { + method: "POST", + headers: authHeaders(apiKey), + body: JSON.stringify({ action: "cancel", filter: {} }), + }); + + expect(response.status).toBe(400); + const body = await response.json(); + expect(body.error).toContain("At least one filter must be provided"); + }); + it("returns a generic error for unexpected create failures", async () => { const server = getTestServer(); const { apiKey } = await seedTestEnvironment(server.prisma); @@ -174,7 +189,11 @@ describe("Bulk actions API", () => { const response = await server.webapp.fetch("/api/v1/bulk-actions", { method: "POST", headers: authHeaders(apiKey), - body: JSON.stringify({ action: "cancel", filter: {}, name: "No ClickHouse in this suite" }), + body: JSON.stringify({ + action: "cancel", + filter: { status: "FAILED" }, + name: "No ClickHouse in this suite", + }), }); expect(response.status).toBe(500); diff --git a/docs/management/runs/bulk-actions.mdx b/docs/management/runs/bulk-actions.mdx index 8bcd07a934b..af9118ce8b6 100644 --- a/docs/management/runs/bulk-actions.mdx +++ b/docs/management/runs/bulk-actions.mdx @@ -28,7 +28,7 @@ const completed = await runs.bulk.poll(action.id); console.log(completed.status, completed.counts); ``` -`filter` accepts the same filters as [`runs.list()`](/management/runs/list), excluding pagination fields. Relative time filters such as `period` are resolved when the bulk action is created, so later batches process the same fixed time range. +`filter` accepts the same filters as [`runs.list()`](/management/runs/list), excluding pagination fields. Provide at least one filter field; use `runIds` when you want to target specific runs. Relative time filters such as `period` are resolved when the bulk action is created, so later batches process the same fixed time range. Selects runs using the same filter shape as `runs.list()`, excluding `limit`, `after`, and `before`. diff --git a/packages/core/src/v3/apiClient/types.ts b/packages/core/src/v3/apiClient/types.ts index ee67515e5fb..0684118ff28 100644 --- a/packages/core/src/v3/apiClient/types.ts +++ b/packages/core/src/v3/apiClient/types.ts @@ -70,8 +70,13 @@ export interface ListProjectRunsQueryParams extends CursorPageParams, ListRunsQu env?: Array<"dev" | "staging" | "prod"> | "dev" | "staging" | "prod"; } +type RequireAtLeastOne = T & + { + [K in Keys]-?: Required>; + }[Keys]; + /** Same filters as runs.list(), excluding pagination. */ -export type BulkActionFilter = Omit; +export type BulkActionFilter = RequireAtLeastOne>; export type BulkActionSelection = | { filter: BulkActionFilter; runIds?: never } diff --git a/packages/core/src/v3/schemas/api.ts b/packages/core/src/v3/schemas/api.ts index c6bf6fc597c..528d899ea58 100644 --- a/packages/core/src/v3/schemas/api.ts +++ b/packages/core/src/v3/schemas/api.ts @@ -1229,22 +1229,43 @@ const MachineOrMachineArray = z.union([MachinePresetName, z.array(MachinePresetN const QueueOrQueueArray = z.union([QueueTypeName, z.array(QueueTypeName)]); const DateOrNumber = z.union([z.coerce.date(), z.number()]); -const BulkActionFilterRequestBody = z.object({ - status: z.union([RunStatus, z.array(RunStatus)]).optional(), - taskIdentifier: StringOrStringArray.optional(), - version: StringOrStringArray.optional(), - from: DateOrNumber.optional(), - to: DateOrNumber.optional(), - period: z.string().optional(), - bulkAction: z.string().optional(), - tag: StringOrStringArray.optional(), - schedule: z.string().optional(), - isTest: z.boolean().optional(), - batch: z.string().optional(), - queue: QueueOrQueueArray.optional(), - machine: MachineOrMachineArray.optional(), - region: StringOrStringArray.optional(), -}); +const BulkActionFilterRequestBody = z + .object({ + status: z.union([RunStatus, z.array(RunStatus)]).optional(), + taskIdentifier: StringOrStringArray.optional(), + version: StringOrStringArray.optional(), + from: DateOrNumber.optional(), + to: DateOrNumber.optional(), + period: z.string().optional(), + bulkAction: z.string().optional(), + tag: StringOrStringArray.optional(), + schedule: z.string().optional(), + isTest: z.boolean().optional(), + batch: z.string().optional(), + queue: QueueOrQueueArray.optional(), + machine: MachineOrMachineArray.optional(), + region: StringOrStringArray.optional(), + }) + .refine((filter) => Object.values(filter).some(isNonEmptyBulkActionFilterValue), { + message: "At least one filter must be provided", + }); + +/** Recursively checks for at least one non-undefined, non-empty value. */ +function isNonEmptyBulkActionFilterValue(value: unknown): boolean { + if (value === undefined) { + return false; + } + + if (Array.isArray(value)) { + return value.some(isNonEmptyBulkActionFilterValue); + } + + if (typeof value === "string") { + return value.trim().length > 0; + } + + return true; +} const BulkActionSelectionRequestBody = { filter: BulkActionFilterRequestBody.optional(), From 14334c6b6347bebb84d75a6e1095e0d4fdf100bb Mon Sep 17 00:00:00 2001 From: Chris Arderne Date: Thu, 2 Jul 2026 16:06:47 +0100 Subject: [PATCH 12/19] simplify route reads --- .../v3/ApiBulkActionPresenter.server.ts | 19 +----------------- ...api.v1.bulk-actions.$bulkActionId.abort.ts | 3 ++- .../api.v1.bulk-actions.$bulkActionId.ts | 20 ++++++++----------- 3 files changed, 11 insertions(+), 31 deletions(-) diff --git a/apps/webapp/app/presenters/v3/ApiBulkActionPresenter.server.ts b/apps/webapp/app/presenters/v3/ApiBulkActionPresenter.server.ts index 24820d3261c..5e329bee404 100644 --- a/apps/webapp/app/presenters/v3/ApiBulkActionPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/ApiBulkActionPresenter.server.ts @@ -37,23 +37,6 @@ type BulkActionRow = Pick< >; export class ApiBulkActionPresenter extends BasePresenter { - public async retrieve(environmentId: string, bulkActionId: string) { - // Read from primary so create -> retrieve/poll doesn't 404 on replica lag. - const bulkAction = await this._prisma.bulkActionGroup.findFirst({ - select: bulkActionSelect, - where: { - environmentId, - friendlyId: bulkActionId, - }, - }); - - if (!bulkAction) { - return undefined; - } - - return apiBulkActionObject(bulkAction); - } - public async list(environmentId: string, searchParams: ApiBulkActionListSearchParams) { const pageSize = searchParams["page[size]"] ?? DEFAULT_PAGE_SIZE; const after = searchParams["page[after]"]; @@ -116,7 +99,7 @@ export class ApiBulkActionPresenter extends BasePresenter { } } -const bulkActionSelect = { +export const bulkActionSelect = { id: true, friendlyId: true, name: true, diff --git a/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.abort.ts b/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.abort.ts index 818a867cb0b..a672faf9e41 100644 --- a/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.abort.ts +++ b/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.abort.ts @@ -18,8 +18,9 @@ const { action } = createActionApiRoute( action: "write", resource: () => ({ type: "runs" }), }, + // Existence/auth gate. Reads from primary so create -> abort doesn't 404 on + // replica lag; the abort write path re-reads and mutates on primary. findResource: async (params, auth) => { - // Read from primary so create -> abort doesn't 404 on replica lag. return prisma.bulkActionGroup.findFirst({ select: { id: true }, where: { diff --git a/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.ts b/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.ts index 6c9aa45196c..cfbf50e052c 100644 --- a/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.ts +++ b/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.ts @@ -1,7 +1,10 @@ import { json } from "@remix-run/server-runtime"; import { z } from "zod"; import { prisma } from "~/db.server"; -import { ApiBulkActionPresenter } from "~/presenters/v3/ApiBulkActionPresenter.server"; +import { + apiBulkActionObject, + bulkActionSelect, +} from "~/presenters/v3/ApiBulkActionPresenter.server"; import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server"; const ParamsSchema = z.object({ @@ -16,10 +19,10 @@ export const loader = createLoaderApiRoute( action: "read", resource: () => ({ type: "runs" }), }, + // Read from primary so create -> retrieve/poll doesn't 404 on replica lag. findResource: async (params, auth) => { - // Read from primary so create -> retrieve/poll doesn't 404 on replica lag. return prisma.bulkActionGroup.findFirst({ - select: { id: true }, + select: bulkActionSelect, where: { friendlyId: params.bulkActionId, environmentId: auth.environment.id, @@ -27,14 +30,7 @@ export const loader = createLoaderApiRoute( }); }, }, - async ({ params, authentication }) => { - const presenter = new ApiBulkActionPresenter(); - const bulkAction = await presenter.retrieve(authentication.environment.id, params.bulkActionId); - - if (!bulkAction) { - return json({ error: "Bulk action not found" }, { status: 404 }); - } - - return json(bulkAction); + async ({ resource }) => { + return json(apiBulkActionObject(resource)); } ); From 97d44a387d06695d344d3b5850f770300e852041 Mon Sep 17 00:00:00 2001 From: Chris Arderne Date: Thu, 2 Jul 2026 16:54:12 +0100 Subject: [PATCH 13/19] move reads to replica --- .../app/routes/api.v1.bulk-actions.$bulkActionId.abort.ts | 6 ++---- apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.ts | 5 ++--- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.abort.ts b/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.abort.ts index a672faf9e41..d5bdc0a8d0e 100644 --- a/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.abort.ts +++ b/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.abort.ts @@ -1,6 +1,6 @@ import { json } from "@remix-run/server-runtime"; import { z } from "zod"; -import { prisma } from "~/db.server"; +import { $replica } from "~/db.server"; import { logger } from "~/services/logger.server"; import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; import { BulkActionService } from "~/v3/services/bulk/BulkActionV2.server"; @@ -18,10 +18,8 @@ const { action } = createActionApiRoute( action: "write", resource: () => ({ type: "runs" }), }, - // Existence/auth gate. Reads from primary so create -> abort doesn't 404 on - // replica lag; the abort write path re-reads and mutates on primary. findResource: async (params, auth) => { - return prisma.bulkActionGroup.findFirst({ + return $replica.bulkActionGroup.findFirst({ select: { id: true }, where: { friendlyId: params.bulkActionId, diff --git a/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.ts b/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.ts index cfbf50e052c..0f2dabad92f 100644 --- a/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.ts +++ b/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.ts @@ -1,6 +1,6 @@ import { json } from "@remix-run/server-runtime"; import { z } from "zod"; -import { prisma } from "~/db.server"; +import { $replica } from "~/db.server"; import { apiBulkActionObject, bulkActionSelect, @@ -19,9 +19,8 @@ export const loader = createLoaderApiRoute( action: "read", resource: () => ({ type: "runs" }), }, - // Read from primary so create -> retrieve/poll doesn't 404 on replica lag. findResource: async (params, auth) => { - return prisma.bulkActionGroup.findFirst({ + return $replica.bulkActionGroup.findFirst({ select: bulkActionSelect, where: { friendlyId: params.bulkActionId, From 76b1bc37b88ecd3179c69206b034130e9fcbddd4 Mon Sep 17 00:00:00 2001 From: Chris Arderne Date: Fri, 3 Jul 2026 09:31:16 +0100 Subject: [PATCH 14/19] add max concurrency to bulk replays --- apps/webapp/app/env.server.ts | 2 ++ .../v3/services/bulk/BulkActionV2.server.ts | 21 +++++++++++++++++++ .../migration.sql | 5 +++++ .../database/prisma/schema.prisma | 5 +++++ 4 files changed, 33 insertions(+) create mode 100644 internal-packages/database/prisma/migrations/20260702120000_bulk_action_group_pending_replay_index/migration.sql diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 367e9a3362d..2713833b22c 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -1942,6 +1942,8 @@ const EnvironmentSchema = z BULK_ACTION_BATCH_SIZE: z.coerce.number().int().default(100), BULK_ACTION_BATCH_DELAY_MS: z.coerce.number().int().default(200), BULK_ACTION_SUBBATCH_CONCURRENCY: z.coerce.number().int().default(5), + /// Max number of concurrent in-flight (PENDING) bulk replays per environment. + BULK_ACTION_MAX_CONCURRENT_REPLAYS: z.coerce.number().int().default(3), // AI Run Filter AI_RUN_FILTER_MODEL: z.string().optional(), diff --git a/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts b/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts index 51859eeab4b..372427b27f1 100644 --- a/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts +++ b/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts @@ -53,6 +53,27 @@ export class BulkActionService extends BaseService { const { organizationId, projectId, environmentId, userId } = input; const filters = freezeRunListFilters(input.filters); + // Concurrency guard for replays + // The count is backed by the (environmentId, status, type) index, so it only + // touches this env's PENDING replays and stays cheap. + if (input.action === "replay") { + const maxConcurrentReplays = env.BULK_ACTION_MAX_CONCURRENT_REPLAYS; + const inFlightReplays = await this._replica.bulkActionGroup.count({ + where: { + environmentId, + type: BulkActionType.REPLAY, + status: BulkActionStatus.PENDING, + }, + }); + + if (inFlightReplays >= maxConcurrentReplays) { + throw new ServiceValidationError( + `You can only run ${maxConcurrentReplays} bulk replays at a time in this environment. Wait for an in-progress replay to finish before starting another.`, + 429 + ); + } + } + // Region is a replay-only override that re-routes the replayed runs. It's // stored alongside the run-list filters under a dedicated key so it isn't // mistaken for a `regions` selection filter when the params are parsed. diff --git a/internal-packages/database/prisma/migrations/20260702120000_bulk_action_group_pending_replay_index/migration.sql b/internal-packages/database/prisma/migrations/20260702120000_bulk_action_group_pending_replay_index/migration.sql new file mode 100644 index 00000000000..f8b28b38d68 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20260702120000_bulk_action_group_pending_replay_index/migration.sql @@ -0,0 +1,5 @@ +-- Backs the per-environment concurrent-replay limit: count of PENDING REPLAY groups. +-- Not partial (e.g. WHERE status = 'PENDING' AND type = 'REPLAY') as wouldn't be used +-- with the bind params from prisma. +CREATE INDEX CONCURRENTLY IF NOT EXISTS "BulkActionGroup_environmentId_status_type_idx" +ON "BulkActionGroup" ("environmentId", "status", "type"); diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index 7cf441eb9f8..226eba1151b 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -2530,6 +2530,11 @@ model BulkActionGroup { // the INCLUDE/fillfactor indexes elsewhere in this schema). `migrate dev` will report // drift here; do NOT accept its drop/recreate — keep the hand-written migration. @@index([environmentId, type, dedupeKey]) + // Backs the per-environment concurrent-replay limit (count of PENDING REPLAY groups). + // Plain composite (not partial) so Prisma's parameterized count reliably uses it; the + // migration creates it CONCURRENTLY to avoid locking writes. See migration + // 20260702120000_bulk_action_group_pending_replay_index. + @@index([environmentId, status, type]) } enum BulkActionType { From 6baa02e2654521cd72425b0b0b2a3b8e25cc8610 Mon Sep 17 00:00:00 2001 From: Chris Arderne Date: Fri, 3 Jul 2026 10:37:48 +0100 Subject: [PATCH 15/19] add x-should-retry:false and update docs --- apps/webapp/app/routes/api.v1.bulk-actions.ts | 13 ++++++++++++- docs/management/runs/bulk-actions.mdx | 4 ++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/apps/webapp/app/routes/api.v1.bulk-actions.ts b/apps/webapp/app/routes/api.v1.bulk-actions.ts index 16b8b46acba..adf3d392183 100644 --- a/apps/webapp/app/routes/api.v1.bulk-actions.ts +++ b/apps/webapp/app/routes/api.v1.bulk-actions.ts @@ -52,7 +52,18 @@ const { action } = createActionApiRoute( return json({ id: result.bulkActionId }, { status: 202 }); } catch (error) { if (error instanceof ServiceValidationError) { - return json({ error: error.message }, { status: error.status ?? 400 }); + const status = error.status ?? 400; + return json( + { error: error.message }, + { + status, + // The SDK auto-retries 429s. The concurrent-replay cap is a semantic + // limit, not a transient rate limit, so it won't clear within the + // retry window. Tell the client not to retry so the error (and its + // actionable message) surfaces immediately instead of after backoff. + headers: status === 429 ? { "x-should-retry": "false" } : undefined, + } + ); } logger.error("Failed to create API bulk action", { error }); diff --git a/docs/management/runs/bulk-actions.mdx b/docs/management/runs/bulk-actions.mdx index af9118ce8b6..a2efef6ef4a 100644 --- a/docs/management/runs/bulk-actions.mdx +++ b/docs/management/runs/bulk-actions.mdx @@ -30,6 +30,10 @@ console.log(completed.status, completed.counts); `filter` accepts the same filters as [`runs.list()`](/management/runs/list), excluding pagination fields. Provide at least one filter field; use `runIds` when you want to target specific runs. Relative time filters such as `period` are resolved when the bulk action is created, so later batches process the same fixed time range. + + Each environment can only run a limited number of bulk replays at the same time. If you start a replay while too many are still in progress, the call fails and returns an error. Wait for an in-progress replay to finish (or [abort](#abort-a-bulk-action) one) before starting another. Bulk cancels are not subject to this limit. + + Selects runs using the same filter shape as `runs.list()`, excluding `limit`, `after`, and `before`. From 6e45c3ea22ce20d124aecfa795f2086817ff5c3f Mon Sep 17 00:00:00 2001 From: Chris Arderne Date: Fri, 3 Jul 2026 12:13:12 +0100 Subject: [PATCH 16/19] max run ids and max name length --- apps/webapp/app/env.server.ts | 2 ++ apps/webapp/app/routes/api.v1.bulk-actions.ts | 10 ++++++++++ packages/core/src/v3/schemas/api.ts | 2 +- 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 2713833b22c..6977fd30bbb 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -1944,6 +1944,8 @@ const EnvironmentSchema = z BULK_ACTION_SUBBATCH_CONCURRENCY: z.coerce.number().int().default(5), /// Max number of concurrent in-flight (PENDING) bulk replays per environment. BULK_ACTION_MAX_CONCURRENT_REPLAYS: z.coerce.number().int().default(3), + /// Max number of explicit run IDs accepted in a single bulk action create request. + BULK_ACTION_MAX_RUN_IDS: z.coerce.number().int().default(500), // AI Run Filter AI_RUN_FILTER_MODEL: z.string().optional(), diff --git a/apps/webapp/app/routes/api.v1.bulk-actions.ts b/apps/webapp/app/routes/api.v1.bulk-actions.ts index adf3d392183..60a6a57ccb8 100644 --- a/apps/webapp/app/routes/api.v1.bulk-actions.ts +++ b/apps/webapp/app/routes/api.v1.bulk-actions.ts @@ -1,6 +1,7 @@ import { json } from "@remix-run/server-runtime"; import { CreateBulkActionRequestBody, type QueueTypeName } from "@trigger.dev/core/v3"; import type { z } from "zod"; +import { env } from "~/env.server"; import { ApiBulkActionListSearchParams, ApiBulkActionPresenter, @@ -32,6 +33,15 @@ const { action } = createActionApiRoute( return json({ error: "Invalid request body" }, { status: 400 }); } + if (body.runIds && body.runIds.length > env.BULK_ACTION_MAX_RUN_IDS) { + return json( + { + error: `Too many runIds (${body.runIds.length}). Maximum is ${env.BULK_ACTION_MAX_RUN_IDS}. Use a filter to select more runs.`, + }, + { status: 400 } + ); + } + const service = new BulkActionService(); try { diff --git a/packages/core/src/v3/schemas/api.ts b/packages/core/src/v3/schemas/api.ts index 528d899ea58..38f17daf6fe 100644 --- a/packages/core/src/v3/schemas/api.ts +++ b/packages/core/src/v3/schemas/api.ts @@ -1270,7 +1270,7 @@ function isNonEmptyBulkActionFilterValue(value: unknown): boolean { const BulkActionSelectionRequestBody = { filter: BulkActionFilterRequestBody.optional(), runIds: z.array(z.string()).min(1).optional(), - name: z.string().optional(), + name: z.string().max(255, "Name must be less than 255 characters").optional(), }; export const CreateBulkActionRequestBody = z From b3212f309625a0a629112607829ef2038dcf6b60 Mon Sep 17 00:00:00 2001 From: Chris Arderne Date: Fri, 3 Jul 2026 14:55:31 +0100 Subject: [PATCH 17/19] only count recent bulk actions in concurrency check --- .../v3/services/bulk/BulkActionV2.server.ts | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts b/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts index 372427b27f1..c2468357a9d 100644 --- a/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts +++ b/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts @@ -48,14 +48,26 @@ export type ProcessToCompletionResult = { completed: boolean; }; +// How recently a PENDING replay must have made progress to still count against +// the per-environment concurrency limit. Every processed batch bumps the +// group's `updatedAt`, so a live replay keeps a fresh heartbeat for its whole +// life no matter how long it runs, while a replay whose job has exhausted its +// retries (and stopped making progress) ages out and frees its slot. This is +// wide enough to cover the worst-case gap between batches for a healthy replay +// that is retrying. +const REPLAY_INFLIGHT_WINDOW_MS = 30 * 60 * 1000; + export class BulkActionService extends BaseService { public async create(input: CreateBulkActionInput) { const { organizationId, projectId, environmentId, userId } = input; const filters = freezeRunListFilters(input.filters); - // Concurrency guard for replays - // The count is backed by the (environmentId, status, type) index, so it only - // touches this env's PENDING replays and stays cheap. + // Concurrency guard for replays. + // The seek is backed by the (environmentId, status, type) index; the + // `updatedAt` window is applied on top so we only count replays that are + // actually still making progress. A replay whose job has died stops bumping + // `updatedAt` and drops out of the count, so it can't permanently hold a + // slot. Aborting a replay (dashboard or API) clears its slot immediately. if (input.action === "replay") { const maxConcurrentReplays = env.BULK_ACTION_MAX_CONCURRENT_REPLAYS; const inFlightReplays = await this._replica.bulkActionGroup.count({ @@ -63,6 +75,7 @@ export class BulkActionService extends BaseService { environmentId, type: BulkActionType.REPLAY, status: BulkActionStatus.PENDING, + updatedAt: { gte: new Date(Date.now() - REPLAY_INFLIGHT_WINDOW_MS) }, }, }); From 2a4a56b3399c8d4de203c2fbe96022fc35cd7c66 Mon Sep 17 00:00:00 2001 From: Chris Arderne Date: Fri, 3 Jul 2026 15:07:08 +0100 Subject: [PATCH 18/19] add some tests for new changes --- .../test/bulk-actions-api.e2e.full.test.ts | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/apps/webapp/test/bulk-actions-api.e2e.full.test.ts b/apps/webapp/test/bulk-actions-api.e2e.full.test.ts index 43be105962a..6c6777db62e 100644 --- a/apps/webapp/test/bulk-actions-api.e2e.full.test.ts +++ b/apps/webapp/test/bulk-actions-api.e2e.full.test.ts @@ -199,6 +199,79 @@ describe("Bulk actions API", () => { expect(response.status).toBe(500); await expect(response.json()).resolves.toEqual({ error: "Failed to create bulk action" }); }); + + it("blocks a new replay once the concurrent-replay limit is reached", async () => { + const server = getTestServer(); + const { apiKey, project, environment } = await seedTestEnvironment(server.prisma); + + // Fill the per-environment concurrent-replay slots with fresh, in-flight replays. + // The guard runs before the ClickHouse count, so this asserts cleanly without it. + for (let i = 0; i < 3; i++) { + await seedBulkAction(server.prisma, project, environment, { + type: BulkActionType.REPLAY, + status: BulkActionStatus.PENDING, + }); + } + + const response = await server.webapp.fetch("/api/v1/bulk-actions", { + method: "POST", + headers: authHeaders(apiKey), + body: JSON.stringify({ action: "replay", filter: { status: "FAILED" } }), + }); + + expect(response.status).toBe(429); + // The cap is a semantic limit, not a transient rate limit, so the SDK must not retry it. + expect(response.headers.get("x-should-retry")).toBe("false"); + const body = await response.json(); + expect(body.error).toContain("bulk replays at a time"); + }); + + it("does not count stale replays that have stopped making progress", async () => { + const server = getTestServer(); + const { apiKey, project, environment } = await seedTestEnvironment(server.prisma); + + for (let i = 0; i < 3; i++) { + await seedBulkAction(server.prisma, project, environment, { + type: BulkActionType.REPLAY, + status: BulkActionStatus.PENDING, + }); + } + + // Backdate updatedAt past the in-flight window so these look like dead replays. + // (updatedAt is @updatedAt, so it can only be set via raw SQL, not on create.) + await server.prisma.$executeRawUnsafe( + `UPDATE "BulkActionGroup" SET "updatedAt" = now() - interval '31 minutes' WHERE "environmentId" = $1`, + environment.id + ); + + const response = await server.webapp.fetch("/api/v1/bulk-actions", { + method: "POST", + headers: authHeaders(apiKey), + body: JSON.stringify({ action: "replay", filter: { status: "FAILED" } }), + }); + + // Stale replays don't hold a slot, so the guard lets the request through and it + // reaches the count step, which fails (no ClickHouse in this suite) with a 500 rather + // than being blocked by the concurrency guard's 429. + expect(response.status).toBe(500); + }); + + it("rejects create requests with more runIds than the allowed maximum", async () => { + const server = getTestServer(); + const { apiKey } = await seedTestEnvironment(server.prisma); + + const runIds = Array.from({ length: 501 }, (_, i) => `run_${i}`); + + const response = await server.webapp.fetch("/api/v1/bulk-actions", { + method: "POST", + headers: authHeaders(apiKey), + body: JSON.stringify({ action: "cancel", runIds }), + }); + + expect(response.status).toBe(400); + const body = await response.json(); + expect(body.error).toContain("Too many runIds"); + }); }); function authHeaders(apiKey: string) { From 03b39646dd91b5573e52c7a039ce6103e32f7e98 Mon Sep 17 00:00:00 2001 From: Chris Arderne Date: Fri, 3 Jul 2026 15:26:55 +0100 Subject: [PATCH 19/19] update docs for review comment --- docs/management/runs/bulk-actions.mdx | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/management/runs/bulk-actions.mdx b/docs/management/runs/bulk-actions.mdx index a2efef6ef4a..e326d7ddba6 100644 --- a/docs/management/runs/bulk-actions.mdx +++ b/docs/management/runs/bulk-actions.mdx @@ -30,6 +30,10 @@ console.log(completed.status, completed.counts); `filter` accepts the same filters as [`runs.list()`](/management/runs/list), excluding pagination fields. Provide at least one filter field; use `runIds` when you want to target specific runs. Relative time filters such as `period` are resolved when the bulk action is created, so later batches process the same fixed time range. + + Filters inherit the same time semantics as `runs.list()`: when you don't pass `from`, `to`, or `period`, the action defaults to the **last 7 days** and won't target older matching runs. To cover a wider range, pass an explicit `period` (such as `"30d"`), a `from` timestamp, or a `from`/`to` pair. This default only applies to `filter` selections; `runIds` selections are never time-bounded. + + Each environment can only run a limited number of bulk replays at the same time. If you start a replay while too many are still in progress, the call fails and returns an error. Wait for an in-progress replay to finish (or [abort](#abort-a-bulk-action) one) before starting another. Bulk cancels are not subject to this limit.