Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal-packages/clickhouse/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ export {
PAYLOAD_INDEX,
getTaskRunField,
getPayloadField,
composeTaskRunVersion,
} from "./taskRuns.js";

export { SESSION_COLUMNS, SESSION_INDEX, getSessionField } from "./sessions.js";
Expand Down
310 changes: 310 additions & 0 deletions internal-packages/clickhouse/src/taskRuns.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { z } from "zod";
import { ClickhouseClient } from "./client/client.js";
import {
TASK_RUN_INDEX,
composeTaskRunVersion,
getChildRunStatusCounts,
getTaskRunsQueryBuilder,
insertRawTaskRunPayloadsCompactArrays,
Expand Down Expand Up @@ -887,4 +888,313 @@ describe("Task Runs V2", () => {
);
}
);

/**
 * Builds one task-run insert row for the multi-producer collapse tests below.
 *
 * Every row shares the same positional fixture values (so all snapshots
 * describe the same run); only the status and the composed `_version` differ
 * between snapshots. A fresh array is returned on every call so rows never
 * alias each other.
 */
function multiProducerSnapshot(opts: {
  status: TaskRunInsertArray[typeof TASK_RUN_INDEX.status];
  originGeneration: number;
  lsnVersion: bigint;
}): TaskRunInsertArray {
  const createdAt = new Date("2025-04-30 16:34:04.312").getTime();

  // Positional fixture: order and values must stay exactly as the insert expects.
  const row: TaskRunInsertArray = [
    "cm9kddfcs01zqdy88ld9mmrli",
    "cm8zs78wb0002dy616dg75tv3",
    "cm9kddfbz01zpdy88t9dstecu",
    "cma45oli70002qrdy47w0j4n7",
    createdAt,
    createdAt,
    "PENDING",
    "PRODUCTION",
    "run_cma45oli70002qrdy47w0j4n7",
    1,
    "V2",
    "retry-task",
    "task/retry-task",
    "",
    "",
    null,
    null,
    null,
    null,
    createdAt,
    null,
    0,
    0,
    0,
    { data: null },
    { data: null },
    "",
    [],
    "",
    "",
    "",
    "",
    "",
    "",
    0,
    "span",
    "trace",
    "",
    "",
    "",
    "",
    true,
    "1",
    0,
    "",
    [],
    "",
    "",
    "",
    null,
    "",
    "",
    "",
    null,
  ];

  row[TASK_RUN_INDEX.status] = opts.status;
  row[TASK_RUN_INDEX._version] = composeTaskRunVersion({
    originGeneration: opts.originGeneration,
    lsnVersion: opts.lsnVersion,
  }).toString();

  return row;
}

/**
 * Inserts `rows` (in the given order, as a single synchronous insert) and
 * asserts that reading task_runs_v2 with FINAL yields exactly one row for the
 * fixture run, carrying `expectedStatus`.
 */
async function expectCollapsedWinner(
  connectionUrl: string,
  rows: TaskRunInsertArray[],
  expectedStatus: string
): Promise<void> {
  const client = new ClickhouseClient({
    name: "test",
    url: connectionUrl,
  });
  const insert = insertTaskRunsCompactArrays(client, { async_insert: 0 });

  const [insertError] = await insert(rows);
  expect(insertError).toBeNull();

  const query = client.query({
    name: "q",
    query:
      "SELECT run_id, status, count() OVER () AS total FROM trigger_dev.task_runs_v2 FINAL",
    schema: z.object({ run_id: z.string(), status: z.string(), total: z.number().int() }),
  });
  const [queryError, result] = await query({});
  expect(queryError).toBeNull();
  expect(result).toHaveLength(1);
  expect(result?.[0]).toEqual(
    expect.objectContaining({
      run_id: "cma45oli70002qrdy47w0j4n7",
      status: expectedStatus,
    })
  );
}

clickhouseTest(
  "should collapse the same run from two producers to one latest-snapshot row",
  async ({ clickhouseContainer }) => {
    // Generation 0 has a far larger raw LSN, but generation 1 must still win.
    const rdsSnapshot = multiProducerSnapshot({
      status: "PENDING",
      originGeneration: 0,
      lsnVersion: 9_000_000_000n,
    });
    const psSnapshot = multiProducerSnapshot({
      status: "COMPLETED_SUCCESSFULLY",
      originGeneration: 1,
      lsnVersion: 10n,
    });

    await expectCollapsedWinner(
      clickhouseContainer.getConnectionUrl(),
      [rdsSnapshot, psSnapshot],
      "COMPLETED_SUCCESSFULLY"
    );
  }
);

