From 6aa895402ace1c7171c1584d888de89bd4feec97 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 25 Feb 2026 22:15:53 +0000 Subject: [PATCH 1/9] feat: two-level tenant dispatch for fair queue (TRI-7082) Replace flat master queue index with two-level tenant dispatch to fix noisy neighbor problem. When a tenant has many queues at capacity, the scheduler now iterates tenants (Level 1) not queues, then fetches per-tenant queues (Level 2) only for eligible tenants. Single-deploy migration: new enqueues write to dispatch indexes only, consumer drains old master queue alongside new dispatch path until empty. refs TRI-7082 --- packages/redis-worker/src/fair-queue/index.ts | 603 ++++++++++++++--- .../src/fair-queue/keyProducer.ts | 16 +- .../src/fair-queue/schedulers/drr.ts | 102 +++ .../redis-worker/src/fair-queue/telemetry.ts | 48 +- .../src/fair-queue/tenantDispatch.ts | 182 ++++++ .../fair-queue/tests/tenantDispatch.test.ts | 615 ++++++++++++++++++ packages/redis-worker/src/fair-queue/types.ts | 26 + 7 files changed, 1498 insertions(+), 94 deletions(-) create mode 100644 packages/redis-worker/src/fair-queue/tenantDispatch.ts create mode 100644 packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts diff --git a/packages/redis-worker/src/fair-queue/index.ts b/packages/redis-worker/src/fair-queue/index.ts index 59177d11673..4374699b01b 100644 --- a/packages/redis-worker/src/fair-queue/index.ts +++ b/packages/redis-worker/src/fair-queue/index.ts @@ -6,6 +6,7 @@ import { setInterval } from "node:timers/promises"; import { type z } from "zod"; import { ConcurrencyManager } from "./concurrency.js"; import { MasterQueue } from "./masterQueue.js"; +import { TenantDispatch } from "./tenantDispatch.js"; import { type RetryStrategy, ExponentialBackoffRetry } from "./retry.js"; import { isAbortError } from "../utils.js"; import { @@ -17,6 +18,7 @@ import { import type { ConcurrencyGroupConfig, DeadLetterMessage, + DispatchSchedulerContext, EnqueueBatchOptions, EnqueueOptions, FairQueueKeyProducer, @@ -25,8 +27,10 @@ import type { GlobalRateLimiter, QueueCooloffState, QueueDescriptor, + QueueWithScore, SchedulerContext, StoredMessage, + TenantQueues, } from "./types.js"; import { VisibilityManager } from "./visibility.js"; import { WorkerQueueManager } from "./workerQueue.js"; @@ -42,6 +46,7 @@ export * from "./scheduler.js"; export * from "./schedulers/index.js"; export * from "./retry.js"; export * from "./telemetry.js"; +export * from "./tenantDispatch.js"; /** * FairQueue is the main orchestrator for fair queue message routing. @@ -110,6 +115,10 @@ export class FairQueue { // Queue descriptor cache for message processing private queueDescriptorCache = new Map(); + // Two-level tenant dispatch + private tenantDispatch: TenantDispatch; + private legacyDrainComplete: boolean[]; + constructor(private options: FairQueueOptions) { this.redis = createRedisClient(options.redis); this.keys = options.keys; @@ -178,6 +187,15 @@ export class FairQueue { shardCount: this.shardCount, }); + this.tenantDispatch = new TenantDispatch({ + redis: options.redis, + keys: options.keys, + shardCount: this.shardCount, + }); + + // Track per-shard drain status of legacy master queue + this.legacyDrainComplete = new Array(this.shardCount).fill(false); + if (options.concurrencyGroups && options.concurrencyGroups.length > 0) { this.concurrencyManager = new ConcurrencyManager({ redis: options.redis, @@ -230,12 +248,18 @@ export class FairQueue { getMasterQueueLength: async (shardId: number) => { return await this.masterQueue.getShardQueueCount(shardId); }, + getDispatchLength: async (shardId: number) => { + return await this.tenantDispatch.getShardTenantCount(shardId); + }, getInflightCount: async (shardId: number) => { return await this.visibilityManager.getInflightCount(shardId); }, getDLQLength: async (tenantId: string) => { return await this.getDeadLetterQueueLength(tenantId); }, + isLegacyDrainComplete: (shardId: number) => { + return this.legacyDrainComplete[shardId] ?? false; + }, shardCount: this.shardCount, observedTenants: options?.observedTenants, }); @@ -257,7 +281,8 @@ export class FairQueue { const queueKey = this.keys.queueKey(options.queueId); const queueItemsKey = this.keys.queueItemsKey(options.queueId); const shardId = this.masterQueue.getShardForQueue(options.queueId); - const masterQueueKey = this.keys.masterQueueKey(shardId); + const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(options.tenantId); + const dispatchKey = this.keys.dispatchKey(shardId); // Validate payload if schema provided and validation enabled if (this.validateOnEnqueue && this.payloadSchema) { @@ -297,15 +322,17 @@ export class FairQueue { metadata: options.metadata, }; - // Use atomic Lua script to enqueue and update master queue - await this.redis.enqueueMessageAtomic( + // Use atomic Lua script to enqueue and update tenant dispatch indexes + await this.redis.enqueueMessageAtomicV2( queueKey, queueItemsKey, - masterQueueKey, + tenantQueueIndexKey, + dispatchKey, options.queueId, messageId, timestamp.toString(), - JSON.stringify(storedMessage) + JSON.stringify(storedMessage), + options.tenantId ); span.setAttributes({ @@ -344,7 +371,8 @@ export class FairQueue { const queueKey = this.keys.queueKey(options.queueId); const queueItemsKey = this.keys.queueItemsKey(options.queueId); const shardId = this.masterQueue.getShardForQueue(options.queueId); - const masterQueueKey = this.keys.masterQueueKey(shardId); + const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(options.tenantId); + const dispatchKey = this.keys.dispatchKey(shardId); const now = Date.now(); // Store queue descriptor @@ -397,12 +425,14 @@ export class FairQueue { args.push(messageId, timestamp.toString(), JSON.stringify(storedMessage)); } - // Use atomic Lua script for batch enqueue - await this.redis.enqueueBatchAtomic( + // Use atomic Lua script for batch enqueue with tenant dispatch indexes + await this.redis.enqueueBatchAtomicV2( queueKey, queueItemsKey, - masterQueueKey, + tenantQueueIndexKey, + dispatchKey, options.queueId, + options.tenantId, ...args ); @@ -672,6 +702,7 @@ export class FairQueue { await Promise.all([ this.masterQueue.close(), + this.tenantDispatch.close(), this.concurrencyManager?.close(), this.visibilityManager.close(), this.workerQueueManager.close(), @@ -696,7 +727,15 @@ export class FairQueue { * Get total queue count across all shards. */ async getTotalQueueCount(): Promise { - return await this.masterQueue.getTotalQueueCount(); + // Count from new dispatch index (primary) + legacy master queue (drain) + const [dispatchCount, legacyCount] = await Promise.all([ + this.tenantDispatch.getTotalTenantCount(), + this.masterQueue.getTotalQueueCount(), + ]); + // During migration, dispatch has tenant count (not queue count). + // For a more accurate queue count, we'd need to sum per-tenant queue counts. + // For now, return the dispatch tenant count + any legacy queues still draining. + return dispatchCount + legacyCount; } /** @@ -736,7 +775,7 @@ export class FairQueue { loopId, async (span) => { span.setAttribute("shard_id", shardId); - return await this.#processMasterQueueShard(loopId, shardId, span); + return await this.#processShardIteration(loopId, shardId, span); }, { iterationSpanName: "processMasterQueueShard", @@ -781,44 +820,207 @@ export class FairQueue { } } - async #processMasterQueueShard( + /** + * Process a shard iteration. Runs both the new tenant dispatch path + * and the legacy master queue drain path. + */ + async #processShardIteration( loopId: string, shardId: number, parentSpan?: Span ): Promise { - const masterQueueKey = this.keys.masterQueueKey(shardId); + let hadWork = false; + + // Main path: new two-level tenant dispatch (gets full DRR scheduling) + hadWork = await this.#processDispatchShard(loopId, shardId, parentSpan); + + // Drain path: legacy master queue (simple scheduling, no DRR) + if (!this.legacyDrainComplete[shardId]) { + const drainHadWork = await this.#drainLegacyMasterQueueShard(loopId, shardId, parentSpan); + if (!drainHadWork) { + // Check if the old master queue shard is completely empty + const count = await this.masterQueue.getShardQueueCount(shardId); + if (count === 0) { + this.legacyDrainComplete[shardId] = true; + this.logger.info("Legacy master queue shard fully drained", { shardId }); + } + } + hadWork = hadWork || drainHadWork; + } - // Get total queues in this master queue shard for observability - const masterQueueSize = await this.masterQueue.getShardQueueCount(shardId); - parentSpan?.setAttribute("master_queue_size", masterQueueSize); - this.batchedSpanManager.incrementStat(loopId, "master_queue_size_sum", masterQueueSize); + return hadWork; + } - // Create scheduler context - const schedulerContext = this.#createSchedulerContext(); + /** + * Main path: process queues using the two-level tenant dispatch index. + * Level 1: dispatch index → tenantIds. Level 2: per-tenant → queueIds. + */ + async #processDispatchShard( + loopId: string, + shardId: number, + parentSpan?: Span + ): Promise { + const dispatchKey = this.keys.dispatchKey(shardId); + + // Get dispatch index size for observability + const dispatchSize = await this.tenantDispatch.getShardTenantCount(shardId); + parentSpan?.setAttribute("dispatch_size", dispatchSize); + this.batchedSpanManager.incrementStat(loopId, "dispatch_size_sum", dispatchSize); + + // Create dispatch-aware scheduler context + const schedulerContext: DispatchSchedulerContext = { + ...this.#createSchedulerContext(), + getQueuesForTenant: async (tenantId: string) => { + return this.tenantDispatch.getQueuesForTenant(tenantId); + }, + }; // Get queues to process from scheduler - const tenantQueues = await this.telemetry.trace( - "selectQueues", - async (span) => { - span.setAttribute(FairQueueAttributes.SHARD_ID, shardId.toString()); - span.setAttribute(FairQueueAttributes.CONSUMER_ID, loopId); - span.setAttribute("master_queue_size", masterQueueSize); - const result = await this.scheduler.selectQueues(masterQueueKey, loopId, schedulerContext); - span.setAttribute("tenant_count", result.length); - span.setAttribute( - "queue_count", - result.reduce((acc, t) => acc + t.queues.length, 0) - ); - return result; - }, - { kind: SpanKind.INTERNAL } - ); + let tenantQueues: TenantQueues[]; + + if (this.scheduler.selectQueuesFromDispatch) { + // Use dispatch-aware scheduler method (DRR with two-level lookup) + tenantQueues = await this.telemetry.trace( + "selectQueuesFromDispatch", + async (span) => { + span.setAttribute(FairQueueAttributes.SHARD_ID, shardId.toString()); + span.setAttribute(FairQueueAttributes.CONSUMER_ID, loopId); + span.setAttribute("dispatch_size", dispatchSize); + const result = await this.scheduler.selectQueuesFromDispatch!( + dispatchKey, + loopId, + schedulerContext + ); + span.setAttribute("tenant_count", result.length); + span.setAttribute( + "queue_count", + result.reduce((acc, t) => acc + t.queues.length, 0) + ); + return result; + }, + { kind: SpanKind.INTERNAL } + ); + } else { + // Fallback: read dispatch index, build flat queue list, use legacy selectQueues + tenantQueues = await this.#fallbackDispatchToLegacyScheduler( + loopId, + shardId, + schedulerContext, + parentSpan + ); + } if (tenantQueues.length === 0) { this.batchedSpanManager.incrementStat(loopId, "empty_iterations"); return false; } + return this.#processSelectedQueues(loopId, shardId, tenantQueues); + } + + /** + * Drain path: process remaining messages from the legacy master queue shard. + * Uses simple ZRANGEBYSCORE without DRR - just flushing pre-deploy messages. + */ + async #drainLegacyMasterQueueShard( + loopId: string, + shardId: number, + parentSpan?: Span + ): Promise { + const masterQueueKey = this.keys.masterQueueKey(shardId); + const now = Date.now(); + + // Simple fetch from old master queue - no DRR needed for drain + const results = await this.redis.zrangebyscore( + masterQueueKey, + "-inf", + now, + "WITHSCORES", + "LIMIT", + 0, + 100 + ); + + if (results.length === 0) { + return false; + } + + // Parse results into QueueWithScore, group by tenant + const byTenant = new Map(); + for (let i = 0; i < results.length; i += 2) { + const queueId = results[i]; + const _score = results[i + 1]; + if (queueId && _score) { + const tenantId = this.keys.extractTenantId(queueId); + const existing = byTenant.get(tenantId) ?? []; + existing.push(queueId); + byTenant.set(tenantId, existing); + } + } + + // Build TenantQueues, filter at-capacity tenants + const tenantQueues: TenantQueues[] = []; + for (const [tenantId, queueIds] of byTenant) { + if (this.concurrencyManager) { + const atCapacity = await this.concurrencyManager.isAtCapacity("tenant", tenantId); + if (atCapacity) continue; + } + tenantQueues.push({ tenantId, queues: queueIds }); + } + + if (tenantQueues.length === 0) { + return false; + } + + parentSpan?.setAttribute("drain_tenants", tenantQueues.length); + this.batchedSpanManager.incrementStat(loopId, "drain_tenants", tenantQueues.length); + + return this.#processSelectedQueues(loopId, shardId, tenantQueues); + } + + /** + * Fallback for schedulers that don't implement selectQueuesFromDispatch. + * Reads dispatch index, fetches per-tenant queues, flattens into the + * old-style master queue key format, and calls selectQueues. + */ + async #fallbackDispatchToLegacyScheduler( + loopId: string, + shardId: number, + context: DispatchSchedulerContext, + parentSpan?: Span + ): Promise { + // Get tenants from dispatch + const tenants = await this.tenantDispatch.getTenantsFromShard(shardId); + if (tenants.length === 0) return []; + + // For each tenant, get their queues and build a flat list + const allQueues: QueueWithScore[] = []; + for (const { tenantId } of tenants) { + const queues = await this.tenantDispatch.getQueuesForTenant(tenantId); + allQueues.push(...queues); + } + + if (allQueues.length === 0) return []; + + // Use the base scheduler context (without dispatch methods) + const baseContext = this.#createSchedulerContext(); + + // We need a temporary master queue key for the scheduler. Rather than + // writing to Redis, we'll group the queues by tenant ourselves and + // apply the same logic as the legacy selectQueues. + const masterQueueKey = this.keys.masterQueueKey(shardId); + return this.scheduler.selectQueues(masterQueueKey, loopId, baseContext); + } + + /** + * Shared claim loop: process selected queues from either dispatch or drain path. + * Claims messages and pushes to worker queues. + */ + async #processSelectedQueues( + loopId: string, + shardId: number, + tenantQueues: TenantQueues[] + ): Promise { // Track stats this.batchedSpanManager.incrementStat(loopId, "tenants_selected", tenantQueues.length); this.batchedSpanManager.incrementStat( @@ -829,7 +1031,6 @@ export class FairQueue { let messagesProcessed = 0; - // Process queues and push to worker queues for (const { tenantId, queues } of tenantQueues) { for (const queueId of queues) { // Check cooloff @@ -839,12 +1040,11 @@ export class FairQueue { } // Check tenant capacity before attempting to process - // If tenant is at capacity, skip ALL remaining queues for this tenant if (this.concurrencyManager) { const isAtCapacity = await this.concurrencyManager.isAtCapacity("tenant", tenantId); if (isAtCapacity) { this.batchedSpanManager.incrementStat(loopId, "tenant_capacity_skipped"); - break; // Skip remaining queues for this tenant + break; } } @@ -865,8 +1065,6 @@ export class FairQueue { messagesProcessed += processedFromQueue; this.batchedSpanManager.incrementStat(loopId, "messages_claimed", processedFromQueue); - // Record processed messages for DRR deficit tracking - // Use batch variant if available for efficiency, otherwise fall back to single calls if (this.scheduler.recordProcessedBatch) { await this.telemetry.trace( "recordProcessedBatch", @@ -892,16 +1090,11 @@ export class FairQueue { } } } else { - // Don't increment cooloff here - the queue was either: - // 1. Empty (removed from master, cache cleaned up) - // 2. Concurrency blocked (message released back to queue) - // Neither case warrants cooloff as they're not failures this.batchedSpanManager.incrementStat(loopId, "claim_skipped"); } } } - // Return true if we processed any messages (had work) return messagesProcessed > 0; } @@ -953,12 +1146,10 @@ export class FairQueue { >(queueId, queueKey, queueItemsKey, loopId, maxClaimCount, this.visibilityTimeoutMs); if (claimedMessages.length === 0) { - // Queue is empty, update master queue and clean up caches - const removed = await this.redis.updateMasterQueueIfEmpty(masterQueueKey, queueKey, queueId); - if (removed === 1) { - this.queueDescriptorCache.delete(queueId); - this.queueCooloffStates.delete(queueId); - } + // Queue is empty, update both old and new indexes and clean up caches + await this.#updateAllIndexesAfterDequeue(queueId, tenantId, shardId); + this.queueDescriptorCache.delete(queueId); + this.queueCooloffStates.delete(queueId); return 0; } @@ -1055,8 +1246,6 @@ export class FairQueue { */ async completeMessage(messageId: string, queueId: string): Promise { const shardId = this.masterQueue.getShardForQueue(queueId); - const queueKey = this.keys.queueKey(queueId); - const masterQueueKey = this.keys.masterQueueKey(shardId); const inflightDataKey = this.keys.inflightDataKey(shardId); // Get stored message for concurrency release @@ -1086,9 +1275,13 @@ export class FairQueue { await this.concurrencyManager.release(descriptor, messageId); } - // Update master queue if queue is now empty, and clean up caches - const removed = await this.redis.updateMasterQueueIfEmpty(masterQueueKey, queueKey, queueId); - if (removed === 1) { + // Update both old and new indexes, clean up caches if queue is empty + const { queueEmpty } = await this.#updateAllIndexesAfterDequeue( + queueId, + descriptor.tenantId, + shardId + ); + if (queueEmpty) { this.queueDescriptorCache.delete(queueId); this.queueCooloffStates.delete(queueId); } @@ -1134,7 +1327,7 @@ export class FairQueue { } : { id: queueId, tenantId: "", metadata: {} }; - // Release back to queue + // Release back to queue (visibility manager updates old master queue internally) await this.visibilityManager.release( messageId, queueId, @@ -1149,6 +1342,17 @@ export class FairQueue { await this.concurrencyManager.release(descriptor, messageId); } + // Update new dispatch indexes (message is back in queue, update scores) + const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(descriptor.tenantId); + const dispatchKey = this.keys.dispatchKey(shardId); + await this.redis.updateDispatchIndexes( + queueKey, + tenantQueueIndexKey, + dispatchKey, + queueId, + descriptor.tenantId + ); + this.logger.debug("Message released", { messageId, queueId, @@ -1200,6 +1404,7 @@ export class FairQueue { queueKey, queueItemsKey, masterQueueKey, + shardId, descriptor, error ); @@ -1215,6 +1420,7 @@ export class FairQueue { queueKey: string, queueItemsKey: string, masterQueueKey: string, + shardId: number, descriptor: QueueDescriptor, error?: Error ): Promise { @@ -1248,6 +1454,17 @@ export class FairQueue { await this.concurrencyManager.release(descriptor, storedMessage.id); } + // Update dispatch indexes (message is back in queue with delay) + const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(descriptor.tenantId); + const dispatchKey = this.keys.dispatchKey(shardId); + await this.redis.updateDispatchIndexes( + queueKey, + tenantQueueIndexKey, + dispatchKey, + queueId, + descriptor.tenantId + ); + this.telemetry.recordRetry(); this.logger.debug("Message scheduled for retry", { @@ -1351,26 +1568,55 @@ export class FairQueue { masterQueueKey: this.keys.masterQueueKey(this.masterQueue.getShardForQueue(queueId)), })); - // Release concurrency for all reclaimed messages in a single batch - // This is critical: when a message times out, its concurrency slot must be freed - // so the message can be processed again when it's re-claimed from the queue - if (this.concurrencyManager && reclaimedMessages.length > 0) { - try { - await this.concurrencyManager.releaseBatch( - reclaimedMessages.map((msg) => ({ - queue: { - id: msg.queueId, - tenantId: msg.tenantId, - metadata: msg.metadata ?? {}, - }, - messageId: msg.messageId, - })) - ); - } catch (error) { - this.logger.error("Failed to release concurrency for reclaimed messages", { - count: reclaimedMessages.length, - error: error instanceof Error ? error.message : String(error), - }); + if (reclaimedMessages.length > 0) { + // Release concurrency for all reclaimed messages in a single batch + // This is critical: when a message times out, its concurrency slot must be freed + // so the message can be processed again when it's re-claimed from the queue + if (this.concurrencyManager) { + try { + await this.concurrencyManager.releaseBatch( + reclaimedMessages.map((msg) => ({ + queue: { + id: msg.queueId, + tenantId: msg.tenantId, + metadata: msg.metadata ?? {}, + }, + messageId: msg.messageId, + })) + ); + } catch (error) { + this.logger.error("Failed to release concurrency for reclaimed messages", { + count: reclaimedMessages.length, + error: error instanceof Error ? error.message : String(error), + }); + } + } + + // Update dispatch indexes for reclaimed queues (messages are back in queue) + const updatedQueues = new Set(); + for (const msg of reclaimedMessages) { + const key = `${msg.tenantId}:${msg.queueId}`; + if (updatedQueues.has(key)) continue; + updatedQueues.add(key); + + try { + const queueKey = this.keys.queueKey(msg.queueId); + const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(msg.tenantId); + const dispatchKey = this.keys.dispatchKey(shardId); + await this.redis.updateDispatchIndexes( + queueKey, + tenantQueueIndexKey, + dispatchKey, + msg.queueId, + msg.tenantId + ); + } catch (error) { + this.logger.error("Failed to update dispatch indexes for reclaimed message", { + queueId: msg.queueId, + tenantId: msg.tenantId, + error: error instanceof Error ? error.message : String(error), + }); + } } } @@ -1445,6 +1691,40 @@ export class FairQueue { // Private - Helpers // ============================================================================ + /** + * Update both old master queue and new dispatch indexes after a dequeue/complete. + * Both calls are idempotent - ZREM on a non-existent member is a no-op. + * This handles the transition period where queues may exist in either or both indexes. + */ + async #updateAllIndexesAfterDequeue( + queueId: string, + tenantId: string, + shardId: number + ): Promise<{ queueEmpty: boolean }> { + const queueKey = this.keys.queueKey(queueId); + const masterQueueKey = this.keys.masterQueueKey(shardId); + const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(tenantId); + const dispatchKey = this.keys.dispatchKey(shardId); + + // Update legacy master queue (drain path, no-op if queue not there) + const removedFromMaster = await this.redis.updateMasterQueueIfEmpty( + masterQueueKey, + queueKey, + queueId + ); + + // Update new dispatch indexes + const removedFromDispatch = await this.redis.updateDispatchIndexes( + queueKey, + tenantQueueIndexKey, + dispatchKey, + queueId, + tenantId + ); + + return { queueEmpty: removedFromMaster === 1 || removedFromDispatch === 1 }; + } + #createSchedulerContext(): SchedulerContext { return { getCurrentConcurrency: async (groupName, groupId) => { @@ -1476,7 +1756,9 @@ export class FairQueue { // ============================================================================ #registerCommands(): void { - // Atomic single message enqueue with master queue update + // ---- Legacy Lua scripts (kept for drain of old master queue) ---- + + // Atomic single message enqueue with master queue update (legacy, used for drain only) this.redis.defineCommand("enqueueMessageAtomic", { numberOfKeys: 3, lua: ` @@ -1489,13 +1771,9 @@ local messageId = ARGV[2] local timestamp = tonumber(ARGV[3]) local payload = ARGV[4] --- Add to sorted set (score = timestamp) redis.call('ZADD', queueKey, timestamp, messageId) - --- Store payload in hash redis.call('HSET', queueItemsKey, messageId, payload) --- Update master queue with oldest message timestamp local oldest = redis.call('ZRANGE', queueKey, 0, 0, 'WITHSCORES') if #oldest >= 2 then redis.call('ZADD', masterQueueKey, oldest[2], queueId) @@ -1505,7 +1783,7 @@ return 1 `, }); - // Atomic batch message enqueue with master queue update + // Atomic batch message enqueue with master queue update (legacy, used for drain only) this.redis.defineCommand("enqueueBatchAtomic", { numberOfKeys: 3, lua: ` @@ -1515,20 +1793,14 @@ local masterQueueKey = KEYS[3] local queueId = ARGV[1] --- Args after queueId are triples: [messageId, timestamp, payload, ...] for i = 2, #ARGV, 3 do local messageId = ARGV[i] local timestamp = tonumber(ARGV[i + 1]) local payload = ARGV[i + 2] - - -- Add to sorted set redis.call('ZADD', queueKey, timestamp, messageId) - - -- Store payload in hash redis.call('HSET', queueItemsKey, messageId, payload) end --- Update master queue with oldest message timestamp local oldest = redis.call('ZRANGE', queueKey, 0, 0, 'WITHSCORES') if #oldest >= 2 then redis.call('ZADD', masterQueueKey, oldest[2], queueId) @@ -1538,7 +1810,7 @@ return (#ARGV - 1) / 3 `, }); - // Update master queue if queue is empty + // Update master queue if queue is empty (legacy, used for drain) this.redis.defineCommand("updateMasterQueueIfEmpty", { numberOfKeys: 2, lua: ` @@ -1551,7 +1823,6 @@ if count == 0 then redis.call('ZREM', masterQueueKey, queueId) return 1 else - -- Update with oldest message timestamp local oldest = redis.call('ZRANGE', queueKey, 0, 0, 'WITHSCORES') if #oldest >= 2 then redis.call('ZADD', masterQueueKey, oldest[2], queueId) @@ -1561,6 +1832,124 @@ end `, }); + // ---- New V2 Lua scripts (two-level tenant dispatch) ---- + + // Atomic single message enqueue with tenant dispatch index update + this.redis.defineCommand("enqueueMessageAtomicV2", { + numberOfKeys: 4, + lua: ` +local queueKey = KEYS[1] +local queueItemsKey = KEYS[2] +local tenantQueueIndexKey = KEYS[3] +local dispatchKey = KEYS[4] + +local queueId = ARGV[1] +local messageId = ARGV[2] +local timestamp = tonumber(ARGV[3]) +local payload = ARGV[4] +local tenantId = ARGV[5] + +-- Add to per-queue storage (same as before) +redis.call('ZADD', queueKey, timestamp, messageId) +redis.call('HSET', queueItemsKey, messageId, payload) + +-- Update tenant queue index (Level 2) with queue's oldest message +local oldest = redis.call('ZRANGE', queueKey, 0, 0, 'WITHSCORES') +if #oldest >= 2 then + redis.call('ZADD', tenantQueueIndexKey, oldest[2], queueId) +end + +-- Update dispatch index (Level 1) with tenant's oldest across all queues +local tenantOldest = redis.call('ZRANGE', tenantQueueIndexKey, 0, 0, 'WITHSCORES') +if #tenantOldest >= 2 then + redis.call('ZADD', dispatchKey, tenantOldest[2], tenantId) +end + +return 1 + `, + }); + + // Atomic batch message enqueue with tenant dispatch index update + this.redis.defineCommand("enqueueBatchAtomicV2", { + numberOfKeys: 4, + lua: ` +local queueKey = KEYS[1] +local queueItemsKey = KEYS[2] +local tenantQueueIndexKey = KEYS[3] +local dispatchKey = KEYS[4] + +local queueId = ARGV[1] +local tenantId = ARGV[2] + +-- Args after queueId and tenantId are triples: [messageId, timestamp, payload, ...] +for i = 3, #ARGV, 3 do + local messageId = ARGV[i] + local timestamp = tonumber(ARGV[i + 1]) + local payload = ARGV[i + 2] + redis.call('ZADD', queueKey, timestamp, messageId) + redis.call('HSET', queueItemsKey, messageId, payload) +end + +-- Update tenant queue index (Level 2) +local oldest = redis.call('ZRANGE', queueKey, 0, 0, 'WITHSCORES') +if #oldest >= 2 then + redis.call('ZADD', tenantQueueIndexKey, oldest[2], queueId) +end + +-- Update dispatch index (Level 1) +local tenantOldest = redis.call('ZRANGE', tenantQueueIndexKey, 0, 0, 'WITHSCORES') +if #tenantOldest >= 2 then + redis.call('ZADD', dispatchKey, tenantOldest[2], tenantId) +end + +return (#ARGV - 2) / 3 + `, + }); + + // Update tenant dispatch indexes after dequeue/complete + // Handles both queue-empty (remove from indexes) and queue-has-messages (update scores) + this.redis.defineCommand("updateDispatchIndexes", { + numberOfKeys: 3, + lua: ` +local queueKey = KEYS[1] +local tenantQueueIndexKey = KEYS[2] +local dispatchKey = KEYS[3] +local queueId = ARGV[1] +local tenantId = ARGV[2] + +local count = redis.call('ZCARD', queueKey) +if count == 0 then + -- Queue is empty: remove from tenant queue index + redis.call('ZREM', tenantQueueIndexKey, queueId) + + -- Check if tenant has any queues left + local tenantQueueCount = redis.call('ZCARD', tenantQueueIndexKey) + if tenantQueueCount == 0 then + -- No more queues: remove tenant from dispatch + redis.call('ZREM', dispatchKey, tenantId) + else + -- Update dispatch score to tenant's new oldest + local tenantOldest = redis.call('ZRANGE', tenantQueueIndexKey, 0, 0, 'WITHSCORES') + if #tenantOldest >= 2 then + redis.call('ZADD', dispatchKey, tenantOldest[2], tenantId) + end + end + return 1 +else + -- Queue still has messages: update scores + local oldest = redis.call('ZRANGE', queueKey, 0, 0, 'WITHSCORES') + if #oldest >= 2 then + redis.call('ZADD', tenantQueueIndexKey, oldest[2], queueId) + end + local tenantOldest = redis.call('ZRANGE', tenantQueueIndexKey, 0, 0, 'WITHSCORES') + if #tenantOldest >= 2 then + redis.call('ZADD', dispatchKey, tenantOldest[2], tenantId) + end + return 0 +end + `, + }); + // Register worker queue commands if enabled if (this.workerQueueManager) { this.workerQueueManager.registerCommands(this.redis); @@ -1571,6 +1960,7 @@ end // Extend Redis interface for custom commands declare module "@internal/redis" { interface RedisCommander { + // Legacy commands (kept for drain of old master queue) enqueueMessageAtomic( queueKey: string, queueItemsKey: string, @@ -1594,5 +1984,36 @@ declare module "@internal/redis" { queueKey: string, queueId: string ): Promise; + + // V2 commands (two-level tenant dispatch) + enqueueMessageAtomicV2( + queueKey: string, + queueItemsKey: string, + tenantQueueIndexKey: string, + dispatchKey: string, + queueId: string, + messageId: string, + timestamp: string, + payload: string, + tenantId: string + ): Promise; + + enqueueBatchAtomicV2( + queueKey: string, + queueItemsKey: string, + tenantQueueIndexKey: string, + dispatchKey: string, + queueId: string, + tenantId: string, + ...args: string[] + ): Promise; + + updateDispatchIndexes( + queueKey: string, + tenantQueueIndexKey: string, + dispatchKey: string, + queueId: string, + tenantId: string + ): Promise; } } diff --git a/packages/redis-worker/src/fair-queue/keyProducer.ts b/packages/redis-worker/src/fair-queue/keyProducer.ts index f63cdbed03e..1e1b2b42631 100644 --- a/packages/redis-worker/src/fair-queue/keyProducer.ts +++ b/packages/redis-worker/src/fair-queue/keyProducer.ts @@ -5,7 +5,9 @@ import type { FairQueueKeyProducer } from "./types.js"; * Uses a configurable prefix and standard key structure. * * Key structure: - * - Master queue: {prefix}:master:{shardId} + * - Master queue: {prefix}:master:{shardId} (legacy, drain-only) + * - Dispatch index: {prefix}:dispatch:{shardId} (Level 1: tenantIds) + * - Tenant queue index: {prefix}:tenantq:{tenantId} (Level 2: queueIds) * - Queue: {prefix}:queue:{queueId} * - Queue items: {prefix}:queue:{queueId}:items * - Concurrency: {prefix}:concurrency:{groupName}:{groupId} @@ -70,6 +72,18 @@ export class DefaultFairQueueKeyProducer implements FairQueueKeyProducer { return this.#buildKey("worker", consumerId); } + // ============================================================================ + // Tenant Dispatch Keys (Two-Level Index) + // ============================================================================ + + dispatchKey(shardId: number): string { + return this.#buildKey("dispatch", shardId.toString()); + } + + tenantQueueIndexKey(tenantId: string): string { + return this.#buildKey("tenantq", tenantId); + } + // ============================================================================ // Dead Letter Queue Keys // ============================================================================ diff --git a/packages/redis-worker/src/fair-queue/schedulers/drr.ts b/packages/redis-worker/src/fair-queue/schedulers/drr.ts index d06da05891f..c6cd1c7a988 100644 --- a/packages/redis-worker/src/fair-queue/schedulers/drr.ts +++ b/packages/redis-worker/src/fair-queue/schedulers/drr.ts @@ -2,6 +2,7 @@ import { createRedisClient, type Redis, type RedisOptions } from "@internal/redi import { BaseScheduler } from "../scheduler.js"; import type { DRRSchedulerConfig, + DispatchSchedulerContext, FairQueueKeyProducer, SchedulerContext, TenantQueues, @@ -132,6 +133,78 @@ export class DRRScheduler extends BaseScheduler { })); } + /** + * Select queues using the two-level tenant dispatch index. + * + * Algorithm: + * 1. ZRANGEBYSCORE on dispatch index (gets only tenants with queues - much smaller) + * 2. Add quantum to each tenant's deficit (atomically) + * 3. Check capacity as safety net (dispatch should only have tenants with capacity) + * 4. Select tenants with deficit >= 1, sorted by deficit (highest first) + * 5. For each tenant, fetch their queues from Level 2 index + */ + async selectQueuesFromDispatch( + dispatchShardKey: string, + consumerId: string, + context: DispatchSchedulerContext + ): Promise { + // Level 1: Get tenants from dispatch index + const tenants = await this.#getTenantsFromDispatch(dispatchShardKey); + + if (tenants.length === 0) { + return []; + } + + const tenantIds = tenants.map((t) => t.tenantId); + + // Add quantum to all active tenants atomically + const deficits = await this.#addQuantumToTenants(tenantIds); + + // Build tenant data with deficits and capacity checks + const tenantData: Array<{ + tenantId: string; + deficit: number; + isAtCapacity: boolean; + }> = await Promise.all( + tenantIds.map(async (tenantId, index) => { + // Capacity check as safety net - dispatch should already exclude at-capacity tenants + // once capacity-based pruning is implemented as a follow-up + const isAtCapacity = await context.isAtCapacity("tenant", tenantId); + return { + tenantId, + deficit: deficits[index] ?? 0, + isAtCapacity, + }; + }) + ); + + // Filter out tenants at capacity or with no deficit + const eligibleTenants = tenantData.filter((t) => !t.isAtCapacity && t.deficit >= 1); + + // Sort by deficit (highest first for fairness) + eligibleTenants.sort((a, b) => b.deficit - a.deficit); + + this.logger.debug("DRR dispatch: tenant selection complete", { + dispatchTenants: tenants.length, + eligibleTenants: eligibleTenants.length, + topTenantDeficit: eligibleTenants[0]?.deficit, + }); + + // Level 2: For each eligible tenant, fetch their queues + const result: TenantQueues[] = []; + for (const { tenantId } of eligibleTenants) { + const queues = await context.getQueuesForTenant(tenantId); + if (queues.length > 0) { + result.push({ + tenantId, + queues: queues.map((q) => q.queueId), + }); + } + } + + return result; + } + /** * Record that a message was processed from a tenant. * Decrements the tenant's deficit. @@ -200,6 +273,35 @@ export class DRRScheduler extends BaseScheduler { return `${this.keys.masterQueueKey(0).split(":")[0]}:drr:deficit`; } + async #getTenantsFromDispatch( + dispatchKey: string + ): Promise> { + const now = Date.now(); + const results = await this.redis.zrangebyscore( + dispatchKey, + "-inf", + now, + "WITHSCORES", + "LIMIT", + 0, + this.masterQueueLimit + ); + + const tenants: Array<{ tenantId: string; score: number }> = []; + for (let i = 0; i < results.length; i += 2) { + const tenantId = results[i]; + const scoreStr = results[i + 1]; + if (tenantId && scoreStr) { + tenants.push({ + tenantId, + score: parseFloat(scoreStr), + }); + } + } + + return tenants; + } + async #getQueuesFromShard(shardKey: string): Promise { const now = Date.now(); const results = await this.redis.zrangebyscore( diff --git a/packages/redis-worker/src/fair-queue/telemetry.ts b/packages/redis-worker/src/fair-queue/telemetry.ts index 0dbdcd87113..373e88aa93b 100644 --- a/packages/redis-worker/src/fair-queue/telemetry.ts +++ b/packages/redis-worker/src/fair-queue/telemetry.ts @@ -56,8 +56,10 @@ export interface FairQueueMetrics { // Observable gauges (registered with callbacks) queueLength: ObservableGauge; masterQueueLength: ObservableGauge; + dispatchLength: ObservableGauge; inflightCount: ObservableGauge; dlqLength: ObservableGauge; + legacyDrainComplete: ObservableGauge; } /** @@ -250,8 +252,10 @@ export class FairQueueTelemetry { registerGaugeCallbacks(callbacks: { getQueueLength?: (queueId: string) => Promise; getMasterQueueLength?: (shardId: number) => Promise; + getDispatchLength?: (shardId: number) => Promise; getInflightCount?: (shardId: number) => Promise; getDLQLength?: (tenantId: string) => Promise; + isLegacyDrainComplete?: (shardId: number) => boolean; shardCount?: number; observedQueues?: string[]; observedTenants?: string[]; @@ -273,7 +277,7 @@ export class FairQueueTelemetry { }); } - // Master queue length gauge + // Legacy master queue length gauge (draining, should trend to 0) if (callbacks.getMasterQueueLength && callbacks.shardCount) { const getMasterQueueLength = callbacks.getMasterQueueLength; const shardCount = callbacks.shardCount; @@ -288,6 +292,21 @@ export class FairQueueTelemetry { }); } + // Dispatch index length gauge (new two-level dispatch, tenant count per shard) + if (callbacks.getDispatchLength && callbacks.shardCount) { + const getDispatchLength = callbacks.getDispatchLength; + const shardCount = callbacks.shardCount; + + this.metrics.dispatchLength.addCallback(async (observableResult) => { + for (let shardId = 0; shardId < shardCount; shardId++) { + const length = await getDispatchLength(shardId); + observableResult.observe(length, { + [FairQueueAttributes.SHARD_ID]: shardId.toString(), + }); + } + }); + } + // Inflight count gauge if (callbacks.getInflightCount && callbacks.shardCount) { const getInflightCount = callbacks.getInflightCount; @@ -317,6 +336,20 @@ export class FairQueueTelemetry { } }); } + + // Legacy drain complete gauge (1 = drained, 0 = still draining) + if (callbacks.isLegacyDrainComplete && callbacks.shardCount) { + const isLegacyDrainComplete = callbacks.isLegacyDrainComplete; + const shardCount = callbacks.shardCount; + + this.metrics.legacyDrainComplete.addCallback((observableResult) => { + for (let shardId = 0; shardId < shardCount; shardId++) { + observableResult.observe(isLegacyDrainComplete(shardId) ? 1 : 0, { + [FairQueueAttributes.SHARD_ID]: shardId.toString(), + }); + } + }); + } } // ============================================================================ @@ -414,9 +447,13 @@ export class FairQueueTelemetry { unit: "messages", }), masterQueueLength: this.meter.createObservableGauge(`${this.name}.master_queue.length`, { - description: "Number of queues in master queue shard", + description: "Number of queues in legacy master queue shard (draining)", unit: "queues", }), + dispatchLength: this.meter.createObservableGauge(`${this.name}.dispatch.length`, { + description: "Number of tenants in dispatch index shard", + unit: "tenants", + }), inflightCount: this.meter.createObservableGauge(`${this.name}.inflight.count`, { description: "Number of messages currently being processed", unit: "messages", @@ -425,6 +462,13 @@ export class FairQueueTelemetry { description: "Number of messages in dead letter queue", unit: "messages", }), + legacyDrainComplete: this.meter.createObservableGauge( + `${this.name}.legacy_drain.complete`, + { + description: "Whether legacy master queue shard drain is complete (1=done, 0=draining)", + unit: "boolean", + } + ), }; } } diff --git a/packages/redis-worker/src/fair-queue/tenantDispatch.ts b/packages/redis-worker/src/fair-queue/tenantDispatch.ts new file mode 100644 index 00000000000..56f57f7fa27 --- /dev/null +++ b/packages/redis-worker/src/fair-queue/tenantDispatch.ts @@ -0,0 +1,182 @@ +import { createRedisClient, type Redis, type RedisOptions } from "@internal/redis"; +import { jumpHash } from "@trigger.dev/core/v3/serverOnly"; +import type { FairQueueKeyProducer, QueueWithScore } from "./types.js"; + +export interface TenantDispatchOptions { + redis: RedisOptions; + keys: FairQueueKeyProducer; + shardCount: number; +} + +export interface TenantWithScore { + tenantId: string; + score: number; +} + +/** + * TenantDispatch manages the two-level tenant dispatch index. + * + * Level 1 - Dispatch Index (per shard): + * Key: {prefix}:dispatch:{shardId} + * ZSET of tenantIds scored by oldest message timestamp across their queues. + * Only tenants with queues containing messages appear here. + * + * Level 2 - Per-Tenant Queue Index: + * Key: {prefix}:tenantq:{tenantId} + * ZSET of queueIds scored by oldest message timestamp in that queue. + * + * This replaces the flat master queue for new enqueues, isolating each tenant's + * queue backlog so the scheduler iterates tenants (not queues) at Level 1. + */ +export class TenantDispatch { + private redis: Redis; + private keys: FairQueueKeyProducer; + private shardCount: number; + + constructor(private options: TenantDispatchOptions) { + this.redis = createRedisClient(options.redis); + this.keys = options.keys; + this.shardCount = Math.max(1, options.shardCount); + } + + /** + * Get the shard ID for a queue. + * Uses the same jump consistent hash as MasterQueue for consistency. + */ + getShardForQueue(queueId: string): number { + return jumpHash(queueId, this.shardCount); + } + + /** + * Get eligible tenants from a dispatch shard (Level 1). + * Returns tenants ordered by oldest message (lowest score first). + */ + async getTenantsFromShard( + shardId: number, + limit: number = 1000, + maxScore?: number + ): Promise { + const dispatchKey = this.keys.dispatchKey(shardId); + const score = maxScore ?? Date.now(); + + const results = await this.redis.zrangebyscore( + dispatchKey, + "-inf", + score, + "WITHSCORES", + "LIMIT", + 0, + limit + ); + + const tenants: TenantWithScore[] = []; + for (let i = 0; i < results.length; i += 2) { + const tenantId = results[i]; + const scoreStr = results[i + 1]; + if (tenantId && scoreStr) { + tenants.push({ + tenantId, + score: parseFloat(scoreStr), + }); + } + } + + return tenants; + } + + /** + * Get queues for a specific tenant (Level 2). + * Returns queues ordered by oldest message (lowest score first). + */ + async getQueuesForTenant( + tenantId: string, + limit: number = 1000, + maxScore?: number + ): Promise { + const tenantQueueKey = this.keys.tenantQueueIndexKey(tenantId); + const score = maxScore ?? Date.now(); + + const results = await this.redis.zrangebyscore( + tenantQueueKey, + "-inf", + score, + "WITHSCORES", + "LIMIT", + 0, + limit + ); + + const queues: QueueWithScore[] = []; + for (let i = 0; i < results.length; i += 2) { + const queueId = results[i]; + const scoreStr = results[i + 1]; + if (queueId && scoreStr) { + queues.push({ + queueId, + score: parseFloat(scoreStr), + tenantId, + }); + } + } + + return queues; + } + + /** + * Get the number of tenants in a dispatch shard. + */ + async getShardTenantCount(shardId: number): Promise { + const dispatchKey = this.keys.dispatchKey(shardId); + return await this.redis.zcard(dispatchKey); + } + + /** + * Get total tenant count across all dispatch shards. + * Note: tenants may appear in multiple shards, so this may overcount. + */ + async getTotalTenantCount(): Promise { + const counts = await Promise.all( + Array.from({ length: this.shardCount }, (_, i) => this.getShardTenantCount(i)) + ); + return counts.reduce((sum, count) => sum + count, 0); + } + + /** + * Get the number of queues for a tenant. + */ + async getTenantQueueCount(tenantId: string): Promise { + const tenantQueueKey = this.keys.tenantQueueIndexKey(tenantId); + return await this.redis.zcard(tenantQueueKey); + } + + /** + * Remove a tenant from a specific dispatch shard. + */ + async removeTenantFromShard(shardId: number, tenantId: string): Promise { + const dispatchKey = this.keys.dispatchKey(shardId); + await this.redis.zrem(dispatchKey, tenantId); + } + + /** + * Add a tenant to a dispatch shard with the given score. + */ + async addTenantToShard(shardId: number, tenantId: string, score: number): Promise { + const dispatchKey = this.keys.dispatchKey(shardId); + await this.redis.zadd(dispatchKey, score, tenantId); + } + + /** + * Remove a queue from a tenant's queue index. + */ + async removeQueueFromTenant(tenantId: string, queueId: string): Promise { + const tenantQueueKey = this.keys.tenantQueueIndexKey(tenantId); + await this.redis.zrem(tenantQueueKey, queueId); + } + + /** + * Close the Redis connection. + */ + async close(): Promise { + await this.redis.quit(); + } +} diff --git a/packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts b/packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts new file mode 100644 index 00000000000..ec33ad8c32f --- /dev/null +++ b/packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts @@ -0,0 +1,615 @@ +import { describe, expect } from "vitest"; +import { redisTest } from "@internal/testcontainers"; +import { createRedisClient } from "@internal/redis"; +import { z } from "zod"; +import { + FairQueue, + DefaultFairQueueKeyProducer, + DRRScheduler, + WorkerQueueManager, +} from "../index.js"; +import type { FairQueueKeyProducer, StoredMessage } from "../types.js"; +import type { RedisOptions } from "@internal/redis"; + +const TestPayloadSchema = z.object({ value: z.string() }); +type TestPayload = z.infer; +const TEST_WORKER_QUEUE_ID = "test-worker-queue"; + +/** + * Minimal test helper for tenant dispatch tests. + */ +class TestHelper { + public fairQueue: FairQueue; + private workerQueueManager: WorkerQueueManager; + private isRunning = false; + private abortController = new AbortController(); + private consumerLoops: Promise[] = []; + private messageHandler?: (ctx: { + message: { id: string; queueId: string; payload: TestPayload; attempt: number }; + complete: () => Promise; + release: () => Promise; + fail: (error?: Error) => Promise; + }) => Promise; + + constructor( + private redisOptions: RedisOptions, + private keys: FairQueueKeyProducer, + options: { + shardCount?: number; + consumerIntervalMs?: number; + concurrencyLimit?: number; + } = {} + ) { + const scheduler = new DRRScheduler({ + redis: redisOptions, + keys, + quantum: 10, + maxDeficit: 100, + }); + + this.fairQueue = new FairQueue({ + redis: redisOptions, + keys, + scheduler, + payloadSchema: TestPayloadSchema, + shardCount: options.shardCount ?? 1, + consumerIntervalMs: options.consumerIntervalMs ?? 20, + startConsumers: false, + workerQueue: { resolveWorkerQueue: () => TEST_WORKER_QUEUE_ID }, + concurrencyGroups: options.concurrencyLimit + ? [ + { + name: "tenant", + extractGroupId: (q) => q.tenantId, + getLimit: async () => options.concurrencyLimit!, + defaultLimit: options.concurrencyLimit!, + }, + ] + : undefined, + }); + + this.workerQueueManager = new WorkerQueueManager({ + redis: redisOptions, + keys, + }); + } + + onMessage( + handler: (ctx: { + message: { id: string; queueId: string; payload: TestPayload; attempt: number }; + complete: () => Promise; + release: () => Promise; + fail: (error?: Error) => Promise; + }) => Promise + ): void { + this.messageHandler = handler; + } + + start(): void { + if (this.isRunning) return; + this.isRunning = true; + this.abortController = new AbortController(); + this.fairQueue.start(); + this.consumerLoops.push(this.#runConsumerLoop()); + } + + async stop(): Promise { + if (!this.isRunning) return; + this.isRunning = false; + this.abortController.abort(); + await this.fairQueue.stop(); + await Promise.allSettled(this.consumerLoops); + this.consumerLoops = []; + } + + async close(): Promise { + await this.stop(); + await this.fairQueue.close(); + await this.workerQueueManager.close(); + } + + async #runConsumerLoop(): Promise { + try { + while (this.isRunning) { + if (!this.messageHandler) { + await new Promise((resolve) => setTimeout(resolve, 50)); + continue; + } + try { + const messageKey = await this.workerQueueManager.blockingPop( + TEST_WORKER_QUEUE_ID, + 1, + this.abortController.signal + ); + if (!messageKey) continue; + + const colonIndex = messageKey.indexOf(":"); + if (colonIndex === -1) continue; + + const messageId = messageKey.substring(0, colonIndex); + const queueId = messageKey.substring(colonIndex + 1); + const storedMessage = await this.fairQueue.getMessageData(messageId, queueId); + if (!storedMessage) continue; + + await this.messageHandler({ + message: { + id: storedMessage.id, + queueId: storedMessage.queueId, + payload: storedMessage.payload, + attempt: storedMessage.attempt, + }, + complete: () => this.fairQueue.completeMessage(messageId, queueId), + release: () => this.fairQueue.releaseMessage(messageId, queueId), + fail: (error?: Error) => this.fairQueue.failMessage(messageId, queueId, error), + }); + } catch { + if (!this.isRunning) break; + } + } + } catch { + // Consumer loop stopped + } + } +} + +describe("Two-Level Tenant Dispatch", () => { + describe("enqueue writes to new index only", () => { + redisTest( + "should populate dispatch and tenant queue indexes, not old master queue", + { timeout: 15000 }, + async ({ redisOptions }) => { + const keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + const redis = createRedisClient(redisOptions); + + const helper = new TestHelper(redisOptions, keys); + + // Enqueue messages to two different queues for two tenants + await helper.fairQueue.enqueue({ + queueId: "tenant:t1:queue:q1", + tenantId: "t1", + payload: { value: "msg1" }, + }); + await helper.fairQueue.enqueue({ + queueId: "tenant:t2:queue:q1", + tenantId: "t2", + payload: { value: "msg2" }, + }); + + // Check new dispatch index (Level 1): should have both tenants + const dispatchMembers = await redis.zrange(keys.dispatchKey(0), 0, -1, "WITHSCORES"); + expect(dispatchMembers.length).toBeGreaterThanOrEqual(2); // at least 1 tenant per shard + + // Check tenant queue indexes (Level 2) + const t1Queues = await redis.zrange(keys.tenantQueueIndexKey("t1"), 0, -1); + expect(t1Queues).toContain("tenant:t1:queue:q1"); + + const t2Queues = await redis.zrange(keys.tenantQueueIndexKey("t2"), 0, -1); + expect(t2Queues).toContain("tenant:t2:queue:q1"); + + // Check old master queue: should be EMPTY (new enqueues don't write there) + const masterMembers = await redis.zrange(keys.masterQueueKey(0), 0, -1); + expect(masterMembers.length).toBe(0); + + // Per-queue storage should still work as before + const queueLength = await helper.fairQueue.getQueueLength("tenant:t1:queue:q1"); + expect(queueLength).toBe(1); + + await helper.close(); + await redis.quit(); + } + ); + + redisTest( + "should populate dispatch index correctly for batch enqueue", + { timeout: 15000 }, + async ({ redisOptions }) => { + const keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + const redis = createRedisClient(redisOptions); + + const helper = new TestHelper(redisOptions, keys); + + await helper.fairQueue.enqueueBatch({ + queueId: "tenant:t1:queue:q1", + tenantId: "t1", + messages: [ + { payload: { value: "msg1" } }, + { payload: { value: "msg2" } }, + { payload: { value: "msg3" } }, + ], + }); + + // Tenant queue index should have the queue + const t1Queues = await redis.zrange(keys.tenantQueueIndexKey("t1"), 0, -1); + expect(t1Queues).toContain("tenant:t1:queue:q1"); + + // Dispatch should have the tenant + const dispatchMembers = await redis.zrange(keys.dispatchKey(0), 0, -1); + expect(dispatchMembers).toContain("t1"); + + // Per-queue storage should have all 3 messages + const queueLength = await helper.fairQueue.getQueueLength("tenant:t1:queue:q1"); + expect(queueLength).toBe(3); + + await helper.close(); + await redis.quit(); + } + ); + }); + + describe("dispatch consumer processes messages", () => { + redisTest( + "should process messages via tenant dispatch path", + { timeout: 15000 }, + async ({ redisOptions }) => { + const keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + const processed: string[] = []; + + const helper = new TestHelper(redisOptions, keys); + + // Enqueue messages + await helper.fairQueue.enqueue({ + queueId: "tenant:t1:queue:q1", + tenantId: "t1", + payload: { value: "first" }, + }); + await helper.fairQueue.enqueue({ + queueId: "tenant:t2:queue:q1", + tenantId: "t2", + payload: { value: "second" }, + }); + + // Set up consumer + helper.onMessage(async (ctx) => { + processed.push(ctx.message.payload.value); + await ctx.complete(); + }); + + helper.start(); + + // Wait for messages to be processed + await waitFor(() => processed.length === 2, 5000); + expect(processed).toContain("first"); + expect(processed).toContain("second"); + + await helper.close(); + } + ); + }); + + describe("complete updates dispatch indexes", () => { + redisTest( + "should remove empty queue from tenant index and tenant from dispatch", + { timeout: 15000 }, + async ({ redisOptions }) => { + const keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + const redis = createRedisClient(redisOptions); + + const helper = new TestHelper(redisOptions, keys); + + // Enqueue one message + await helper.fairQueue.enqueue({ + queueId: "tenant:t1:queue:q1", + tenantId: "t1", + payload: { value: "only" }, + }); + + // Verify indexes populated + let t1Queues = await redis.zrange(keys.tenantQueueIndexKey("t1"), 0, -1); + expect(t1Queues.length).toBe(1); + let dispatchMembers = await redis.zrange(keys.dispatchKey(0), 0, -1); + expect(dispatchMembers).toContain("t1"); + + // Process and complete the message + helper.onMessage(async (ctx) => { + await ctx.complete(); + }); + helper.start(); + + // Wait for processing + await waitFor( + async () => (await helper.fairQueue.getQueueLength("tenant:t1:queue:q1")) === 0, + 5000 + ); + + // Allow index updates to propagate + await new Promise((resolve) => setTimeout(resolve, 200)); + + // Verify indexes cleaned up + t1Queues = await redis.zrange(keys.tenantQueueIndexKey("t1"), 0, -1); + expect(t1Queues.length).toBe(0); + dispatchMembers = await redis.zrange(keys.dispatchKey(0), 0, -1); + expect(dispatchMembers).not.toContain("t1"); + + await helper.close(); + await redis.quit(); + } + ); + + redisTest( + "should keep tenant in dispatch when other queues remain", + { timeout: 15000 }, + async ({ redisOptions }) => { + const keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + const redis = createRedisClient(redisOptions); + const processed: string[] = []; + + const helper = new TestHelper(redisOptions, keys, { concurrencyLimit: 1 }); + + // Enqueue to two queues for the same tenant + await helper.fairQueue.enqueue({ + queueId: "tenant:t1:queue:q1", + tenantId: "t1", + payload: { value: "queue1" }, + }); + await helper.fairQueue.enqueue({ + queueId: "tenant:t1:queue:q2", + tenantId: "t1", + payload: { value: "queue2" }, + }); + + // Process messages one at a time (concurrency limit 1) + helper.onMessage(async (ctx) => { + processed.push(ctx.message.payload.value); + await ctx.complete(); + }); + helper.start(); + + // Wait for first message + await waitFor(() => processed.length >= 1, 5000); + + // Tenant should still be in dispatch (has remaining queue) + const dispatchMembers = await redis.zrange(keys.dispatchKey(0), 0, -1); + // After first complete, tenant may still be in dispatch due to second queue + // (exact timing depends on consumer loop) + + // Wait for both messages + await waitFor(() => processed.length === 2, 5000); + expect(processed).toContain("queue1"); + expect(processed).toContain("queue2"); + + await helper.close(); + await redis.quit(); + } + ); + }); + + describe("legacy drain", () => { + redisTest( + "should drain pre-populated master queue alongside new dispatch", + { timeout: 20000 }, + async ({ redisOptions }) => { + const keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + const redis = createRedisClient(redisOptions); + const processed: string[] = []; + + // Simulate pre-deploy state: write directly to old master queue + queue storage + const queueId = "tenant:t1:queue:legacy"; + const queueKey = keys.queueKey(queueId); + const queueItemsKey = keys.queueItemsKey(queueId); + const masterQueueKey = keys.masterQueueKey(0); + + const timestamp = Date.now(); + const storedMessage: StoredMessage = { + id: "legacy-msg-1", + queueId, + tenantId: "t1", + payload: { value: "legacy" }, + timestamp, + attempt: 1, + }; + + // Write to per-queue storage and old master queue (simulating pre-deploy) + await redis.zadd(queueKey, timestamp, "legacy-msg-1"); + await redis.hset(queueItemsKey, "legacy-msg-1", JSON.stringify(storedMessage)); + await redis.zadd(masterQueueKey, timestamp, queueId); + + // Now create FairQueue (post-deploy) + const helper = new TestHelper(redisOptions, keys); + + // Also enqueue a new message (goes to dispatch only) + await helper.fairQueue.enqueue({ + queueId: "tenant:t2:queue:new", + tenantId: "t2", + payload: { value: "new" }, + }); + + // Verify: old message in master queue, new message in dispatch + const masterMembers = await redis.zrange(masterQueueKey, 0, -1); + expect(masterMembers).toContain(queueId); + const dispatchMembers = await redis.zrange(keys.dispatchKey(0), 0, -1); + expect(dispatchMembers).toContain("t2"); + + // Process both messages + helper.onMessage(async (ctx) => { + processed.push(ctx.message.payload.value); + await ctx.complete(); + }); + helper.start(); + + // Both messages should be processed (legacy from drain + new from dispatch) + await waitFor(() => processed.length === 2, 10000); + expect(processed).toContain("legacy"); + expect(processed).toContain("new"); + + // Old master queue should be empty after drain + const masterAfter = await redis.zcard(masterQueueKey); + expect(masterAfter).toBe(0); + + await helper.close(); + await redis.quit(); + } + ); + }); + + describe("DRR selectQueuesFromDispatch", () => { + redisTest( + "should select tenants from dispatch with DRR fairness", + { timeout: 15000 }, + async ({ redisOptions }) => { + const keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + const processed: Array<{ tenantId: string; value: string }> = []; + + const helper = new TestHelper(redisOptions, keys, { concurrencyLimit: 100 }); + + // Enqueue messages for multiple tenants + for (let i = 0; i < 5; i++) { + await helper.fairQueue.enqueue({ + queueId: `tenant:t1:queue:q${i}`, + tenantId: "t1", + payload: { value: `t1-${i}` }, + }); + } + for (let i = 0; i < 5; i++) { + await helper.fairQueue.enqueue({ + queueId: `tenant:t2:queue:q${i}`, + tenantId: "t2", + payload: { value: `t2-${i}` }, + }); + } + + // Process all messages + helper.onMessage(async (ctx) => { + const tenantId = ctx.message.queueId.split(":")[1]!; + processed.push({ tenantId, value: ctx.message.payload.value }); + await ctx.complete(); + }); + helper.start(); + + await waitFor(() => processed.length === 10, 10000); + + // Both tenants should have been processed + const t1Count = processed.filter((p) => p.tenantId === "t1").length; + const t2Count = processed.filter((p) => p.tenantId === "t2").length; + expect(t1Count).toBe(5); + expect(t2Count).toBe(5); + + await helper.close(); + } + ); + }); + + describe("noisy neighbor isolation", () => { + redisTest( + "should not block other tenants when one tenant is at capacity", + { timeout: 20000 }, + async ({ redisOptions }) => { + const keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + const processed: Array<{ tenantId: string; value: string }> = []; + let blockT1 = true; + + const scheduler = new DRRScheduler({ + redis: redisOptions, + keys, + quantum: 10, + maxDeficit: 100, + }); + + const fairQueue = new FairQueue({ + redis: redisOptions, + keys, + scheduler, + payloadSchema: TestPayloadSchema, + shardCount: 1, + consumerIntervalMs: 20, + startConsumers: false, + workerQueue: { resolveWorkerQueue: () => TEST_WORKER_QUEUE_ID }, + concurrencyGroups: [ + { + name: "tenant", + extractGroupId: (q) => q.tenantId, + getLimit: async (tenantId) => { + // t1 has very low concurrency, t2 has high + return tenantId === "t1" ? 1 : 100; + }, + defaultLimit: 10, + }, + ], + }); + + const workerQueueManager = new WorkerQueueManager({ + redis: redisOptions, + keys, + }); + + // Enqueue 20 messages for t1 (noisy neighbor with concurrency 1) + for (let i = 0; i < 20; i++) { + await fairQueue.enqueue({ + queueId: `tenant:t1:queue:q${i}`, + tenantId: "t1", + payload: { value: `t1-${i}` }, + }); + } + + // Enqueue 3 messages for t2 (quiet tenant with high concurrency) + for (let i = 0; i < 3; i++) { + await fairQueue.enqueue({ + queueId: `tenant:t2:queue:q${i}`, + tenantId: "t2", + payload: { value: `t2-${i}` }, + }); + } + + // Start processing + fairQueue.start(); + const abortController = new AbortController(); + + const consumerLoop = (async () => { + while (!abortController.signal.aborted) { + try { + const messageKey = await workerQueueManager.blockingPop( + TEST_WORKER_QUEUE_ID, + 1, + abortController.signal + ); + if (!messageKey) continue; + + const colonIndex = messageKey.indexOf(":"); + if (colonIndex === -1) continue; + + const messageId = messageKey.substring(0, colonIndex); + const queueId = messageKey.substring(colonIndex + 1); + const storedMessage = await fairQueue.getMessageData(messageId, queueId); + if (!storedMessage) continue; + + const tenantId = storedMessage.tenantId; + processed.push({ tenantId, value: storedMessage.payload.value }); + await fairQueue.completeMessage(messageId, queueId); + } catch { + if (abortController.signal.aborted) break; + } + } + })(); + + // Wait for t2's messages to be processed (they shouldn't be blocked by t1) + await waitFor( + () => processed.filter((p) => p.tenantId === "t2").length === 3, + 10000 + ); + + const t2ProcessedCount = processed.filter((p) => p.tenantId === "t2").length; + expect(t2ProcessedCount).toBe(3); + + // Clean up + abortController.abort(); + await Promise.allSettled([consumerLoop]); + await fairQueue.close(); + await workerQueueManager.close(); + } + ); + }); +}); + +// Helper to wait for a condition +async function waitFor( + condition: () => boolean | Promise, + timeoutMs: number = 5000, + intervalMs: number = 50 +): Promise { + const start = Date.now(); + while (Date.now() - start < timeoutMs) { + const result = await condition(); + if (result) return; + await new Promise((resolve) => setTimeout(resolve, intervalMs)); + } + throw new Error(`waitFor timed out after ${timeoutMs}ms`); +} diff --git a/packages/redis-worker/src/fair-queue/types.ts b/packages/redis-worker/src/fair-queue/types.ts index 6451df1bea0..fcb4d20c090 100644 --- a/packages/redis-worker/src/fair-queue/types.ts +++ b/packages/redis-worker/src/fair-queue/types.ts @@ -180,6 +180,14 @@ export interface SchedulerContext { getQueueDescriptor(queueId: string): QueueDescriptor; } +/** + * Extended context for two-level dispatch scheduling. + */ +export interface DispatchSchedulerContext extends SchedulerContext { + /** Get queues for a specific tenant from the per-tenant queue index (Level 2) */ + getQueuesForTenant(tenantId: string): Promise; +} + /** * Pluggable scheduler interface for fair queue selection. */ @@ -199,6 +207,18 @@ export interface FairScheduler { context: SchedulerContext ): Promise; + /** + * Select queues using the two-level tenant dispatch index. + * Level 1: reads tenantIds from dispatch shard. + * Level 2: reads queueIds from per-tenant index. + * Optional - falls back to selectQueues with flat queue list if not implemented. + */ + selectQueuesFromDispatch?( + dispatchShardKey: string, + consumerId: string, + context: DispatchSchedulerContext + ): Promise; + /** * Called after processing a message to update scheduler state. * Optional - not all schedulers need to track state. @@ -292,6 +312,12 @@ export interface FairQueueKeyProducer { /** Get the dead letter queue data hash key for a tenant */ deadLetterQueueDataKey(tenantId: string): string; + // Tenant dispatch keys (two-level index) + /** Get the dispatch index key for a shard (Level 1: tenantIds with capacity) */ + dispatchKey(shardId: number): string; + /** Get the per-tenant queue index key (Level 2: queueIds for a tenant) */ + tenantQueueIndexKey(tenantId: string): string; + // Extraction methods /** Extract tenant ID from a queue ID */ extractTenantId(queueId: string): string; From e1604f7ac9a0b0b336f372ea064670a38fa3d875 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 25 Feb 2026 22:39:08 +0000 Subject: [PATCH 2/9] safer legacy master queue draining --- packages/redis-worker/src/fair-queue/index.ts | 18 +++------------ .../redis-worker/src/fair-queue/telemetry.ts | 22 ------------------- 2 files changed, 3 insertions(+), 37 deletions(-) diff --git a/packages/redis-worker/src/fair-queue/index.ts b/packages/redis-worker/src/fair-queue/index.ts index 4374699b01b..3b7bb274e3d 100644 --- a/packages/redis-worker/src/fair-queue/index.ts +++ b/packages/redis-worker/src/fair-queue/index.ts @@ -117,7 +117,6 @@ export class FairQueue { // Two-level tenant dispatch private tenantDispatch: TenantDispatch; - private legacyDrainComplete: boolean[]; constructor(private options: FairQueueOptions) { this.redis = createRedisClient(options.redis); @@ -193,8 +192,6 @@ export class FairQueue { shardCount: this.shardCount, }); - // Track per-shard drain status of legacy master queue - this.legacyDrainComplete = new Array(this.shardCount).fill(false); if (options.concurrencyGroups && options.concurrencyGroups.length > 0) { this.concurrencyManager = new ConcurrencyManager({ @@ -257,9 +254,6 @@ export class FairQueue { getDLQLength: async (tenantId: string) => { return await this.getDeadLetterQueueLength(tenantId); }, - isLegacyDrainComplete: (shardId: number) => { - return this.legacyDrainComplete[shardId] ?? false; - }, shardCount: this.shardCount, observedTenants: options?.observedTenants, }); @@ -835,16 +829,10 @@ export class FairQueue { hadWork = await this.#processDispatchShard(loopId, shardId, parentSpan); // Drain path: legacy master queue (simple scheduling, no DRR) - if (!this.legacyDrainComplete[shardId]) { + // Check ZCARD first (O(1)) to skip the drain path when empty + const legacyCount = await this.masterQueue.getShardQueueCount(shardId); + if (legacyCount > 0) { const drainHadWork = await this.#drainLegacyMasterQueueShard(loopId, shardId, parentSpan); - if (!drainHadWork) { - // Check if the old master queue shard is completely empty - const count = await this.masterQueue.getShardQueueCount(shardId); - if (count === 0) { - this.legacyDrainComplete[shardId] = true; - this.logger.info("Legacy master queue shard fully drained", { shardId }); - } - } hadWork = hadWork || drainHadWork; } diff --git a/packages/redis-worker/src/fair-queue/telemetry.ts b/packages/redis-worker/src/fair-queue/telemetry.ts index 373e88aa93b..e9531fc812a 100644 --- a/packages/redis-worker/src/fair-queue/telemetry.ts +++ b/packages/redis-worker/src/fair-queue/telemetry.ts @@ -59,7 +59,6 @@ export interface FairQueueMetrics { dispatchLength: ObservableGauge; inflightCount: ObservableGauge; dlqLength: ObservableGauge; - legacyDrainComplete: ObservableGauge; } /** @@ -255,7 +254,6 @@ export class FairQueueTelemetry { getDispatchLength?: (shardId: number) => Promise; getInflightCount?: (shardId: number) => Promise; getDLQLength?: (tenantId: string) => Promise; - isLegacyDrainComplete?: (shardId: number) => boolean; shardCount?: number; observedQueues?: string[]; observedTenants?: string[]; @@ -337,19 +335,6 @@ export class FairQueueTelemetry { }); } - // Legacy drain complete gauge (1 = drained, 0 = still draining) - if (callbacks.isLegacyDrainComplete && callbacks.shardCount) { - const isLegacyDrainComplete = callbacks.isLegacyDrainComplete; - const shardCount = callbacks.shardCount; - - this.metrics.legacyDrainComplete.addCallback((observableResult) => { - for (let shardId = 0; shardId < shardCount; shardId++) { - observableResult.observe(isLegacyDrainComplete(shardId) ? 1 : 0, { - [FairQueueAttributes.SHARD_ID]: shardId.toString(), - }); - } - }); - } } // ============================================================================ @@ -462,13 +447,6 @@ export class FairQueueTelemetry { description: "Number of messages in dead letter queue", unit: "messages", }), - legacyDrainComplete: this.meter.createObservableGauge( - `${this.name}.legacy_drain.complete`, - { - description: "Whether legacy master queue shard drain is complete (1=done, 0=draining)", - unit: "boolean", - } - ), }; } } From a7c1b4a2aa92437e9f33afc8151b8ca2fe3d0cfe Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 26 Feb 2026 10:19:29 +0000 Subject: [PATCH 3/9] fix: update visibility manager release/reclaim to use dispatch indexes The releaseMessage and releaseMessageBatch Lua scripts were still writing to the old master queue shards. Updated them to write to the new dispatch indexes instead, so released/reclaimed messages go into the new two-level index atomically. Also removed the legacyDrainComplete flag in favor of checking ZCARD on each iteration (O(1)), and removed the redundant legacyDrainComplete otel metric since master_queue.length already shows drain status. --- packages/redis-worker/src/fair-queue/index.ts | 91 ++++++----------- .../redis-worker/src/fair-queue/visibility.ts | 97 +++++++++++++------ 2 files changed, 95 insertions(+), 93 deletions(-) diff --git a/packages/redis-worker/src/fair-queue/index.ts b/packages/redis-worker/src/fair-queue/index.ts index 3b7bb274e3d..56e83ab08df 100644 --- a/packages/redis-worker/src/fair-queue/index.ts +++ b/packages/redis-worker/src/fair-queue/index.ts @@ -1094,7 +1094,6 @@ export class FairQueue { ): Promise { const queueKey = this.keys.queueKey(queueId); const queueItemsKey = this.keys.queueItemsKey(queueId); - const masterQueueKey = this.keys.masterQueueKey(shardId); const descriptor = this.queueDescriptorCache.get(queueId) ?? { id: queueId, tenantId, @@ -1153,12 +1152,16 @@ export class FairQueue { if (!reserved) { // Release ALL remaining messages (from index i onward) back to queue // This prevents messages from being stranded in the in-flight set + const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(tenantId); + const dispatchKey = this.keys.dispatchKey(shardId); await this.visibilityManager.releaseBatch( claimedMessages.slice(i), queueId, queueKey, queueItemsKey, - masterQueueKey + tenantQueueIndexKey, + dispatchKey, + tenantId ); // Stop processing more messages from this queue since we're at capacity break; @@ -1293,7 +1296,6 @@ export class FairQueue { const shardId = this.masterQueue.getShardForQueue(queueId); const queueKey = this.keys.queueKey(queueId); const queueItemsKey = this.keys.queueItemsKey(queueId); - const masterQueueKey = this.keys.masterQueueKey(shardId); const inflightDataKey = this.keys.inflightDataKey(shardId); // Get stored message for concurrency release @@ -1315,13 +1317,17 @@ export class FairQueue { } : { id: queueId, tenantId: "", metadata: {} }; - // Release back to queue (visibility manager updates old master queue internally) + // Release back to queue (visibility manager updates dispatch indexes atomically) + const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(descriptor.tenantId); + const dispatchKey = this.keys.dispatchKey(shardId); await this.visibilityManager.release( messageId, queueId, queueKey, queueItemsKey, - masterQueueKey, + tenantQueueIndexKey, + dispatchKey, + descriptor.tenantId, Date.now() // Put at back of queue ); @@ -1330,17 +1336,6 @@ export class FairQueue { await this.concurrencyManager.release(descriptor, messageId); } - // Update new dispatch indexes (message is back in queue, update scores) - const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(descriptor.tenantId); - const dispatchKey = this.keys.dispatchKey(shardId); - await this.redis.updateDispatchIndexes( - queueKey, - tenantQueueIndexKey, - dispatchKey, - queueId, - descriptor.tenantId - ); - this.logger.debug("Message released", { messageId, queueId, @@ -1359,7 +1354,6 @@ export class FairQueue { const shardId = this.masterQueue.getShardForQueue(queueId); const queueKey = this.keys.queueKey(queueId); const queueItemsKey = this.keys.queueItemsKey(queueId); - const masterQueueKey = this.keys.masterQueueKey(shardId); const inflightDataKey = this.keys.inflightDataKey(shardId); // Get stored message @@ -1391,7 +1385,6 @@ export class FairQueue { queueId, queueKey, queueItemsKey, - masterQueueKey, shardId, descriptor, error @@ -1407,7 +1400,6 @@ export class FairQueue { queueId: string, queueKey: string, queueItemsKey: string, - masterQueueKey: string, shardId: number, descriptor: QueueDescriptor, error?: Error @@ -1427,12 +1419,16 @@ export class FairQueue { // Release with delay, passing the updated message data so the Lua script // atomically writes the incremented attempt count when re-queuing. + const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(descriptor.tenantId); + const dispatchKey = this.keys.dispatchKey(shardId); await this.visibilityManager.release( storedMessage.id, queueId, queueKey, queueItemsKey, - masterQueueKey, + tenantQueueIndexKey, + dispatchKey, + descriptor.tenantId, Date.now() + nextDelay, JSON.stringify(updatedMessage) ); @@ -1442,17 +1438,6 @@ export class FairQueue { await this.concurrencyManager.release(descriptor, storedMessage.id); } - // Update dispatch indexes (message is back in queue with delay) - const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(descriptor.tenantId); - const dispatchKey = this.keys.dispatchKey(shardId); - await this.redis.updateDispatchIndexes( - queueKey, - tenantQueueIndexKey, - dispatchKey, - queueId, - descriptor.tenantId - ); - this.telemetry.recordRetry(); this.logger.debug("Message scheduled for retry", { @@ -1550,11 +1535,17 @@ export class FairQueue { let totalReclaimed = 0; for (let shardId = 0; shardId < this.shardCount; shardId++) { - const reclaimedMessages = await this.visibilityManager.reclaimTimedOut(shardId, (queueId) => ({ - queueKey: this.keys.queueKey(queueId), - queueItemsKey: this.keys.queueItemsKey(queueId), - masterQueueKey: this.keys.masterQueueKey(this.masterQueue.getShardForQueue(queueId)), - })); + const reclaimedMessages = await this.visibilityManager.reclaimTimedOut(shardId, (queueId) => { + const tenantId = this.keys.extractTenantId(queueId); + const queueShardId = this.masterQueue.getShardForQueue(queueId); + return { + queueKey: this.keys.queueKey(queueId), + queueItemsKey: this.keys.queueItemsKey(queueId), + tenantQueueIndexKey: this.keys.tenantQueueIndexKey(tenantId), + dispatchKey: this.keys.dispatchKey(queueShardId), + tenantId, + }; + }); if (reclaimedMessages.length > 0) { // Release concurrency for all reclaimed messages in a single batch @@ -1580,32 +1571,8 @@ export class FairQueue { } } - // Update dispatch indexes for reclaimed queues (messages are back in queue) - const updatedQueues = new Set(); - for (const msg of reclaimedMessages) { - const key = `${msg.tenantId}:${msg.queueId}`; - if (updatedQueues.has(key)) continue; - updatedQueues.add(key); - - try { - const queueKey = this.keys.queueKey(msg.queueId); - const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(msg.tenantId); - const dispatchKey = this.keys.dispatchKey(shardId); - await this.redis.updateDispatchIndexes( - queueKey, - tenantQueueIndexKey, - dispatchKey, - msg.queueId, - msg.tenantId - ); - } catch (error) { - this.logger.error("Failed to update dispatch indexes for reclaimed message", { - queueId: msg.queueId, - tenantId: msg.tenantId, - error: error instanceof Error ? error.message : String(error), - }); - } - } + // Dispatch indexes are updated atomically by the releaseMessage Lua script + // inside reclaimTimedOut, so no separate index update needed here. } totalReclaimed += reclaimedMessages.length; diff --git a/packages/redis-worker/src/fair-queue/visibility.ts b/packages/redis-worker/src/fair-queue/visibility.ts index 80fbf2ef004..a182a4e790d 100644 --- a/packages/redis-worker/src/fair-queue/visibility.ts +++ b/packages/redis-worker/src/fair-queue/visibility.ts @@ -275,7 +275,9 @@ export class VisibilityManager { * @param queueId - The queue ID * @param queueKey - The Redis key for the queue * @param queueItemsKey - The Redis key for the queue items hash - * @param masterQueueKey - The Redis key for the master queue + * @param tenantQueueIndexKey - The Redis key for the tenant queue index (Level 2) + * @param dispatchKey - The Redis key for the dispatch index (Level 1) + * @param tenantId - The tenant ID * @param score - Optional score for the message (defaults to now) */ async release( @@ -283,7 +285,9 @@ export class VisibilityManager { queueId: string, queueKey: string, queueItemsKey: string, - masterQueueKey: string, + tenantQueueIndexKey: string, + dispatchKey: string, + tenantId: string, score?: number, updatedData?: string ): Promise { @@ -297,18 +301,20 @@ export class VisibilityManager { // 1. Get message data from in-flight (or use updatedData if provided) // 2. Remove from in-flight // 3. Add back to queue - // 4. Update master queue to ensure queue is picked up + // 4. Update dispatch indexes to ensure queue is picked up await this.redis.releaseMessage( inflightKey, inflightDataKey, queueKey, queueItemsKey, - masterQueueKey, + tenantQueueIndexKey, + dispatchKey, member, messageId, messageScore.toString(), queueId, - updatedData ?? "" + updatedData ?? "", + tenantId ); this.logger.debug("Message released", { @@ -327,7 +333,9 @@ export class VisibilityManager { * @param queueId - The queue ID * @param queueKey - The Redis key for the queue * @param queueItemsKey - The Redis key for the queue items hash - * @param masterQueueKey - The Redis key for the master queue + * @param tenantQueueIndexKey - The Redis key for the tenant queue index (Level 2) + * @param dispatchKey - The Redis key for the dispatch index (Level 1) + * @param tenantId - The tenant ID * @param score - Optional score for the messages (defaults to now) */ async releaseBatch( @@ -335,7 +343,9 @@ export class VisibilityManager { queueId: string, queueKey: string, queueItemsKey: string, - masterQueueKey: string, + tenantQueueIndexKey: string, + dispatchKey: string, + tenantId: string, score?: number ): Promise { if (messages.length === 0) { @@ -356,9 +366,11 @@ export class VisibilityManager { inflightDataKey, queueKey, queueItemsKey, - masterQueueKey, + tenantQueueIndexKey, + dispatchKey, messageScore.toString(), queueId, + tenantId, ...members, ...messageIds ); @@ -383,7 +395,9 @@ export class VisibilityManager { getQueueKeys: (queueId: string) => { queueKey: string; queueItemsKey: string; - masterQueueKey: string; + tenantQueueIndexKey: string; + dispatchKey: string; + tenantId: string; } ): Promise { const inflightKey = this.keys.inflightKey(shardId); @@ -410,7 +424,8 @@ export class VisibilityManager { continue; } const { messageId, queueId } = this.#parseMember(member); - const { queueKey, queueItemsKey, masterQueueKey } = getQueueKeys(queueId); + const { queueKey, queueItemsKey, tenantQueueIndexKey, dispatchKey, tenantId } = + getQueueKeys(queueId); try { // Get message data BEFORE releasing so we can extract tenantId for concurrency release @@ -432,12 +447,14 @@ export class VisibilityManager { inflightDataKey, queueKey, queueItemsKey, - masterQueueKey, + tenantQueueIndexKey, + dispatchKey, member, messageId, score.toString(), queueId, - "" + "", + tenantId ); // Track reclaimed message for concurrency release @@ -669,21 +686,23 @@ return results `, }); - // Atomic release: remove from in-flight, add back to queue, update master queue + // Atomic release: remove from in-flight, add back to queue, update dispatch indexes this.redis.defineCommand("releaseMessage", { - numberOfKeys: 5, + numberOfKeys: 6, lua: ` local inflightKey = KEYS[1] local inflightDataKey = KEYS[2] local queueKey = KEYS[3] local queueItemsKey = KEYS[4] -local masterQueueKey = KEYS[5] +local tenantQueueIndexKey = KEYS[5] +local dispatchKey = KEYS[6] local member = ARGV[1] local messageId = ARGV[2] local score = tonumber(ARGV[3]) local queueId = ARGV[4] local updatedData = ARGV[5] +local tenantId = ARGV[6] -- Get message data from in-flight local payload = redis.call('HGET', inflightDataKey, messageId) @@ -706,12 +725,16 @@ redis.call('HDEL', inflightDataKey, messageId) redis.call('ZADD', queueKey, score, messageId) redis.call('HSET', queueItemsKey, messageId, payload) --- Update master queue with oldest message timestamp --- This ensures delayed messages don't push the queue priority to the future --- when there are other ready messages in the queue +-- Update tenant queue index (Level 2) with queue's oldest message local oldest = redis.call('ZRANGE', queueKey, 0, 0, 'WITHSCORES') if #oldest >= 2 then - redis.call('ZADD', masterQueueKey, oldest[2], queueId) + redis.call('ZADD', tenantQueueIndexKey, oldest[2], queueId) +end + +-- Update dispatch index (Level 1) with tenant's oldest across all queues +local tenantOldest = redis.call('ZRANGE', tenantQueueIndexKey, 0, 0, 'WITHSCORES') +if #tenantOldest >= 2 then + redis.call('ZADD', dispatchKey, tenantOldest[2], tenantId) end return 1 @@ -720,21 +743,23 @@ return 1 // Atomic batch release: release multiple messages back to queue this.redis.defineCommand("releaseMessageBatch", { - numberOfKeys: 5, + numberOfKeys: 6, lua: ` local inflightKey = KEYS[1] local inflightDataKey = KEYS[2] local queueKey = KEYS[3] local queueItemsKey = KEYS[4] -local masterQueueKey = KEYS[5] +local tenantQueueIndexKey = KEYS[5] +local dispatchKey = KEYS[6] local score = tonumber(ARGV[1]) local queueId = ARGV[2] +local tenantId = ARGV[3] -- Remaining args are: members..., messageIds... -- Calculate how many messages we have -local numMessages = (table.getn(ARGV) - 2) / 2 -local membersStart = 3 +local numMessages = (table.getn(ARGV) - 3) / 2 +local membersStart = 4 local messageIdsStart = membersStart + numMessages local releasedCount = 0 @@ -742,27 +767,33 @@ local releasedCount = 0 for i = 0, numMessages - 1 do local member = ARGV[membersStart + i] local messageId = ARGV[messageIdsStart + i] - + -- Get message data from in-flight local payload = redis.call('HGET', inflightDataKey, messageId) if payload then -- Remove from in-flight redis.call('ZREM', inflightKey, member) redis.call('HDEL', inflightDataKey, messageId) - + -- Add back to queue redis.call('ZADD', queueKey, score, messageId) redis.call('HSET', queueItemsKey, messageId, payload) - + releasedCount = releasedCount + 1 end end --- Update master queue with oldest message timestamp (only once at the end) +-- Update dispatch indexes (only once at the end) if releasedCount > 0 then + -- Update tenant queue index (Level 2) local oldest = redis.call('ZRANGE', queueKey, 0, 0, 'WITHSCORES') if #oldest >= 2 then - redis.call('ZADD', masterQueueKey, oldest[2], queueId) + redis.call('ZADD', tenantQueueIndexKey, oldest[2], queueId) + end + -- Update dispatch index (Level 1) + local tenantOldest = redis.call('ZRANGE', tenantQueueIndexKey, 0, 0, 'WITHSCORES') + if #tenantOldest >= 2 then + redis.call('ZADD', dispatchKey, tenantOldest[2], tenantId) end end @@ -822,12 +853,14 @@ declare module "@internal/redis" { inflightDataKey: string, queueKey: string, queueItemsKey: string, - masterQueueKey: string, + tenantQueueIndexKey: string, + dispatchKey: string, member: string, messageId: string, score: string, queueId: string, - updatedData: string + updatedData: string, + tenantId: string ): Promise; releaseMessageBatch( @@ -835,9 +868,11 @@ declare module "@internal/redis" { inflightDataKey: string, queueKey: string, queueItemsKey: string, - masterQueueKey: string, + tenantQueueIndexKey: string, + dispatchKey: string, score: string, queueId: string, + tenantId: string, ...membersAndMessageIds: string[] ): Promise; From 8d0a826ef38129b7fca52f6c17e22223632a1ba1 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 26 Feb 2026 10:24:34 +0000 Subject: [PATCH 4/9] test: add release/reclaim dispatch index tests and legacy migration tests Tests that release (retry) and reclaim (visibility timeout) correctly update dispatch indexes instead of master queue. Also tests that legacy pre-deploy messages in the old master queue migrate to the new dispatch index when they get reclaimed or retried. --- .../fair-queue/tests/tenantDispatch.test.ts | 247 ++++++++++++++++++ 1 file changed, 247 insertions(+) diff --git a/packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts b/packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts index ec33ad8c32f..4eda9d0854b 100644 --- a/packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts +++ b/packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts @@ -6,6 +6,7 @@ import { FairQueue, DefaultFairQueueKeyProducer, DRRScheduler, + ExponentialBackoffRetry, WorkerQueueManager, } from "../index.js"; import type { FairQueueKeyProducer, StoredMessage } from "../types.js"; @@ -38,6 +39,9 @@ class TestHelper { shardCount?: number; consumerIntervalMs?: number; concurrencyLimit?: number; + visibilityTimeoutMs?: number; + reclaimIntervalMs?: number; + retry?: { maxAttempts: number; delayMs: number }; } = {} ) { const scheduler = new DRRScheduler({ @@ -54,8 +58,20 @@ class TestHelper { payloadSchema: TestPayloadSchema, shardCount: options.shardCount ?? 1, consumerIntervalMs: options.consumerIntervalMs ?? 20, + visibilityTimeoutMs: options.visibilityTimeoutMs, + reclaimIntervalMs: options.reclaimIntervalMs, startConsumers: false, workerQueue: { resolveWorkerQueue: () => TEST_WORKER_QUEUE_ID }, + retry: options.retry + ? { + strategy: new ExponentialBackoffRetry({ + maxAttempts: options.retry.maxAttempts, + baseDelay: options.retry.delayMs, + maxDelay: options.retry.delayMs, + factor: 1, + }), + } + : undefined, concurrencyGroups: options.concurrencyLimit ? [ { @@ -597,6 +613,237 @@ describe("Two-Level Tenant Dispatch", () => { } ); }); + describe("release updates dispatch indexes", () => { + redisTest( + "should update dispatch indexes when message is released for retry", + { timeout: 20000 }, + async ({ redisOptions }) => { + const keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + const redis = createRedisClient(redisOptions); + const attempts: number[] = []; + + const helper = new TestHelper(redisOptions, keys, { + retry: { maxAttempts: 3, delayMs: 100 }, + }); + + await helper.fairQueue.enqueue({ + queueId: "tenant:t1:queue:q1", + tenantId: "t1", + payload: { value: "retry-me" }, + }); + + helper.onMessage(async (ctx) => { + attempts.push(ctx.message.attempt); + if (ctx.message.attempt < 2) { + // Fail on first attempt to trigger retry + await ctx.fail(new Error("transient error")); + } else { + await ctx.complete(); + } + }); + helper.start(); + + // Wait for successful processing on second attempt + await waitFor(() => attempts.length >= 2 && attempts.includes(2), 10000); + + // After retry, the message went back into the queue via releaseMessage Lua. + // Verify it was picked up again (attempt 2 processed). + expect(attempts).toContain(1); + expect(attempts).toContain(2); + + // After completion, dispatch indexes should be cleaned up + await waitFor(async () => { + const t1Queues = await redis.zcard(keys.tenantQueueIndexKey("t1")); + return t1Queues === 0; + }, 5000); + + await helper.close(); + await redis.quit(); + } + ); + }); + + describe("reclaim updates dispatch indexes", () => { + redisTest( + "should update dispatch indexes when timed-out message is reclaimed", + { timeout: 20000 }, + async ({ redisOptions }) => { + const keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + const redis = createRedisClient(redisOptions); + const processCount = { count: 0 }; + + const helper = new TestHelper(redisOptions, keys, { + visibilityTimeoutMs: 500, + reclaimIntervalMs: 200, + }); + + await helper.fairQueue.enqueue({ + queueId: "tenant:t1:queue:q1", + tenantId: "t1", + payload: { value: "reclaim-me" }, + }); + + helper.onMessage(async (ctx) => { + processCount.count++; + if (processCount.count === 1) { + // First attempt: don't complete, let it timeout and get reclaimed + await new Promise((resolve) => setTimeout(resolve, 1500)); + } else { + // Second attempt after reclaim: complete normally + await ctx.complete(); + } + }); + helper.start(); + + // Wait for message to be processed twice (once timeout, once success) + await waitFor(() => processCount.count >= 2, 10000); + + // After reclaim + re-processing + completion, indexes should be clean + await waitFor(async () => { + const t1Queues = await redis.zcard(keys.tenantQueueIndexKey("t1")); + return t1Queues === 0; + }, 5000); + + await helper.close(); + await redis.quit(); + } + ); + }); + + describe("legacy message migration via reclaim", () => { + redisTest( + "should migrate legacy master queue message to dispatch index on reclaim", + { timeout: 20000 }, + async ({ redisOptions }) => { + const keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + const redis = createRedisClient(redisOptions); + const processed: string[] = []; + + // Simulate pre-deploy: write message to old master queue + queue storage + const queueId = "tenant:t1:queue:legacy"; + const queueKey = keys.queueKey(queueId); + const queueItemsKey = keys.queueItemsKey(queueId); + const masterQueueKey = keys.masterQueueKey(0); + + const timestamp = Date.now(); + const storedMessage: StoredMessage = { + id: "legacy-reclaim-1", + queueId, + tenantId: "t1", + payload: { value: "legacy-reclaim" }, + timestamp, + attempt: 1, + }; + + await redis.zadd(queueKey, timestamp, "legacy-reclaim-1"); + await redis.hset(queueItemsKey, "legacy-reclaim-1", JSON.stringify(storedMessage)); + await redis.zadd(masterQueueKey, timestamp, queueId); + + // Verify: message only in old master queue, not in dispatch + expect(await redis.zcard(keys.dispatchKey(0))).toBe(0); + expect(await redis.zcard(keys.tenantQueueIndexKey("t1"))).toBe(0); + + // Create FairQueue with short visibility timeout + const helper = new TestHelper(redisOptions, keys, { + visibilityTimeoutMs: 500, + reclaimIntervalMs: 200, + }); + + const processCount = { count: 0 }; + helper.onMessage(async (ctx) => { + processCount.count++; + if (processCount.count === 1) { + // First attempt: don't complete, let it timeout + // The reclaim will put it back in queue and update dispatch indexes + await new Promise((resolve) => setTimeout(resolve, 1500)); + } else { + // Second attempt: complete + processed.push(ctx.message.payload.value); + await ctx.complete(); + } + }); + helper.start(); + + // Wait for the message to be processed (first via drain, then reclaimed into dispatch) + await waitFor(() => processed.length === 1, 15000); + expect(processed[0]).toBe("legacy-reclaim"); + + // After completion, both old and new indexes should be clean + const masterAfter = await redis.zcard(masterQueueKey); + const dispatchAfter = await redis.zcard(keys.dispatchKey(0)); + const tenantQueuesAfter = await redis.zcard(keys.tenantQueueIndexKey("t1")); + + // Old master queue should still be empty (drain removed it) + // or at least the queue itself should be gone + expect(tenantQueuesAfter).toBe(0); + + await helper.close(); + await redis.quit(); + } + ); + + redisTest( + "should migrate legacy message to dispatch index on retry failure", + { timeout: 20000 }, + async ({ redisOptions }) => { + const keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + const redis = createRedisClient(redisOptions); + const attempts: number[] = []; + + // Simulate pre-deploy: write message to old master queue + const queueId = "tenant:t1:queue:legacy"; + const queueKey = keys.queueKey(queueId); + const queueItemsKey = keys.queueItemsKey(queueId); + const masterQueueKey = keys.masterQueueKey(0); + + const timestamp = Date.now(); + const storedMessage: StoredMessage = { + id: "legacy-retry-1", + queueId, + tenantId: "t1", + payload: { value: "legacy-retry" }, + timestamp, + attempt: 1, + }; + + await redis.zadd(queueKey, timestamp, "legacy-retry-1"); + await redis.hset(queueItemsKey, "legacy-retry-1", JSON.stringify(storedMessage)); + await redis.zadd(masterQueueKey, timestamp, queueId); + + // Create FairQueue with retry enabled + const helper = new TestHelper(redisOptions, keys, { + retry: { maxAttempts: 3, delayMs: 100 }, + }); + + helper.onMessage(async (ctx) => { + attempts.push(ctx.message.attempt); + if (ctx.message.attempt < 2) { + // Fail first attempt — triggers retry which writes to dispatch index + await ctx.fail(new Error("transient")); + } else { + await ctx.complete(); + } + }); + helper.start(); + + // Wait for retry to complete + await waitFor(() => attempts.includes(2), 10000); + + // The retry release should have written to dispatch indexes. + // After completion, indexes should be clean. + await waitFor(async () => { + const t1Queues = await redis.zcard(keys.tenantQueueIndexKey("t1")); + return t1Queues === 0; + }, 5000); + + expect(attempts).toContain(1); + expect(attempts).toContain(2); + + await helper.close(); + await redis.quit(); + } + ); + }); }); // Helper to wait for a condition From d79ca44b8b658b613d70bc592fe34f65408e8c19 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 26 Feb 2026 10:32:13 +0000 Subject: [PATCH 5/9] fix: fallback dispatch scheduler was reading empty master queue The fallback path for schedulers without selectQueuesFromDispatch was building allQueues from the dispatch index but then ignoring it and calling scheduler.selectQueues with the old master queue key (which is empty for new messages). Fixed to group the fetched queues by tenant directly with capacity filtering. --- packages/redis-worker/src/fair-queue/index.ts | 28 ++++++++----------- scripts/enhance-release-pr.mjs | 6 +++- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/packages/redis-worker/src/fair-queue/index.ts b/packages/redis-worker/src/fair-queue/index.ts index 56e83ab08df..e0a88939b7d 100644 --- a/packages/redis-worker/src/fair-queue/index.ts +++ b/packages/redis-worker/src/fair-queue/index.ts @@ -27,7 +27,6 @@ import type { GlobalRateLimiter, QueueCooloffState, QueueDescriptor, - QueueWithScore, SchedulerContext, StoredMessage, TenantQueues, @@ -968,8 +967,8 @@ export class FairQueue { /** * Fallback for schedulers that don't implement selectQueuesFromDispatch. - * Reads dispatch index, fetches per-tenant queues, flattens into the - * old-style master queue key format, and calls selectQueues. + * Reads dispatch index, fetches per-tenant queues, groups by tenant, + * and filters at-capacity tenants. No DRR deficit tracking in this path. */ async #fallbackDispatchToLegacyScheduler( loopId: string, @@ -981,23 +980,20 @@ export class FairQueue { const tenants = await this.tenantDispatch.getTenantsFromShard(shardId); if (tenants.length === 0) return []; - // For each tenant, get their queues and build a flat list - const allQueues: QueueWithScore[] = []; + // For each tenant, get their queues and build grouped result + const tenantQueues: TenantQueues[] = []; for (const { tenantId } of tenants) { + if (this.concurrencyManager) { + const atCapacity = await this.concurrencyManager.isAtCapacity("tenant", tenantId); + if (atCapacity) continue; + } const queues = await this.tenantDispatch.getQueuesForTenant(tenantId); - allQueues.push(...queues); + if (queues.length > 0) { + tenantQueues.push({ tenantId, queues: queues.map((q) => q.queueId) }); + } } - if (allQueues.length === 0) return []; - - // Use the base scheduler context (without dispatch methods) - const baseContext = this.#createSchedulerContext(); - - // We need a temporary master queue key for the scheduler. Rather than - // writing to Redis, we'll group the queues by tenant ourselves and - // apply the same logic as the legacy selectQueues. - const masterQueueKey = this.keys.masterQueueKey(shardId); - return this.scheduler.selectQueues(masterQueueKey, loopId, baseContext); + return tenantQueues; } /** diff --git a/scripts/enhance-release-pr.mjs b/scripts/enhance-release-pr.mjs index af8f8333b0d..9621d2920f3 100644 --- a/scripts/enhance-release-pr.mjs +++ b/scripts/enhance-release-pr.mjs @@ -302,7 +302,11 @@ function formatPrBody({ version, packageEntries, serverEntries, rawBody }) { "These changes affect the self-hosted Docker image and Trigger.dev Cloud:" ); lines.push(""); - for (const entry of allServer) lines.push(`- ${entry.text}`); + for (const entry of allServer) { + // Indent continuation lines so multi-line entries stay inside the list item + const indented = entry.text.replace(/\n/g, "\n "); + lines.push(`- ${indented}`); + } lines.push(""); } From 4a62c4b20aba925b0818f64e508c2e2cc35e4fa4 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 26 Feb 2026 10:39:15 +0000 Subject: [PATCH 6/9] typecheck fixes --- .../fair-queue/tests/raceConditions.test.ts | 4 +- .../fair-queue/tests/tenantDispatch.test.ts | 5 +- .../src/fair-queue/tests/visibility.test.ts | 59 +++++++++++++------ 3 files changed, 46 insertions(+), 22 deletions(-) diff --git a/packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts b/packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts index 1222bd9e4f1..4d85603b984 100644 --- a/packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts +++ b/packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts @@ -627,7 +627,9 @@ describe("Race Condition Tests", () => { const reclaimedMessages = await manager.reclaimTimedOut(0, (queueId) => ({ queueKey: keys.queueKey(queueId), queueItemsKey: keys.queueItemsKey(queueId), - masterQueueKey: keys.masterQueueKey(0), + tenantQueueIndexKey: keys.tenantQueueIndexKey(keys.extractTenantId(queueId)), + dispatchKey: keys.dispatchKey(0), + tenantId: keys.extractTenantId(queueId), })); reclaimResults.push(reclaimedMessages.length); } diff --git a/packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts b/packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts index 4eda9d0854b..de2a01473ce 100644 --- a/packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts +++ b/packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts @@ -66,9 +66,10 @@ class TestHelper { ? { strategy: new ExponentialBackoffRetry({ maxAttempts: options.retry.maxAttempts, - baseDelay: options.retry.delayMs, - maxDelay: options.retry.delayMs, + minTimeoutInMs: options.retry.delayMs, + maxTimeoutInMs: options.retry.delayMs, factor: 1, + randomize: false, }), } : undefined, diff --git a/packages/redis-worker/src/fair-queue/tests/visibility.test.ts b/packages/redis-worker/src/fair-queue/tests/visibility.test.ts index a5685d51c9e..2ecdeb41e90 100644 --- a/packages/redis-worker/src/fair-queue/tests/visibility.test.ts +++ b/packages/redis-worker/src/fair-queue/tests/visibility.test.ts @@ -465,7 +465,8 @@ describe("VisibilityManager", () => { const queueId = "tenant:t1:queue:release-batch"; const queueKey = keys.queueKey(queueId); const queueItemsKey = keys.queueItemsKey(queueId); - const masterQueueKey = keys.masterQueueKey(0); + const tenantQueueIndexKey = keys.tenantQueueIndexKey("t1"); + const dispatchKey = keys.dispatchKey(0); // Add messages to queue and claim them for (let i = 1; i <= 5; i++) { @@ -501,7 +502,9 @@ describe("VisibilityManager", () => { queueId, queueKey, queueItemsKey, - masterQueueKey + tenantQueueIndexKey, + dispatchKey, + "t1" ); // Verify 2 messages still in-flight @@ -539,17 +542,18 @@ describe("VisibilityManager", () => { const queueId = "tenant:t1:queue:empty-release"; const queueKey = keys.queueKey(queueId); const queueItemsKey = keys.queueItemsKey(queueId); - const masterQueueKey = keys.masterQueueKey(0); + const tenantQueueIndexKey = keys.tenantQueueIndexKey("t1"); + const dispatchKey = keys.dispatchKey(0); // Should not throw when releasing empty array - await manager.releaseBatch([], queueId, queueKey, queueItemsKey, masterQueueKey); + await manager.releaseBatch([], queueId, queueKey, queueItemsKey, tenantQueueIndexKey, dispatchKey, "t1"); await manager.close(); } ); redisTest( - "should update master queue with oldest message timestamp", + "should update dispatch indexes with oldest message timestamp", { timeout: 10000 }, async ({ redisOptions }) => { keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); @@ -562,10 +566,11 @@ describe("VisibilityManager", () => { }); const redis = createRedisClient(redisOptions); - const queueId = "tenant:t1:queue:master-update"; + const queueId = "tenant:t1:queue:dispatch-update"; const queueKey = keys.queueKey(queueId); const queueItemsKey = keys.queueItemsKey(queueId); - const masterQueueKey = keys.masterQueueKey(0); + const tenantQueueIndexKey = keys.tenantQueueIndexKey("t1"); + const dispatchKey = keys.dispatchKey(0); // Add and claim messages const baseTime = Date.now(); @@ -586,11 +591,15 @@ describe("VisibilityManager", () => { const claimed = await manager.claimBatch(queueId, queueKey, queueItemsKey, "consumer-1", 3); // Release all messages back - await manager.releaseBatch(claimed, queueId, queueKey, queueItemsKey, masterQueueKey); + await manager.releaseBatch(claimed, queueId, queueKey, queueItemsKey, tenantQueueIndexKey, dispatchKey, "t1"); - // Master queue should have been updated - const masterScore = await redis.zscore(masterQueueKey, queueId); - expect(masterScore).not.toBeNull(); + // Tenant queue index should have the queue with correct score + const tenantQueueScore = await redis.zscore(tenantQueueIndexKey, queueId); + expect(tenantQueueScore).not.toBeNull(); + + // Dispatch index should have the tenant + const dispatchScore = await redis.zscore(dispatchKey, "t1"); + expect(dispatchScore).not.toBeNull(); await manager.close(); await redis.quit(); @@ -616,7 +625,8 @@ describe("VisibilityManager", () => { const queueId = "tenant:t1:queue:reclaim-test"; const queueKey = keys.queueKey(queueId); const queueItemsKey = keys.queueItemsKey(queueId); - const masterQueueKey = keys.masterQueueKey(0); + const tenantQueueIndexKey = keys.tenantQueueIndexKey("t1"); + const dispatchKey = keys.dispatchKey(0); // Add and claim a message const messageId = "reclaim-msg"; @@ -644,7 +654,9 @@ describe("VisibilityManager", () => { const reclaimedMessages = await manager.reclaimTimedOut(0, (qId) => ({ queueKey: keys.queueKey(qId), queueItemsKey: keys.queueItemsKey(qId), - masterQueueKey, + tenantQueueIndexKey: keys.tenantQueueIndexKey(keys.extractTenantId(qId)), + dispatchKey, + tenantId: keys.extractTenantId(qId), })); expect(reclaimedMessages).toHaveLength(1); @@ -690,7 +702,8 @@ describe("VisibilityManager", () => { const queueId = "tenant:t1:queue:no-timeout"; const queueKey = keys.queueKey(queueId); const queueItemsKey = keys.queueItemsKey(queueId); - const masterQueueKey = keys.masterQueueKey(0); + const tenantQueueIndexKey = keys.tenantQueueIndexKey("t1"); + const dispatchKey = keys.dispatchKey(0); // Add and claim a message with long timeout const messageId = "long-timeout-msg"; @@ -712,7 +725,9 @@ describe("VisibilityManager", () => { const reclaimedMessages = await manager.reclaimTimedOut(0, (qId) => ({ queueKey: keys.queueKey(qId), queueItemsKey: keys.queueItemsKey(qId), - masterQueueKey, + tenantQueueIndexKey: keys.tenantQueueIndexKey(keys.extractTenantId(qId)), + dispatchKey, + tenantId: keys.extractTenantId(qId), })); expect(reclaimedMessages).toHaveLength(0); @@ -736,7 +751,8 @@ describe("VisibilityManager", () => { }); const redis = createRedisClient(redisOptions); - const masterQueueKey = keys.masterQueueKey(0); + const tenantQueueIndexKey = keys.tenantQueueIndexKey("t1"); + const dispatchKey = keys.dispatchKey(0); // Add and claim messages for two different tenants for (const tenant of ["t1", "t2"]) { @@ -767,7 +783,9 @@ describe("VisibilityManager", () => { const reclaimedMessages = await manager.reclaimTimedOut(0, (qId) => ({ queueKey: keys.queueKey(qId), queueItemsKey: keys.queueItemsKey(qId), - masterQueueKey, + tenantQueueIndexKey: keys.tenantQueueIndexKey(keys.extractTenantId(qId)), + dispatchKey, + tenantId: keys.extractTenantId(qId), })); expect(reclaimedMessages).toHaveLength(2); @@ -798,7 +816,8 @@ describe("VisibilityManager", () => { const queueId = "tenant:t1:queue:fallback-test"; const queueKey = keys.queueKey(queueId); const queueItemsKey = keys.queueItemsKey(queueId); - const masterQueueKey = keys.masterQueueKey(0); + const tenantQueueIndexKey = keys.tenantQueueIndexKey("t1"); + const dispatchKey = keys.dispatchKey(0); const inflightDataKey = keys.inflightDataKey(0); // Add and claim a message @@ -830,7 +849,9 @@ describe("VisibilityManager", () => { const reclaimedMessages = await manager.reclaimTimedOut(0, (qId) => ({ queueKey: keys.queueKey(qId), queueItemsKey: keys.queueItemsKey(qId), - masterQueueKey, + tenantQueueIndexKey: keys.tenantQueueIndexKey(keys.extractTenantId(qId)), + dispatchKey, + tenantId: keys.extractTenantId(qId), })); expect(reclaimedMessages).toHaveLength(1); From b855e489f013e963e44b799c9b93b3af8105b84b Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 26 Feb 2026 11:08:46 +0000 Subject: [PATCH 7/9] Better DRR efficiency and making sure selection is fair --- packages/redis-worker/src/fair-queue/index.ts | 14 +- .../src/fair-queue/schedulers/drr.ts | 70 ++++----- .../fair-queue/tests/tenantDispatch.test.ts | 146 ++++++++++++++++++ packages/redis-worker/src/fair-queue/types.ts | 2 +- 4 files changed, 183 insertions(+), 49 deletions(-) diff --git a/packages/redis-worker/src/fair-queue/index.ts b/packages/redis-worker/src/fair-queue/index.ts index e0a88939b7d..5c464ad85d9 100644 --- a/packages/redis-worker/src/fair-queue/index.ts +++ b/packages/redis-worker/src/fair-queue/index.ts @@ -717,17 +717,13 @@ export class FairQueue { } /** - * Get total queue count across all shards. + * Get total tenant count across dispatch shards plus any legacy queues still draining. */ async getTotalQueueCount(): Promise { - // Count from new dispatch index (primary) + legacy master queue (drain) const [dispatchCount, legacyCount] = await Promise.all([ this.tenantDispatch.getTotalTenantCount(), this.masterQueue.getTotalQueueCount(), ]); - // During migration, dispatch has tenant count (not queue count). - // For a more accurate queue count, we'd need to sum per-tenant queue counts. - // For now, return the dispatch tenant count + any legacy queues still draining. return dispatchCount + legacyCount; } @@ -857,8 +853,8 @@ export class FairQueue { // Create dispatch-aware scheduler context const schedulerContext: DispatchSchedulerContext = { ...this.#createSchedulerContext(), - getQueuesForTenant: async (tenantId: string) => { - return this.tenantDispatch.getQueuesForTenant(tenantId); + getQueuesForTenant: async (tenantId: string, limit?: number) => { + return this.tenantDispatch.getQueuesForTenant(tenantId, limit); }, }; @@ -1252,7 +1248,7 @@ export class FairQueue { tenantId: storedMessage.tenantId, metadata: storedMessage.metadata ?? {}, } - : { id: queueId, tenantId: "", metadata: {} }; + : { id: queueId, tenantId: this.keys.extractTenantId(queueId), metadata: {} }; // Complete in visibility manager await this.visibilityManager.complete(messageId, queueId); @@ -1311,7 +1307,7 @@ export class FairQueue { tenantId: storedMessage.tenantId, metadata: storedMessage.metadata ?? {}, } - : { id: queueId, tenantId: "", metadata: {} }; + : { id: queueId, tenantId: this.keys.extractTenantId(queueId), metadata: {} }; // Release back to queue (visibility manager updates dispatch indexes atomically) const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(descriptor.tenantId); diff --git a/packages/redis-worker/src/fair-queue/schedulers/drr.ts b/packages/redis-worker/src/fair-queue/schedulers/drr.ts index c6cd1c7a988..3e05ae8a34f 100644 --- a/packages/redis-worker/src/fair-queue/schedulers/drr.ts +++ b/packages/redis-worker/src/fair-queue/schedulers/drr.ts @@ -157,52 +157,44 @@ export class DRRScheduler extends BaseScheduler { const tenantIds = tenants.map((t) => t.tenantId); - // Add quantum to all active tenants atomically + // Add quantum to all active tenants atomically (1 Lua call) const deficits = await this.#addQuantumToTenants(tenantIds); - // Build tenant data with deficits and capacity checks - const tenantData: Array<{ - tenantId: string; - deficit: number; - isAtCapacity: boolean; - }> = await Promise.all( - tenantIds.map(async (tenantId, index) => { - // Capacity check as safety net - dispatch should already exclude at-capacity tenants - // once capacity-based pruning is implemented as a follow-up - const isAtCapacity = await context.isAtCapacity("tenant", tenantId); - return { - tenantId, - deficit: deficits[index] ?? 0, - isAtCapacity, - }; - }) - ); - - // Filter out tenants at capacity or with no deficit - const eligibleTenants = tenantData.filter((t) => !t.isAtCapacity && t.deficit >= 1); - - // Sort by deficit (highest first for fairness) - eligibleTenants.sort((a, b) => b.deficit - a.deficit); - - this.logger.debug("DRR dispatch: tenant selection complete", { - dispatchTenants: tenants.length, - eligibleTenants: eligibleTenants.length, - topTenantDeficit: eligibleTenants[0]?.deficit, - }); - - // Level 2: For each eligible tenant, fetch their queues - const result: TenantQueues[] = []; - for (const { tenantId } of eligibleTenants) { - const queues = await context.getQueuesForTenant(tenantId); + // Build candidates sorted by deficit (highest first) + const candidates = tenantIds + .map((tenantId, index) => ({ tenantId, deficit: deficits[index] ?? 0 })) + .filter((t) => t.deficit >= 1); + + candidates.sort((a, b) => b.deficit - a.deficit); + + // Pick the first tenant with available capacity and fetch their queues. + // This keeps the scheduler cheap: O(1) in the common case where the + // highest-deficit tenant has capacity. The consumer loop iterates fast + // (1ms yield between rounds) so we cycle through tenants quickly. + for (const { tenantId, deficit } of candidates) { + const isAtCapacity = await context.isAtCapacity("tenant", tenantId); + if (isAtCapacity) continue; + + // Limit queues fetched to what the tenant can actually process this round. + // deficit = max messages this tenant should process, so no point fetching + // more queues than that (each queue yields at least 1 message). + const queueLimit = Math.ceil(deficit); + const queues = await context.getQueuesForTenant(tenantId, queueLimit); if (queues.length > 0) { - result.push({ - tenantId, - queues: queues.map((q) => q.queueId), + this.logger.debug("DRR dispatch: selected tenant", { + dispatchTenants: tenants.length, + candidates: candidates.length, + selectedTenant: tenantId, + deficit, + queueLimit, + queuesReturned: queues.length, }); + + return [{ tenantId, queues: queues.map((q) => q.queueId) }]; } } - return result; + return []; } /** diff --git a/packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts b/packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts index de2a01473ce..a22e6a58773 100644 --- a/packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts +++ b/packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts @@ -505,6 +505,152 @@ describe("Two-Level Tenant Dispatch", () => { ); }); + describe("DRR fairness across iterations", () => { + redisTest( + "should distribute processing fairly and not starve any tenant", + { timeout: 30000 }, + async ({ redisOptions }) => { + const keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + const processed: Array<{ tenantId: string; order: number }> = []; + let processOrder = 0; + + const scheduler = new DRRScheduler({ + redis: redisOptions, + keys, + quantum: 5, + maxDeficit: 20, + }); + + const fairQueue = new FairQueue({ + redis: redisOptions, + keys, + scheduler, + payloadSchema: TestPayloadSchema, + shardCount: 1, + consumerIntervalMs: 10, + startConsumers: false, + workerQueue: { resolveWorkerQueue: () => TEST_WORKER_QUEUE_ID }, + concurrencyGroups: [ + { + name: "tenant", + extractGroupId: (q) => q.tenantId, + getLimit: async () => 100, // High limit so concurrency doesn't interfere + defaultLimit: 100, + }, + ], + }); + + const workerQueueManager = new WorkerQueueManager({ + redis: redisOptions, + keys, + }); + + // Scenario: 3 tenants with very different queue counts + // t1: 50 queues (heavy hitter) + // t2: 5 queues (medium) + // t3: 1 queue with 5 messages (small) + // All should get fair service - no tenant should be starved. + + for (let i = 0; i < 50; i++) { + await fairQueue.enqueue({ + queueId: `tenant:t1:queue:q${i}`, + tenantId: "t1", + payload: { value: `t1-${i}` }, + }); + } + + for (let i = 0; i < 5; i++) { + await fairQueue.enqueue({ + queueId: `tenant:t2:queue:q${i}`, + tenantId: "t2", + payload: { value: `t2-${i}` }, + }); + } + + for (let i = 0; i < 5; i++) { + await fairQueue.enqueue({ + queueId: "tenant:t3:queue:q0", + tenantId: "t3", + payload: { value: `t3-${i}` }, + }); + } + + // Start processing + fairQueue.start(); + const abortController = new AbortController(); + + const consumerLoop = (async () => { + while (!abortController.signal.aborted) { + try { + const messageKey = await workerQueueManager.blockingPop( + TEST_WORKER_QUEUE_ID, + 1, + abortController.signal + ); + if (!messageKey) continue; + + const colonIndex = messageKey.indexOf(":"); + if (colonIndex === -1) continue; + + const messageId = messageKey.substring(0, colonIndex); + const queueId = messageKey.substring(colonIndex + 1); + const storedMessage = await fairQueue.getMessageData(messageId, queueId); + if (!storedMessage) continue; + + processed.push({ tenantId: storedMessage.tenantId, order: processOrder++ }); + await fairQueue.completeMessage(messageId, queueId); + } catch { + if (abortController.signal.aborted) break; + } + } + })(); + + // Wait for all 60 messages to be processed + await waitFor(() => processed.length === 60, 20000); + + // === Fairness assertions === + + const t1 = processed.filter((p) => p.tenantId === "t1"); + const t2 = processed.filter((p) => p.tenantId === "t2"); + const t3 = processed.filter((p) => p.tenantId === "t3"); + + // All messages processed + expect(t1.length).toBe(50); + expect(t2.length).toBe(5); + expect(t3.length).toBe(5); + + // No tenant should be starved: every tenant should have at least one + // message processed in the first 20 messages. With quantum=5 and 3 tenants, + // each should get a turn within the first few iterations. + const first20 = processed.slice(0, 20); + const tenantsInFirst20 = new Set(first20.map((p) => p.tenantId)); + expect(tenantsInFirst20.size).toBe(3); + + // t2 and t3 should finish well before t1 (they have fewer messages). + // Check that t2's last message is processed before t1's last message. + const t2LastOrder = Math.max(...t2.map((p) => p.order)); + const t1LastOrder = Math.max(...t1.map((p) => p.order)); + expect(t2LastOrder).toBeLessThan(t1LastOrder); + + // t3 should also finish before t1 + const t3LastOrder = Math.max(...t3.map((p) => p.order)); + expect(t3LastOrder).toBeLessThan(t1LastOrder); + + // Check that t1 doesn't monopolize early processing. + // In the first 15 messages, t1 should have at most 10 (with quantum=5, + // t1 gets ~5 per round, and there are 3 tenants taking turns). + const t1InFirst15 = processed.slice(0, 15).filter((p) => p.tenantId === "t1").length; + expect(t1InFirst15).toBeLessThanOrEqual(10); + + // Clean up + abortController.abort(); + await Promise.allSettled([consumerLoop]); + await fairQueue.close(); + await workerQueueManager.close(); + } + ); + }); + describe("noisy neighbor isolation", () => { redisTest( "should not block other tenants when one tenant is at capacity", diff --git a/packages/redis-worker/src/fair-queue/types.ts b/packages/redis-worker/src/fair-queue/types.ts index fcb4d20c090..d10cad1d0d4 100644 --- a/packages/redis-worker/src/fair-queue/types.ts +++ b/packages/redis-worker/src/fair-queue/types.ts @@ -185,7 +185,7 @@ export interface SchedulerContext { */ export interface DispatchSchedulerContext extends SchedulerContext { /** Get queues for a specific tenant from the per-tenant queue index (Level 2) */ - getQueuesForTenant(tenantId: string): Promise; + getQueuesForTenant(tenantId: string, limit?: number): Promise; } /** From af54cdffbc6d36e9b65c17d4d135c8b1766b787f Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 26 Feb 2026 12:07:48 +0000 Subject: [PATCH 8/9] fix: dispatch index shard must be tenant-based, not queue-based --- packages/redis-worker/src/fair-queue/index.ts | 48 +++++++----- .../src/fair-queue/tenantDispatch.ts | 9 ++- .../fair-queue/tests/tenantDispatch.test.ts | 78 +++++++++++++++++++ 3 files changed, 110 insertions(+), 25 deletions(-) diff --git a/packages/redis-worker/src/fair-queue/index.ts b/packages/redis-worker/src/fair-queue/index.ts index 5c464ad85d9..bfb60b6c552 100644 --- a/packages/redis-worker/src/fair-queue/index.ts +++ b/packages/redis-worker/src/fair-queue/index.ts @@ -273,9 +273,9 @@ export class FairQueue { const timestamp = options.timestamp ?? Date.now(); const queueKey = this.keys.queueKey(options.queueId); const queueItemsKey = this.keys.queueItemsKey(options.queueId); - const shardId = this.masterQueue.getShardForQueue(options.queueId); + const dispatchShardId = this.tenantDispatch.getShardForTenant(options.tenantId); const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(options.tenantId); - const dispatchKey = this.keys.dispatchKey(shardId); + const dispatchKey = this.keys.dispatchKey(dispatchShardId); // Validate payload if schema provided and validation enabled if (this.validateOnEnqueue && this.payloadSchema) { @@ -332,7 +332,7 @@ export class FairQueue { [FairQueueAttributes.QUEUE_ID]: options.queueId, [FairQueueAttributes.TENANT_ID]: options.tenantId, [FairQueueAttributes.MESSAGE_ID]: messageId, - [FairQueueAttributes.SHARD_ID]: shardId.toString(), + [FairQueueAttributes.SHARD_ID]: dispatchShardId.toString(), }); this.telemetry.recordEnqueue(); @@ -363,9 +363,9 @@ export class FairQueue { async (span) => { const queueKey = this.keys.queueKey(options.queueId); const queueItemsKey = this.keys.queueItemsKey(options.queueId); - const shardId = this.masterQueue.getShardForQueue(options.queueId); + const dispatchShardId = this.tenantDispatch.getShardForTenant(options.tenantId); const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(options.tenantId); - const dispatchKey = this.keys.dispatchKey(shardId); + const dispatchKey = this.keys.dispatchKey(dispatchShardId); const now = Date.now(); // Store queue descriptor @@ -433,7 +433,7 @@ export class FairQueue { [FairQueueAttributes.QUEUE_ID]: options.queueId, [FairQueueAttributes.TENANT_ID]: options.tenantId, [FairQueueAttributes.MESSAGE_COUNT]: messageIds.length, - [FairQueueAttributes.SHARD_ID]: shardId.toString(), + [FairQueueAttributes.SHARD_ID]: dispatchShardId.toString(), }); this.telemetry.recordEnqueueBatch(messageIds.length); @@ -1082,8 +1082,11 @@ export class FairQueue { loopId: string, queueId: string, tenantId: string, - shardId: number + _consumerShardId: number ): Promise { + // Dispatch shard is tenant-based (tenantId hash), not queue-based. + // In-flight/master queue shard is queue-based. + const dispatchShardId = this.tenantDispatch.getShardForTenant(tenantId); const queueKey = this.keys.queueKey(queueId); const queueItemsKey = this.keys.queueItemsKey(queueId); const descriptor = this.queueDescriptorCache.get(queueId) ?? { @@ -1126,7 +1129,7 @@ export class FairQueue { if (claimedMessages.length === 0) { // Queue is empty, update both old and new indexes and clean up caches - await this.#updateAllIndexesAfterDequeue(queueId, tenantId, shardId); + await this.#updateAllIndexesAfterDequeue(queueId, tenantId); this.queueDescriptorCache.delete(queueId); this.queueCooloffStates.delete(queueId); return 0; @@ -1145,7 +1148,7 @@ export class FairQueue { // Release ALL remaining messages (from index i onward) back to queue // This prevents messages from being stranded in the in-flight set const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(tenantId); - const dispatchKey = this.keys.dispatchKey(shardId); + const dispatchKey = this.keys.dispatchKey(dispatchShardId); await this.visibilityManager.releaseBatch( claimedMessages.slice(i), queueId, @@ -1261,8 +1264,7 @@ export class FairQueue { // Update both old and new indexes, clean up caches if queue is empty const { queueEmpty } = await this.#updateAllIndexesAfterDequeue( queueId, - descriptor.tenantId, - shardId + descriptor.tenantId ); if (queueEmpty) { this.queueDescriptorCache.delete(queueId); @@ -1310,8 +1312,10 @@ export class FairQueue { : { id: queueId, tenantId: this.keys.extractTenantId(queueId), metadata: {} }; // Release back to queue (visibility manager updates dispatch indexes atomically) + // Dispatch shard is tenant-based, not queue-based + const dispatchShardId = this.tenantDispatch.getShardForTenant(descriptor.tenantId); const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(descriptor.tenantId); - const dispatchKey = this.keys.dispatchKey(shardId); + const dispatchKey = this.keys.dispatchKey(dispatchShardId); await this.visibilityManager.release( messageId, queueId, @@ -1372,12 +1376,13 @@ export class FairQueue { metadata: storedMessage.metadata ?? {}, }; + const dispatchShardId = this.tenantDispatch.getShardForTenant(descriptor.tenantId); await this.#handleMessageFailure( storedMessage, queueId, queueKey, queueItemsKey, - shardId, + dispatchShardId, descriptor, error ); @@ -1392,7 +1397,7 @@ export class FairQueue { queueId: string, queueKey: string, queueItemsKey: string, - shardId: number, + dispatchShardId: number, descriptor: QueueDescriptor, error?: Error ): Promise { @@ -1412,7 +1417,7 @@ export class FairQueue { // Release with delay, passing the updated message data so the Lua script // atomically writes the incremented attempt count when re-queuing. const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(descriptor.tenantId); - const dispatchKey = this.keys.dispatchKey(shardId); + const dispatchKey = this.keys.dispatchKey(dispatchShardId); await this.visibilityManager.release( storedMessage.id, queueId, @@ -1529,12 +1534,12 @@ export class FairQueue { for (let shardId = 0; shardId < this.shardCount; shardId++) { const reclaimedMessages = await this.visibilityManager.reclaimTimedOut(shardId, (queueId) => { const tenantId = this.keys.extractTenantId(queueId); - const queueShardId = this.masterQueue.getShardForQueue(queueId); + const dispatchShardId = this.tenantDispatch.getShardForTenant(tenantId); return { queueKey: this.keys.queueKey(queueId), queueItemsKey: this.keys.queueItemsKey(queueId), tenantQueueIndexKey: this.keys.tenantQueueIndexKey(tenantId), - dispatchKey: this.keys.dispatchKey(queueShardId), + dispatchKey: this.keys.dispatchKey(dispatchShardId), tenantId, }; }); @@ -1645,13 +1650,14 @@ export class FairQueue { */ async #updateAllIndexesAfterDequeue( queueId: string, - tenantId: string, - shardId: number + tenantId: string ): Promise<{ queueEmpty: boolean }> { + const queueShardId = this.masterQueue.getShardForQueue(queueId); + const dispatchShardId = this.tenantDispatch.getShardForTenant(tenantId); const queueKey = this.keys.queueKey(queueId); - const masterQueueKey = this.keys.masterQueueKey(shardId); + const masterQueueKey = this.keys.masterQueueKey(queueShardId); const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(tenantId); - const dispatchKey = this.keys.dispatchKey(shardId); + const dispatchKey = this.keys.dispatchKey(dispatchShardId); // Update legacy master queue (drain path, no-op if queue not there) const removedFromMaster = await this.redis.updateMasterQueueIfEmpty( diff --git a/packages/redis-worker/src/fair-queue/tenantDispatch.ts b/packages/redis-worker/src/fair-queue/tenantDispatch.ts index 56f57f7fa27..82663646f5a 100644 --- a/packages/redis-worker/src/fair-queue/tenantDispatch.ts +++ b/packages/redis-worker/src/fair-queue/tenantDispatch.ts @@ -40,11 +40,12 @@ export class TenantDispatch { } /** - * Get the shard ID for a queue. - * Uses the same jump consistent hash as MasterQueue for consistency. + * Get the dispatch shard ID for a tenant. + * Uses jump consistent hash on the tenant ID so each tenant + * always maps to exactly one dispatch shard. */ - getShardForQueue(queueId: string): number { - return jumpHash(queueId, this.shardCount); + getShardForTenant(tenantId: string): number { + return jumpHash(tenantId, this.shardCount); } /** diff --git a/packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts b/packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts index a22e6a58773..feb1c93a0d1 100644 --- a/packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts +++ b/packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts @@ -991,6 +991,84 @@ describe("Two-Level Tenant Dispatch", () => { } ); }); + describe("dispatch shard is tenant-based, not queue-based", () => { + redisTest( + "tenant with queues in different queue shards should appear in only one dispatch shard", + { timeout: 15000 }, + async ({ redisOptions }) => { + const keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + const redis = createRedisClient(redisOptions); + const shardCount = 2; + + const helper = new TestHelper(redisOptions, keys, { shardCount }); + + try { + const tenantId = "tenant-shard-test"; + + // Find two queue IDs for the same tenant that hash to different queue shards + // by trying different queue names + const { MasterQueue: MQ } = await import("../masterQueue.js"); + const mq = new MQ({ redis: redisOptions, keys, shardCount }); + const { TenantDispatch: TD } = await import("../tenantDispatch.js"); + const td = new TD({ redis: redisOptions, keys, shardCount }); + + let queueShard0: string | null = null; + let queueShard1: string | null = null; + + for (let i = 0; i < 100; i++) { + const qId = `tenant:${tenantId}:queue:q${i}`; + const shard = mq.getShardForQueue(qId); + if (shard === 0 && !queueShard0) queueShard0 = qId; + if (shard === 1 && !queueShard1) queueShard1 = qId; + if (queueShard0 && queueShard1) break; + } + + expect(queueShard0).not.toBeNull(); + expect(queueShard1).not.toBeNull(); + + // Both queues belong to the same tenant, so dispatch shard should be the same + const expectedDispatchShard = td.getShardForTenant(tenantId); + + // Enqueue to both queues + await helper.fairQueue.enqueue({ + queueId: queueShard0!, + tenantId, + payload: { value: "msg-shard0" }, + }); + await helper.fairQueue.enqueue({ + queueId: queueShard1!, + tenantId, + payload: { value: "msg-shard1" }, + }); + + // Verify: tenant should only appear in one dispatch shard + const dispatch0 = await redis.zrange(keys.dispatchKey(0), 0, -1); + const dispatch1 = await redis.zrange(keys.dispatchKey(1), 0, -1); + + const inShard0 = dispatch0.includes(tenantId); + const inShard1 = dispatch1.includes(tenantId); + + // Tenant should appear in exactly one shard + expect(inShard0 !== inShard1).toBe(true); + + // And it should be the expected one + if (expectedDispatchShard === 0) { + expect(inShard0).toBe(true); + expect(inShard1).toBe(false); + } else { + expect(inShard0).toBe(false); + expect(inShard1).toBe(true); + } + + await mq.close(); + await td.close(); + } finally { + await helper.close(); + await redis.quit(); + } + } + ); + }); }); // Helper to wait for a condition From 321b462aa4f7c7508bf3950c1b9173ffe2aafb4e Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 26 Feb 2026 13:04:58 +0000 Subject: [PATCH 9/9] chore: add server changes file for dispatch shard fix add server changeset file --- .server-changes/fix-dispatch-shard-tenant-based.md | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 .server-changes/fix-dispatch-shard-tenant-based.md diff --git a/.server-changes/fix-dispatch-shard-tenant-based.md b/.server-changes/fix-dispatch-shard-tenant-based.md new file mode 100644 index 00000000000..297b18cde72 --- /dev/null +++ b/.server-changes/fix-dispatch-shard-tenant-based.md @@ -0,0 +1,9 @@ +--- +area: webapp +type: feature +--- + +Two-level tenant dispatch architecture for batch queue processing. Replaces the +single master queue with a two-level index: a dispatch index (tenant → shard) +and per-tenant queue indexes (tenant → queues). This enables O(1) tenant +selection and fair scheduling across tenants regardless of queue count. Improves batch queue processing performance.