Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/bulk-actions-sdk-api.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@trigger.dev/core": patch
"@trigger.dev/sdk": patch
---

Add SDK and API client helpers for run bulk actions.
6 changes: 6 additions & 0 deletions .server-changes/bulk-actions-api-sdk.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Add API and SDK support for creating, listing, retrieving, polling, and aborting run bulk actions.
4 changes: 4 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1942,6 +1942,10 @@ const EnvironmentSchema = z
BULK_ACTION_BATCH_SIZE: z.coerce.number().int().default(100),
BULK_ACTION_BATCH_DELAY_MS: z.coerce.number().int().default(200),
BULK_ACTION_SUBBATCH_CONCURRENCY: z.coerce.number().int().default(5),
/// Max number of concurrent in-flight (PENDING) bulk replays per environment.
BULK_ACTION_MAX_CONCURRENT_REPLAYS: z.coerce.number().int().default(3),
/// Max number of explicit run IDs accepted in a single bulk action create request.
BULK_ACTION_MAX_RUN_IDS: z.coerce.number().int().default(500),

// AI Run Filter
AI_RUN_FILTER_MODEL: z.string().optional(),
Expand Down
155 changes: 155 additions & 0 deletions apps/webapp/app/presenters/v3/ApiBulkActionPresenter.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
import {
type BulkActionGroup,
type BulkActionStatus,
type BulkActionType,
} from "@trigger.dev/database";
import { z } from "zod";
import { BasePresenter } from "./basePresenter.server";

const DEFAULT_PAGE_SIZE = 25;
const MAX_PAGE_SIZE = 100;

export const ApiBulkActionListSearchParams = z.object({
"page[size]": z.coerce.number().int().positive().min(1).max(MAX_PAGE_SIZE).optional(),
"page[after]": z.string().optional(),
"page[before]": z.string().optional(),
});

export type ApiBulkActionListSearchParams = z.infer<typeof ApiBulkActionListSearchParams>;

type BulkActionListCursor = {
createdAt: Date;
id: string;
};

type BulkActionRow = Pick<
BulkActionGroup,
| "id"
| "friendlyId"
| "name"
| "status"
| "type"
| "createdAt"
| "completedAt"
| "totalCount"
| "successCount"
| "failureCount"
>;

export class ApiBulkActionPresenter extends BasePresenter {
public async list(environmentId: string, searchParams: ApiBulkActionListSearchParams) {
const pageSize = searchParams["page[size]"] ?? DEFAULT_PAGE_SIZE;
const after = searchParams["page[after]"];
const before = searchParams["page[before]"];

if (after && before) {
throw new Error("Only one of page[after] or page[before] can be provided");
}

const cursor = decodeCursor(after ?? before);
const direction = before ? "backward" : "forward";

const where = {
environmentId,
...(cursor
? direction === "forward"
? {
OR: [
{ createdAt: { lt: cursor.createdAt } },
{ createdAt: cursor.createdAt, id: { lt: cursor.id } },
],
}
: {
OR: [
{ createdAt: { gt: cursor.createdAt } },
{ createdAt: cursor.createdAt, id: { gt: cursor.id } },
],
}
: {}),
};

const rows = await this._replica.bulkActionGroup.findMany({
select: bulkActionSelect,
where,
orderBy:
direction === "forward"
? [{ createdAt: "desc" }, { id: "desc" }]
: [{ createdAt: "asc" }, { id: "asc" }],
take: pageSize + 1,
});

const hasMore = rows.length > pageSize;
const pageRows = rows.slice(0, pageSize);
const dataRows = direction === "forward" ? pageRows : [...pageRows].reverse();

const first = dataRows.at(0);
const last = dataRows.at(-1);

return {
data: dataRows.map(apiBulkActionObject),
pagination: {
next: last && (hasMore || direction === "backward") ? encodeCursor(last) : undefined,
previous:
first &&
((direction === "forward" && Boolean(after)) || (direction === "backward" && hasMore))
? encodeCursor(first)
: undefined,
},
};
}
}

export const bulkActionSelect = {
id: true,
friendlyId: true,
name: true,
status: true,
type: true,
createdAt: true,
completedAt: true,
totalCount: true,
successCount: true,
failureCount: true,
} as const;

export function apiBulkActionObject(row: BulkActionRow) {
return {
id: row.friendlyId,
name: row.name ?? undefined,
type: row.type as BulkActionType,
status: row.status as BulkActionStatus,
counts: {
total: row.totalCount,
success: row.successCount,
failure: row.failureCount,
},
createdAt: row.createdAt,
completedAt: row.completedAt ?? undefined,
};
}

function encodeCursor(row: Pick<BulkActionRow, "createdAt" | "id">) {
return Buffer.from(JSON.stringify({ createdAt: row.createdAt.getTime(), id: row.id })).toString(
"base64url"
);
}

function decodeCursor(cursor: string | undefined): BulkActionListCursor | undefined {
if (!cursor) {
return undefined;
}

try {
const parsed = JSON.parse(Buffer.from(cursor, "base64url").toString("utf8")) as {
createdAt?: unknown;
id?: unknown;
};
if (typeof parsed.createdAt !== "number" || typeof parsed.id !== "string") {
throw new Error("Invalid cursor");
}

return { createdAt: new Date(parsed.createdAt), id: parsed.id };
} catch {
throw new Error("Invalid cursor");
}
}
48 changes: 48 additions & 0 deletions apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.abort.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { json } from "@remix-run/server-runtime";
import { z } from "zod";
import { $replica } from "~/db.server";
import { logger } from "~/services/logger.server";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { BulkActionService } from "~/v3/services/bulk/BulkActionV2.server";
import { ServiceValidationError } from "~/v3/services/common.server";

const ParamsSchema = z.object({
bulkActionId: z.string(),
});

const { action } = createActionApiRoute(
{
params: ParamsSchema,
corsStrategy: "none",
authorization: {
action: "write",
resource: () => ({ type: "runs" }),
},
findResource: async (params, auth) => {
return $replica.bulkActionGroup.findFirst({
select: { id: true },
where: {
friendlyId: params.bulkActionId,
environmentId: auth.environment.id,
},
});
},
},
async ({ params, authentication }) => {
const service = new BulkActionService();

try {
const result = await service.abort(params.bulkActionId, authentication.environment.id);
return json({ id: result.bulkActionId });
} catch (error) {
if (error instanceof ServiceValidationError) {
return json({ error: error.message }, { status: error.status ?? 400 });
}

logger.error("Failed to abort API bulk action", { error });
return json({ error: "Failed to abort bulk action" }, { status: 500 });
}
}
);

export { action };
35 changes: 35 additions & 0 deletions apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { json } from "@remix-run/server-runtime";
import { z } from "zod";
import { $replica } from "~/db.server";
import {
apiBulkActionObject,
bulkActionSelect,
} from "~/presenters/v3/ApiBulkActionPresenter.server";
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";

const ParamsSchema = z.object({
bulkActionId: z.string(),
});

export const loader = createLoaderApiRoute(
{
params: ParamsSchema,
corsStrategy: "none",
authorization: {
action: "read",
resource: () => ({ type: "runs" }),
},
findResource: async (params, auth) => {
return $replica.bulkActionGroup.findFirst({
select: bulkActionSelect,
where: {
friendlyId: params.bulkActionId,
environmentId: auth.environment.id,
},
});
},
},
async ({ resource }) => {
return json(apiBulkActionObject(resource));
}
);
Loading