Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/run-ops-replication-fan-in.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Replicate task runs into ClickHouse from multiple source databases, so that under the run-ops DB split both databases fan in to analytics. Also adds an admin status endpoint that reports replication leadership per source.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { prisma } from "~/db.server";
import { runStore } from "~/v3/runStore.server";
import { logger } from "~/services/logger.server";
import { requireAdminApiRequest } from "~/services/personalAccessToken.server";
import { getRunsReplicationGlobal } from "~/services/runsReplicationGlobal.server";
import { runsReplicationInstance } from "~/services/runsReplicationInstance.server";
import { FINAL_RUN_STATUSES } from "~/v3/taskStatus";

Expand Down Expand Up @@ -40,11 +41,12 @@ export async function action({ request }: ActionFunctionArgs) {
runs.push(...batchRuns);
}

if (!runsReplicationInstance) {
const service = getRunsReplicationGlobal() ?? runsReplicationInstance;
if (!service) {
throw new Error("Runs replication instance not found");
}

await runsReplicationInstance.backfill(
await service.backfill(
runs.map((run) => ({
...run,
masterQueue: run.workerQueue,
Expand Down
60 changes: 60 additions & 0 deletions apps/webapp/app/routes/admin.api.v1.runs-replication.status.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import { type LoaderFunctionArgs, json } from "@remix-run/server-runtime";
import Redis from "ioredis";
import { env } from "~/env.server";
import { requireAdminApiRequest } from "~/services/personalAccessToken.server";
import { getRunsReplicationConfiguredSources } from "~/services/runsReplicationGlobal.server";

/**
 * Reports, for each replication source id, whether its leader-lock key currently
 * exists in Redis.
 *
 * The redlock leader-lock key is DOUBLE-PREFIXED with `logical-replication-client:` —
 * once from the connection's keyPrefix and once from redlock's resource string. This
 * connection is therefore prefixed with `runs-replication:logical-replication-client:`
 * and EXISTS is issued on `logical-replication-client:runs-replication:<id>`, resolving to:
 *   runs-replication:logical-replication-client:logical-replication-client:runs-replication:<id>
 *
 * @param sourceIds Replication source ids to probe.
 * @returns Map from source id to `true` when that source's lock key exists.
 */
async function probeLeadership(sourceIds: string[]): Promise<Map<string, boolean>> {
  // A short-lived connection dedicated to this probe; always closed in `finally`.
  const redis = new Redis({
    keyPrefix: "runs-replication:logical-replication-client:",
    port: env.RUN_REPLICATION_REDIS_PORT ?? undefined,
    host: env.RUN_REPLICATION_REDIS_HOST ?? undefined,
    username: env.RUN_REPLICATION_REDIS_USERNAME ?? undefined,
    password: env.RUN_REPLICATION_REDIS_PASSWORD ?? undefined,
    enableAutoPipelining: true,
    ...(env.RUN_REPLICATION_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
  });

  try {
    // Issue every EXISTS in the same tick so enableAutoPipelining can batch them;
    // awaiting each command in turn would cost one round trip per source.
    const existsCounts = await Promise.all(
      sourceIds.map((id) => redis.exists(`logical-replication-client:runs-replication:${id}`))
    );

    // Rebuild the map in input order so callers see the same iteration order as before.
    return new Map(
      sourceIds.map((id, index): [string, boolean] => [id, existsCounts[index] === 1])
    );
  } finally {
    await redis.quit();
  }
}

/**
 * Admin-only loader that reports the configured replication sources and, for each
 * one, whether its leader-lock key is currently present.
 *
 * Responds with `{ enabled: false, sources: [] }` when no sources have been
 * registered; otherwise `enabled` mirrors `RUN_REPLICATION_ENABLED === "1"`.
 */
export async function loader({ request }: LoaderFunctionArgs) {
  await requireAdminApiRequest(request);

  const configured = getRunsReplicationConfiguredSources() ?? [];
  if (configured.length === 0) {
    return json({ enabled: false, sources: [] });
  }

  const leadership = await probeLeadership(configured.map(({ id }) => id));

  // `configured` is known to be non-empty here, so only the env flag decides `enabled`.
  const replicationEnabled = env.RUN_REPLICATION_ENABLED === "1";
  const sourceStatuses = configured.map(({ id, slotName, originGeneration }) => ({
    id,
    slotName,
    originGeneration,
    leader: leadership.get(id) ?? false,
  }));

  return json({ enabled: replicationEnabled, sources: sourceStatuses });
}
12 changes: 12 additions & 0 deletions apps/webapp/app/services/runsReplicationGlobal.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ import type { RunsReplicationService } from "./runsReplicationService.server";

// Registry keys are created with Symbol.for, so every evaluation of this module
// resolves to the same symbol and therefore to the same slot on the global object.
const GLOBAL_RUNS_REPLICATION_KEY = Symbol.for("dev.trigger.ts.runs-replication");
const GLOBAL_TCP_MONITOR_KEY = Symbol.for("dev.trigger.ts.tcp-monitor");
const GLOBAL_RUNS_REPLICATION_SOURCES_KEY = Symbol.for("dev.trigger.ts.runs-replication-sources");

/** Summary of one configured replication source, as stored in the global registry. */
export type ConfiguredSource = { id: string; slotName: string; originGeneration: number };

/** Shape of the optional slots this module keeps on the global object. */
type RunsReplicationGlobal = {
[GLOBAL_RUNS_REPLICATION_KEY]?: RunsReplicationService;
[GLOBAL_TCP_MONITOR_KEY]?: NodeJS.Timeout;
[GLOBAL_RUNS_REPLICATION_SOURCES_KEY]?: ConfiguredSource[];
};

const _globalThis = typeof globalThis === "object" ? globalThis : global;
Expand All @@ -23,6 +27,14 @@ export function unregisterRunsReplicationGlobal() {
delete _global[GLOBAL_RUNS_REPLICATION_KEY];
}

/**
 * Reads the configured replication sources from the global registry.
 *
 * @returns The stored source summaries, or `undefined` when none have been stored.
 */
export function getRunsReplicationConfiguredSources(): ConfiguredSource[] | undefined {
  const configuredSources = _global[GLOBAL_RUNS_REPLICATION_SOURCES_KEY];
  return configuredSources;
}

/**
 * Stores the configured replication sources in the global registry, replacing any
 * previously stored list.
 *
 * @param sources Source summaries to store; the array is kept by reference.
 */
export function setRunsReplicationConfiguredSources(sources: ConfiguredSource[]) {
  const registry = _global;
  registry[GLOBAL_RUNS_REPLICATION_SOURCES_KEY] = sources;
}

/**
 * Reads the TCP monitor timer handle from the global registry.
 *
 * @returns The stored timer handle, or `undefined` when none has been stored.
 */
export function getTcpMonitorGlobal(): NodeJS.Timeout | undefined {
  const monitorHandle = _global[GLOBAL_TCP_MONITOR_KEY];
  return monitorHandle;
}
Expand Down
187 changes: 172 additions & 15 deletions apps/webapp/app/services/runsReplicationInstance.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,90 @@ import invariant from "tiny-invariant";
import { env } from "~/env.server";
import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactoryInstance.server";
import { singleton } from "~/utils/singleton";
import { isSplitEnabled } from "~/v3/runOpsMigration/splitMode.server";
import { meter, provider } from "~/v3/tracer.server";
import { RunsReplicationService } from "./runsReplicationService.server";
import {
setRunsReplicationConfiguredSources,
setRunsReplicationGlobal,
} from "./runsReplicationGlobal.server";
import {
RunsReplicationService,
type RunsReplicationSource,
} from "./runsReplicationService.server";
import { signalsEmitter } from "./signals.server";

// Module-level handle to the runs replication service, obtained by passing
// initializeRunsReplicationInstance to `singleton` under the key "runsReplicationInstance".
// NOTE(review): the value is whatever the initializer returns synchronously — confirm
// against `singleton` in ~/utils/singleton.
export const runsReplicationInstance = singleton(
"runsReplicationInstance",
initializeRunsReplicationInstance
);

/**
 * Builds the list of replication sources to hand to the replication service.
 *
 * The `"legacy"` source is always first. The `"new"` source is appended only when
 * all of the following hold: the split is enabled, a non-empty `newUrl` is given,
 * and `newSourceOverride` is not explicitly `false`.
 *
 * @param args Split flag plus the connection and identity settings for both sources.
 * @returns `[legacy]` or `[legacy, new]`.
 */
export function buildReplicationSources(args: {
  splitEnabled: boolean;
  legacyUrl: string;
  newUrl?: string;
  /** `false` forces the new source off under split; undefined follows split. */
  newSourceOverride?: boolean;
  legacySlotName: string;
  legacyPublicationName: string;
  legacyOriginGeneration: number;
  newSlotName: string;
  newPublicationName: string;
  newOriginGeneration: number;
}): RunsReplicationSource[] {
  const sources: RunsReplicationSource[] = [
    {
      id: "legacy",
      pgConnectionUrl: args.legacyUrl,
      slotName: args.legacySlotName,
      publicationName: args.legacyPublicationName,
      originGeneration: args.legacyOriginGeneration,
    },
  ];

  const forcedOff = args.newSourceOverride === false;

  // An empty or missing URL counts as "no new database configured".
  if (args.splitEnabled && args.newUrl && !forcedOff) {
    sources.push({
      id: "new",
      pgConnectionUrl: args.newUrl,
      slotName: args.newSlotName,
      publicationName: args.newPublicationName,
      originGeneration: args.newOriginGeneration,
    });
  }

  return sources;
}

/**
 * Thrown when the residency split is on but replication has no `"new"` source.
 *
 * The split gate and the new-source replication gate are controlled by independent
 * env vars. With the split on, ksuid runs are minted on the new DB; if the
 * constructed sources[] lacks a `"new"` entry, those runs never reach ClickHouse and
 * every ClickHouse-fronted usage/cost/metrics aggregate under-counts, with no
 * Postgres fallback. The two gates are therefore coupled at boot so that this
 * misconfiguration fails loudly instead of shipping a fleet-wide under-count.
 */
export class SplitReplicationMisconfiguredError extends Error {
  constructor() {
    const message = [
      'RUN_OPS_SPLIT_ENABLED is on but the runs-replication sources[] has no "new" source: ',
      "ksuid runs on the new DB would not replicate to ClickHouse, under-counting every ",
      "ClickHouse-fronted aggregate. Enable the new replication source ",
      "(RUN_REPLICATION_NEW_ENABLED / RUN_OPS_DATABASE_URL) or turn the split off.",
    ].join("");

    super(message);
    this.name = "SplitReplicationMisconfiguredError";
  }
}

/**
 * Guards against starting replication without the `"new"` source while the split is on.
 *
 * @param args.splitEnabled Whether the split is enabled.
 * @param args.sources The replication sources about to be used.
 * @throws SplitReplicationMisconfiguredError when the split is enabled and no source
 *   has the id `"new"`.
 */
export function assertReplicationCoversSplit(args: {
  splitEnabled: boolean;
  sources: RunsReplicationSource[];
}): void {
  // With the split off there is nothing to enforce.
  if (!args.splitEnabled) {
    return;
  }

  for (const source of args.sources) {
    if (source.id === "new") {
      return;
    }
  }

  throw new SplitReplicationMisconfiguredError();
}

function initializeRunsReplicationInstance() {
const { DATABASE_URL } = process.env;
invariant(typeof DATABASE_URL === "string", "DATABASE_URL env var not set");
Expand All @@ -22,12 +97,11 @@ function initializeRunsReplicationInstance() {

console.log("🗃️ Runs replication service enabled");

const service = new RunsReplicationService({
// Shared options for both the legacy-only and the multi-source constructions.
// Excludes per-source identity (pgConnectionUrl/slotName/publicationName/sources).
const baseReplicationOptions = {
clickhouseFactory,
pgConnectionUrl: DATABASE_URL,
serviceName: "runs-replication",
slotName: env.RUN_REPLICATION_SLOT_NAME,
publicationName: env.RUN_REPLICATION_PUBLICATION_NAME,
redisOptions: {
keyPrefix: "runs-replication:",
port: env.RUN_REPLICATION_REDIS_PORT ?? undefined,
Expand Down Expand Up @@ -55,24 +129,107 @@ function initializeRunsReplicationInstance() {
insertStrategy: env.RUN_REPLICATION_INSERT_STRATEGY,
disablePayloadInsert: env.RUN_REPLICATION_DISABLE_PAYLOAD_INSERT === "1",
disableErrorFingerprinting: env.RUN_REPLICATION_DISABLE_ERROR_FINGERPRINTING === "1",
};

// Construct the SINGLE legacy source synchronously (the split gate has not resolved
// yet at module-init time, and singleton(...) memoizes this synchronous return value).
let service = new RunsReplicationService({
...baseReplicationOptions,
pgConnectionUrl: DATABASE_URL,
slotName: env.RUN_REPLICATION_SLOT_NAME,
publicationName: env.RUN_REPLICATION_PUBLICATION_NAME,
// Explicit legacy source so the leader-lock key matches the id the status
// route probes from the registry below.
sources: [
{
id: "legacy",
pgConnectionUrl: DATABASE_URL,
slotName: env.RUN_REPLICATION_SLOT_NAME,
publicationName: env.RUN_REPLICATION_PUBLICATION_NAME,
originGeneration: env.RUN_REPLICATION_LEGACY_ORIGIN_GENERATION,
},
],
});

// Register the live handle so the status route + lifecycle routes can find it.
setRunsReplicationGlobal(service);
setRunsReplicationConfiguredSources([
{
id: "legacy",
slotName: env.RUN_REPLICATION_SLOT_NAME,
originGeneration: env.RUN_REPLICATION_LEGACY_ORIGIN_GENERATION,
},
]);
Comment thread
d-cs marked this conversation as resolved.

if (env.RUN_REPLICATION_ENABLED === "1") {
clickhouseFactory
.isReady()
.then(() => service.start())
.then(() => {
console.log("🗃️ Runs replication service started");
// Construct-after-gate: resolve the async split gate ONCE at boot, and
// when both sources are enabled rebuild `service` with sources[] before starting.
// The legacy-only instance above is never started in the dual path (no slot/lock
// taken). runsReplicationService.server.ts is untouched. The create route also calls
// setRunsReplicationGlobal — last-writer-wins is the existing contract.
isSplitEnabled()
.then(async (splitEnabled) => {
const sources = buildReplicationSources({
splitEnabled,
legacyUrl: DATABASE_URL,
newUrl: env.RUN_OPS_DATABASE_URL ?? env.TASK_RUN_DATABASE_URL,
newSourceOverride: env.RUN_REPLICATION_NEW_ENABLED === "disabled" ? false : undefined,
Comment thread
d-cs marked this conversation as resolved.
legacySlotName: env.RUN_REPLICATION_SLOT_NAME,
legacyPublicationName: env.RUN_REPLICATION_PUBLICATION_NAME,
legacyOriginGeneration: env.RUN_REPLICATION_LEGACY_ORIGIN_GENERATION,
newSlotName: env.RUN_REPLICATION_NEW_SLOT_NAME,
newPublicationName: env.RUN_REPLICATION_NEW_PUBLICATION_NAME,
newOriginGeneration: env.RUN_REPLICATION_NEW_ORIGIN_GENERATION,
});

// Refuse to start replication if split is on but `#new` is not a source.
assertReplicationCoversSplit({ splitEnabled, sources });

if (sources.length > 1) {
// Release the bootstrap instance's eager replication client (Redis + Redlock)
// before replacing it, or it leaks for the process lifetime. shutdown() is idempotent.
await service.shutdown();
// The scalar pgConnectionUrl/slotName/publicationName remain required on the
// options type, but are ignored when sources[] is non-empty — the
// service normalizes off sources. Pass the legacy scalars to satisfy the type.
service = new RunsReplicationService({
...baseReplicationOptions,
pgConnectionUrl: DATABASE_URL,
slotName: env.RUN_REPLICATION_SLOT_NAME,
publicationName: env.RUN_REPLICATION_PUBLICATION_NAME,
sources,
});
Comment thread
d-cs marked this conversation as resolved.
setRunsReplicationGlobal(service);
setRunsReplicationConfiguredSources(
sources.map((s) => ({
id: s.id,
slotName: s.slotName,
originGeneration: s.originGeneration,
}))
);
}

return clickhouseFactory.isReady().then(() => service.start());
})
.then(() => console.log("🗃️ Runs replication service started"))
.catch((error) => {
console.error("🗃️ Runs replication service failed to start", {
error,
});
if (error instanceof SplitReplicationMisconfiguredError) {
// A silent ClickHouse under-count is worse than a crash — make it fatal.
console.error("🚨 FATAL: run-ops split / ClickHouse replication misconfiguration", {
error,
});
process.exit(1);
}
console.error("🗃️ Runs replication service failed to start", { error });
});

signalsEmitter.on("SIGTERM", service.shutdown.bind(service));
signalsEmitter.on("SIGINT", service.shutdown.bind(service));
// Closures over the `let` so SIGTERM/SIGINT hit whichever instance is live (NOT a
// stale .bind() to the discarded legacy-only instance).
signalsEmitter.on("SIGTERM", () => service.shutdown());
signalsEmitter.on("SIGINT", () => service.shutdown());
}

// Returns the legacy-only instance synchronously (singleton memoizes this). Lifecycle
// routes read getRunsReplicationGlobal() first, so they get the live multi-source one.
return service;
}
Loading