clickhouseTest(
  "should keep the latest intra-producer snapshot (same generation, ascending LSN)",
  async ({ clickhouseContainer }) => {
    const earlier = multiProducerSnapshot({
      status: "EXECUTING",
      originGeneration: 1,
      lsnVersion: 10n,
    });
    const later = multiProducerSnapshot({
      status: "COMPLETED_SUCCESSFULLY",
      originGeneration: 1,
      lsnVersion: 20n,
    });

    await expectCollapsedWinner(
      clickhouseContainer.getConnectionUrl(),
      [earlier, later],
      "COMPLETED_SUCCESSFULLY"
    );
  }
);

clickhouseTest(
  "should collapse to the same winner regardless of insert order",
  async ({ clickhouseContainer }) => {
    const rdsSnapshot = multiProducerSnapshot({
      status: "PENDING",
      originGeneration: 0,
      lsnVersion: 9_000_000_000n,
    });
    const psSnapshot = multiProducerSnapshot({
      status: "COMPLETED_SUCCESSFULLY",
      originGeneration: 1,
      lsnVersion: 10n,
    });

    // Same rows as the two-producer test, inserted winner-first.
    await expectCollapsedWinner(
      clickhouseContainer.getConnectionUrl(),
      [psSnapshot, rdsSnapshot],
      "COMPLETED_SUCCESSFULLY"
    );
  }
);
});
26 changes: 26 additions & 0 deletions internal-packages/clickhouse/src/taskRuns.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,32 @@ export function getTaskRunField<K extends TaskRunColumnName>(
return run[TASK_RUN_INDEX[field]] as TaskRunFieldTypes[K];
}

/**
 * Compose a globally-comparable ReplacingMergeTree version for task_runs_v2
 * when the same run can be replicated from more than one Postgres producer.
 *
 * Each producer has its own, mutually-incomparable LSN space, so the raw
 * LSN-derived version cannot be compared across producers. We reserve the top
 * 8 bits for an `originGeneration` epoch (monotonic across producers: the more
 * authoritative / later-cutover producer gets the higher generation) and keep
 * the producer's own LSN in the low 56 bits to preserve in-producer ordering.
 *
 * Self-host single-DB never calls this (one producer => generation is constant
 * and the existing raw LSN path is sufficient); the split gate skips it.
 *
 * @param opts.originGeneration Producer epoch; must be an integer in 0-255.
 * @param opts.lsnVersion Producer-local version; must fit in 56 unsigned bits.
 * @returns `(originGeneration << 56) | lsnVersion` as a bigint (at most 64 bits).
 * @throws {RangeError} If `originGeneration` is not an integer, or if
 *   `lsnVersion` is negative or does not fit in 56 bits.
 * @throws {Error} If `originGeneration` is an integer outside 0-255.
 */
export function composeTaskRunVersion(opts: {
  originGeneration: number;
  lsnVersion: bigint;
}): bigint {
  const LSN_BITS = BigInt(56);
  const LSN_MAX = (BigInt(1) << LSN_BITS) - BigInt(1); // largest value that fits in the low 56 bits

  // BigInt() rejects non-integers with an opaque message; fail with a clear one instead.
  if (!Number.isInteger(opts.originGeneration)) {
    throw new RangeError(
      `originGeneration must be an integer (0-255): ${opts.originGeneration}`
    );
  }

  const gen = BigInt(opts.originGeneration);
  if (gen < BigInt(0) || gen > BigInt(0xff)) {
    throw new Error(`originGeneration out of range (0-255): ${opts.originGeneration}`);
  }

  // Masking an out-of-range value would wrap it (e.g. -1 becomes 2^56-1 and
  // 2^56 becomes 0), silently breaking in-producer ordering. Reject it instead.
  if (opts.lsnVersion < BigInt(0) || opts.lsnVersion > LSN_MAX) {
    throw new RangeError(`lsnVersion out of range (0 to 2^56-1): ${opts.lsnVersion}`);
  }

  return (gen << LSN_BITS) | opts.lsnVersion;
}

export function insertTaskRunsCompactArrays(ch: ClickhouseWriter, settings?: ClickHouseSettings) {
return ch.insertCompactRaw({
name: "insertTaskRunsCompactArrays",
Expand Down
Loading
Loading