Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/redis-worker-oldest-message-age.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/redis-worker": patch
---

Add a `redis_worker.queue.oldest_message_age` observable gauge (unit `ms`, labeled `worker_name`) reporting the age of the oldest overdue message in each queue. This is a generic queue-stall signal: it stays at 0 while a queue drains healthily and rises only when due work sits undrained (e.g. a blocked dequeue, a dead consumer, or backpressure), even when no items are being processed. Orphaned queue entries are resolved against the items hash so they don't report a phantom stall. Also exposes `SimpleQueue.oldestMessageAge()`.
68 changes: 68 additions & 0 deletions packages/redis-worker/src/queue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,74 @@ describe("SimpleQueue", () => {
}
});

redisTest("oldestMessageAge", { timeout: 20_000 }, async ({ redisContainer }) => {
const queue = new SimpleQueue({
name: "test-1",
schema: {
test: z.object({
value: z.number(),
}),
},
redisOptions: {
host: redisContainer.getHost(),
port: redisContainer.getPort(),
password: redisContainer.getPassword(),
},
logger: new Logger("test", "log"),
});

try {
// empty queue → 0
expect(await queue.oldestMessageAge()).toBe(0);

// only a future-scheduled item → 0 (not yet overdue)
await queue.enqueue({
id: "future",
job: "test",
item: { value: 1 },
availableAt: new Date(Date.now() + 60_000),
visibilityTimeoutMs: 2000,
});
expect(await queue.oldestMessageAge()).toBe(0);

// an overdue item → age > 0
await queue.enqueue({
id: "overdue",
job: "test",
item: { value: 2 },
availableAt: new Date(Date.now() - 5_000),
visibilityTimeoutMs: 2000,
});
const age = await queue.oldestMessageAge();
expect(age).toBeGreaterThanOrEqual(5_000);
expect(age).toBeLessThan(60_000);

// an orphaned queue entry (no payload in the items hash), older than the
// real overdue item, must be ignored — it can't be dequeued, so it isn't a
// real stall. Age should still reflect the real overdue item (~5s), not the
// orphan's ~999s.
const redisClient = createRedisClient({
host: redisContainer.getHost(),
port: redisContainer.getPort(),
password: redisContainer.getPassword(),
});
await redisClient.zadd(`{queue:test-1:}queue`, Date.now() - 999_000, "orphaned-id");
const ageWithOrphan = await queue.oldestMessageAge();
expect(ageWithOrphan).toBeGreaterThanOrEqual(5_000);
expect(ageWithOrphan).toBeLessThan(60_000);

// once dequeued, the item is invisible (future-scored) → back to 0 (the
// orphan is cleaned by the dequeue scan, the real item goes in-flight)
const [first] = await queue.dequeue(2);
expect(first?.id).toBe("overdue");
expect(await queue.oldestMessageAge()).toBe(0);

await redisClient.quit();
} finally {
await queue.close();
}
});

redisTest("invisibility timeout", { timeout: 20_000 }, async ({ redisContainer }) => {
const queue = new SimpleQueue({
name: "test-1",
Expand Down
69 changes: 69 additions & 0 deletions packages/redis-worker/src/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,42 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
}
}

/**
* Age (in ms) of the oldest *overdue* message — the oldest item whose scheduled
* time has already passed (score <= now). Returns 0 when the queue is empty or
* only holds future/delayed or in-flight (future-scored) items.
*
* Resolves the candidate against the `items` hash so orphaned `queue` entries
* (a member whose payload is missing — the same stale state `dequeueItems`
* cleans up) don't report a phantom stall for work that can't be dequeued. The
* Lua scans due items oldest-first and returns the first score whose payload
* still exists.
*
* This is the generic stall signal: it stays at 0 while a queue drains healthily
* and rises only when due work sits undrained (poison block, dead consumer,
* backpressure).
*/
async oldestMessageAge(): Promise<number> {
try {
const now = Date.now();
// -1 sentinel = nothing due, or every due entry is orphaned.
const score = Number(await this.redis.getOldestDueScore(`queue`, `items`, now));

if (!Number.isFinite(score) || score < 0) {
return 0;
}

return Math.max(0, now - score);
} catch (e) {
this.logger.error(`SimpleQueue ${this.name}.oldestMessageAge(): error getting oldest age`, {
queue: this.name,
error: e,
});
// Swallow: a transient Redis error must not break observable metric collection.
return 0;
}
}

async getJob(id: string): Promise<QueueItem<TMessageCatalog> | null> {
const result = await this.redis.getJob(`queue`, `items`, id);

Expand Down Expand Up @@ -484,6 +520,30 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
`,
});

this.redis.defineCommand("getOldestDueScore", {
numberOfKeys: 2,
lua: `
local queue = KEYS[1]
local items = KEYS[2]
local now = tonumber(ARGV[1])

-- Oldest-first scan of due items, bounded so a long prefix of orphans can't
-- make this O(n). Orphans are rare (dequeueItems removes them), so in the
-- common case this returns on the first iteration. Read-only: unlike
-- dequeueItems we don't ZREM orphans here — a metric probe must not mutate.
local result = redis.call('ZRANGEBYSCORE', queue, '-inf', now, 'WITHSCORES', 'LIMIT', 0, 100)

for i = 1, #result, 2 do
local id = result[i]
if redis.call('HEXISTS', items, id) == 1 then
return result[i + 1]
end
end

return -1
`,
});

this.redis.defineCommand("getJob", {
numberOfKeys: 2,
lua: `
Expand Down Expand Up @@ -695,5 +755,14 @@ declare module "@internal/redis" {
id: string,
callback?: Callback<[string, string, string] | null>
): Result<[string, string, string] | null, Context>;

getOldestDueScore(
//keys
queue: string,
items: string,
//args
now: number,
callback?: Callback<string | number>
): Result<string | number, Context>;
}
}
19 changes: 19 additions & 0 deletions packages/redis-worker/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,17 @@ class Worker<TCatalog extends WorkerCatalog> {
concurrencyLimitPendingObservableGauge.addCallback(
this.#updateConcurrencyLimitPendingMetric.bind(this)
);

const oldestMessageAgeObservableGauge = this.meter.createObservableGauge(
"redis_worker.queue.oldest_message_age",
{
description: "Age of the oldest overdue message in the queue",
unit: "ms",
valueType: ValueType.INT,
}
);

oldestMessageAgeObservableGauge.addCallback(this.#updateOldestMessageAgeMetric.bind(this));
}

async #updateQueueSizeMetric(observableResult: ObservableResult<Attributes>) {
Expand All @@ -223,6 +234,14 @@ class Worker<TCatalog extends WorkerCatalog> {
});
}

async #updateOldestMessageAgeMetric(observableResult: ObservableResult<Attributes>) {
const oldestMessageAge = await this.queue.oldestMessageAge();

observableResult.observe(oldestMessageAge, {
worker_name: this.options.name,
});
}

async #updateConcurrencyLimitActiveMetric(observableResult: ObservableResult<Attributes>) {
for (const [workerId, limiter] of Object.entries(this.limiters)) {
observableResult.observe(limiter.activeCount, {
Expand Down
Loading