diff --git a/.server-changes/fix-dispatch-shard-tenant-based.md b/.server-changes/fix-dispatch-shard-tenant-based.md new file mode 100644 index 00000000000..297b18cde72 --- /dev/null +++ b/.server-changes/fix-dispatch-shard-tenant-based.md @@ -0,0 +1,9 @@ +--- +area: webapp +type: feature +--- + +Two-level tenant dispatch architecture for batch queue processing. Replaces the +single master queue with a two-level index: a dispatch index (tenant → shard) +and per-tenant queue indexes (tenant → queues). This enables O(1) tenant +selection and fair scheduling across tenants regardless of queue count. diff --git a/packages/redis-worker/src/fair-queue/index.ts b/packages/redis-worker/src/fair-queue/index.ts index 59177d11673..bfb60b6c552 100644 --- a/packages/redis-worker/src/fair-queue/index.ts +++ b/packages/redis-worker/src/fair-queue/index.ts @@ -6,6 +6,7 @@ import { setInterval } from "node:timers/promises"; import { type z } from "zod"; import { ConcurrencyManager } from "./concurrency.js"; import { MasterQueue } from "./masterQueue.js"; +import { TenantDispatch } from "./tenantDispatch.js"; import { type RetryStrategy, ExponentialBackoffRetry } from "./retry.js"; import { isAbortError } from "../utils.js"; import { @@ -17,6 +18,7 @@ import { import type { ConcurrencyGroupConfig, DeadLetterMessage, + DispatchSchedulerContext, EnqueueBatchOptions, EnqueueOptions, FairQueueKeyProducer, @@ -27,6 +29,7 @@ import type { QueueDescriptor, SchedulerContext, StoredMessage, + TenantQueues, } from "./types.js"; import { VisibilityManager } from "./visibility.js"; import { WorkerQueueManager } from "./workerQueue.js"; @@ -42,6 +45,7 @@ export * from "./scheduler.js"; export * from "./schedulers/index.js"; export * from "./retry.js"; export * from "./telemetry.js"; +export * from "./tenantDispatch.js"; /** * FairQueue is the main orchestrator for fair queue message routing. @@ -110,6 +114,9 @@ export class FairQueue { // Queue descriptor cache for message processing private queueDescriptorCache = new Map<string, QueueDescriptor>(); + // Two-level tenant dispatch + private tenantDispatch: TenantDispatch; + constructor(private options: FairQueueOptions) { this.redis = createRedisClient(options.redis); this.keys = options.keys; @@ -178,6 +185,13 @@ export class FairQueue { shardCount: this.shardCount, }); + this.tenantDispatch = new TenantDispatch({ + redis: options.redis, + keys: options.keys, + shardCount: this.shardCount, + }); + + if (options.concurrencyGroups && options.concurrencyGroups.length > 0) { this.concurrencyManager = new ConcurrencyManager({ redis: options.redis, @@ -230,6 +244,9 @@ export class FairQueue { getMasterQueueLength: async (shardId: number) => { return await this.masterQueue.getShardQueueCount(shardId); }, + getDispatchLength: async (shardId: number) => { + return await this.tenantDispatch.getShardTenantCount(shardId); + }, getInflightCount: async (shardId: number) => { return await this.visibilityManager.getInflightCount(shardId); }, @@ -256,8 +273,9 @@ export class FairQueue { const timestamp = options.timestamp ?? 
Date.now(); const queueKey = this.keys.queueKey(options.queueId); const queueItemsKey = this.keys.queueItemsKey(options.queueId); - const shardId = this.masterQueue.getShardForQueue(options.queueId); - const masterQueueKey = this.keys.masterQueueKey(shardId); + const dispatchShardId = this.tenantDispatch.getShardForTenant(options.tenantId); + const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(options.tenantId); + const dispatchKey = this.keys.dispatchKey(dispatchShardId); // Validate payload if schema provided and validation enabled if (this.validateOnEnqueue && this.payloadSchema) { @@ -297,22 +315,24 @@ export class FairQueue { metadata: options.metadata, }; - // Use atomic Lua script to enqueue and update master queue - await this.redis.enqueueMessageAtomic( + // Use atomic Lua script to enqueue and update tenant dispatch indexes + await this.redis.enqueueMessageAtomicV2( queueKey, queueItemsKey, - masterQueueKey, + tenantQueueIndexKey, + dispatchKey, options.queueId, messageId, timestamp.toString(), - JSON.stringify(storedMessage) + JSON.stringify(storedMessage), + options.tenantId ); span.setAttributes({ [FairQueueAttributes.QUEUE_ID]: options.queueId, [FairQueueAttributes.TENANT_ID]: options.tenantId, [FairQueueAttributes.MESSAGE_ID]: messageId, - [FairQueueAttributes.SHARD_ID]: shardId.toString(), + [FairQueueAttributes.SHARD_ID]: dispatchShardId.toString(), }); this.telemetry.recordEnqueue(); @@ -343,8 +363,9 @@ export class FairQueue { async (span) => { const queueKey = this.keys.queueKey(options.queueId); const queueItemsKey = this.keys.queueItemsKey(options.queueId); - const shardId = this.masterQueue.getShardForQueue(options.queueId); - const masterQueueKey = this.keys.masterQueueKey(shardId); + const dispatchShardId = this.tenantDispatch.getShardForTenant(options.tenantId); + const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(options.tenantId); + const dispatchKey = this.keys.dispatchKey(dispatchShardId); const now = Date.now(); // Store queue descriptor @@ -397,12 +418,14 @@ export class FairQueue { args.push(messageId, timestamp.toString(), JSON.stringify(storedMessage)); } - // Use atomic Lua script for batch enqueue - await this.redis.enqueueBatchAtomic( + // Use atomic Lua script for batch enqueue with tenant dispatch indexes + await this.redis.enqueueBatchAtomicV2( queueKey, queueItemsKey, - masterQueueKey, + tenantQueueIndexKey, + dispatchKey, options.queueId, + options.tenantId, ...args ); @@ -410,7 +433,7 @@ export class FairQueue { [FairQueueAttributes.QUEUE_ID]: options.queueId, [FairQueueAttributes.TENANT_ID]: options.tenantId, [FairQueueAttributes.MESSAGE_COUNT]: messageIds.length, - [FairQueueAttributes.SHARD_ID]: shardId.toString(), + [FairQueueAttributes.SHARD_ID]: dispatchShardId.toString(), }); this.telemetry.recordEnqueueBatch(messageIds.length); @@ -672,6 +695,7 @@ export class FairQueue { await Promise.all([ this.masterQueue.close(), + this.tenantDispatch.close(), this.concurrencyManager?.close(), this.visibilityManager.close(), this.workerQueueManager.close(), @@ -693,10 +717,14 @@ export class FairQueue { } /** - * Get total queue count across all shards. + * Get total tenant count across dispatch shards plus any legacy queues still draining. 
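+ * (Illustrative: 2 tenants in the dispatch indexes plus 3 legacy queues still draining yields 5.)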
*/ async getTotalQueueCount(): Promise<number> { - return await this.masterQueue.getTotalQueueCount(); + const [dispatchCount, legacyCount] = await Promise.all([ + this.tenantDispatch.getTotalTenantCount(), + this.masterQueue.getTotalQueueCount(), + ]); + return dispatchCount + legacyCount; } /** @@ -736,7 +764,7 @@ export class FairQueue { loopId, async (span) => { span.setAttribute("shard_id", shardId); - return await this.#processMasterQueueShard(loopId, shardId, span); + return await this.#processShardIteration(loopId, shardId, span); }, { iterationSpanName: "processMasterQueueShard", @@ -781,44 +809,198 @@ export class FairQueue { } } - async #processMasterQueueShard( + /** + * Process a shard iteration. Runs both the new tenant dispatch path + * and the legacy master queue drain path. + */ + async #processShardIteration( loopId: string, shardId: number, parentSpan?: Span ): Promise<boolean> { - const masterQueueKey = this.keys.masterQueueKey(shardId); + let hadWork = false; + + // Main path: new two-level tenant dispatch (gets full DRR scheduling) + hadWork = await this.#processDispatchShard(loopId, shardId, parentSpan); - // Get total queues in this master queue shard for observability - const masterQueueSize = await this.masterQueue.getShardQueueCount(shardId); - parentSpan?.setAttribute("master_queue_size", masterQueueSize); - this.batchedSpanManager.incrementStat(loopId, "master_queue_size_sum", masterQueueSize); + // Drain path: legacy master queue (simple scheduling, no DRR) + // Check ZCARD first (O(1)) to skip the drain path when empty + const legacyCount = await this.masterQueue.getShardQueueCount(shardId); + if (legacyCount > 0) { + const drainHadWork = await this.#drainLegacyMasterQueueShard(loopId, shardId, parentSpan); + hadWork = hadWork || drainHadWork; + } - // Create scheduler context - const schedulerContext = this.#createSchedulerContext(); + return hadWork; + } - // Get queues to process from scheduler - const tenantQueues = await this.telemetry.trace( - "selectQueues", - async (span) => { - span.setAttribute(FairQueueAttributes.SHARD_ID, shardId.toString()); - span.setAttribute(FairQueueAttributes.CONSUMER_ID, loopId); - span.setAttribute("master_queue_size", masterQueueSize); - const result = await this.scheduler.selectQueues(masterQueueKey, loopId, schedulerContext); - span.setAttribute("tenant_count", result.length); - span.setAttribute( - "queue_count", - result.reduce((acc, t) => acc + t.queues.length, 0) - ); - return result; + /** + * Main path: process queues using the two-level tenant dispatch index. + * Level 1: dispatch index → tenantIds. Level 2: per-tenant → queueIds. 
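+ * Illustrative layout, assuming prefix "fq": fq:dispatch:0 = { t1, t2 } and fq:tenantq:t1 = { q1, q2 }, each member scored by its oldest-message timestamp.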
+ */ + async #processDispatchShard( + loopId: string, + shardId: number, + parentSpan?: Span + ): Promise<boolean> { + const dispatchKey = this.keys.dispatchKey(shardId); + + // Get dispatch index size for observability + const dispatchSize = await this.tenantDispatch.getShardTenantCount(shardId); + parentSpan?.setAttribute("dispatch_size", dispatchSize); + this.batchedSpanManager.incrementStat(loopId, "dispatch_size_sum", dispatchSize); + + // Create dispatch-aware scheduler context + const schedulerContext: DispatchSchedulerContext = { + ...this.#createSchedulerContext(), + getQueuesForTenant: async (tenantId: string, limit?: number) => { + return this.tenantDispatch.getQueuesForTenant(tenantId, limit); }, - { kind: SpanKind.INTERNAL } - ); + }; + + // Get queues to process from scheduler + let tenantQueues: TenantQueues[]; + + if (this.scheduler.selectQueuesFromDispatch) { + // Use dispatch-aware scheduler method (DRR with two-level lookup) + tenantQueues = await this.telemetry.trace( + "selectQueuesFromDispatch", + async (span) => { + span.setAttribute(FairQueueAttributes.SHARD_ID, shardId.toString()); + span.setAttribute(FairQueueAttributes.CONSUMER_ID, loopId); + span.setAttribute("dispatch_size", dispatchSize); + const result = await this.scheduler.selectQueuesFromDispatch!( + dispatchKey, + loopId, + schedulerContext + ); + span.setAttribute("tenant_count", result.length); + span.setAttribute( + "queue_count", + result.reduce((acc, t) => acc + t.queues.length, 0) + ); + return result; + }, + { kind: SpanKind.INTERNAL } + ); + } else { + // Fallback: read the dispatch index directly and build tenant queue groups (no DRR) + tenantQueues = await this.#fallbackDispatchToLegacyScheduler( + loopId, + shardId, + schedulerContext, + parentSpan + ); + } if (tenantQueues.length === 0) { this.batchedSpanManager.incrementStat(loopId, "empty_iterations"); return false; } + return this.#processSelectedQueues(loopId, shardId, tenantQueues); + } + + /** + * Drain path: process remaining messages from the legacy master queue shard. + * Uses simple ZRANGEBYSCORE without DRR - just flushing pre-deploy messages. + */ + async #drainLegacyMasterQueueShard( + loopId: string, + shardId: number, + parentSpan?: Span + ): Promise<boolean> { + const masterQueueKey = this.keys.masterQueueKey(shardId); + const now = Date.now(); + + // Simple fetch from old master queue - no DRR needed for drain + const results = await this.redis.zrangebyscore( + masterQueueKey, + "-inf", + now, + "WITHSCORES", + "LIMIT", + 0, + 100 + ); + + if (results.length === 0) { + return false; + } + + // Parse the [member, score] results and group queueIds by tenant + const byTenant = new Map<string, string[]>(); + for (let i = 0; i < results.length; i += 2) { + const queueId = results[i]; + const _score = results[i + 1]; + if (queueId && _score) { + const tenantId = this.keys.extractTenantId(queueId); + const existing = byTenant.get(tenantId) ?? 
[]; + existing.push(queueId); + byTenant.set(tenantId, existing); + } + } + + // Build TenantQueues, filter at-capacity tenants + const tenantQueues: TenantQueues[] = []; + for (const [tenantId, queueIds] of byTenant) { + if (this.concurrencyManager) { + const atCapacity = await this.concurrencyManager.isAtCapacity("tenant", tenantId); + if (atCapacity) continue; + } + tenantQueues.push({ tenantId, queues: queueIds }); + } + + if (tenantQueues.length === 0) { + return false; + } + + parentSpan?.setAttribute("drain_tenants", tenantQueues.length); + this.batchedSpanManager.incrementStat(loopId, "drain_tenants", tenantQueues.length); + + return this.#processSelectedQueues(loopId, shardId, tenantQueues); + } + + /** + * Fallback for schedulers that don't implement selectQueuesFromDispatch. + * Reads dispatch index, fetches per-tenant queues, groups by tenant, + * and filters at-capacity tenants. No DRR deficit tracking in this path. + */ + async #fallbackDispatchToLegacyScheduler( + loopId: string, + shardId: number, + context: DispatchSchedulerContext, + parentSpan?: Span + ): Promise<TenantQueues[]> { + // Get tenants from dispatch + const tenants = await this.tenantDispatch.getTenantsFromShard(shardId); + if (tenants.length === 0) return []; + + // For each tenant, get their queues and build grouped result + const tenantQueues: TenantQueues[] = []; + for (const { tenantId } of tenants) { + if (this.concurrencyManager) { + const atCapacity = await this.concurrencyManager.isAtCapacity("tenant", tenantId); + if (atCapacity) continue; + } + const queues = await this.tenantDispatch.getQueuesForTenant(tenantId); + if (queues.length > 0) { + tenantQueues.push({ tenantId, queues: queues.map((q) => q.queueId) }); + } + } + + return tenantQueues; + } + + /** + * Shared claim loop: process selected queues from either dispatch or drain path. + * Claims messages and pushes to worker queues. + */ + async #processSelectedQueues( + loopId: string, + shardId: number, + tenantQueues: TenantQueues[] + ): Promise<boolean> { // Track stats this.batchedSpanManager.incrementStat(loopId, "tenants_selected", tenantQueues.length); this.batchedSpanManager.incrementStat( @@ -829,7 +1011,6 @@ export class FairQueue { let messagesProcessed = 0; - // Process queues and push to worker queues for (const { tenantId, queues } of tenantQueues) { for (const queueId of queues) { // Check cooloff @@ -839,12 +1020,11 @@ export class FairQueue { } // Check tenant capacity before attempting to process - // If tenant is at capacity, skip ALL remaining queues for this tenant if (this.concurrencyManager) { const isAtCapacity = await this.concurrencyManager.isAtCapacity("tenant", tenantId); if (isAtCapacity) { this.batchedSpanManager.incrementStat(loopId, "tenant_capacity_skipped"); - break; // Skip remaining queues for this tenant + break; } } @@ -865,8 +1045,6 @@ export class FairQueue { messagesProcessed += processedFromQueue; this.batchedSpanManager.incrementStat(loopId, "messages_claimed", processedFromQueue); - // Record processed messages for DRR deficit tracking - // Use batch variant if available for efficiency, otherwise fall back to single calls if (this.scheduler.recordProcessedBatch) { await this.telemetry.trace( "recordProcessedBatch", @@ -892,16 +1070,11 @@ export class FairQueue { } } } else { - // Don't increment cooloff here - the queue was either: - // 1. Empty (removed from master, cache cleaned up) - // 2. 
Concurrency blocked (message released back to queue) - // Neither case warrants cooloff as they're not failures this.batchedSpanManager.incrementStat(loopId, "claim_skipped"); } } } - // Return true if we processed any messages (had work) return messagesProcessed > 0; } @@ -909,11 +1082,13 @@ loopId: string, queueId: string, tenantId: string, - shardId: number + _consumerShardId: number ): Promise<number> { + // Dispatch shard is tenant-based (tenantId hash), not queue-based. + // In-flight/master queue shard is queue-based. + const dispatchShardId = this.tenantDispatch.getShardForTenant(tenantId); const queueKey = this.keys.queueKey(queueId); const queueItemsKey = this.keys.queueItemsKey(queueId); - const masterQueueKey = this.keys.masterQueueKey(shardId); const descriptor = this.queueDescriptorCache.get(queueId) ?? { id: queueId, tenantId, @@ -953,12 +1128,10 @@ >(queueId, queueKey, queueItemsKey, loopId, maxClaimCount, this.visibilityTimeoutMs); if (claimedMessages.length === 0) { - // Queue is empty, update master queue and clean up caches - const removed = await this.redis.updateMasterQueueIfEmpty(masterQueueKey, queueKey, queueId); - if (removed === 1) { - this.queueDescriptorCache.delete(queueId); - this.queueCooloffStates.delete(queueId); - } + // Queue is empty, update both old and new indexes and clean up caches + await this.#updateAllIndexesAfterDequeue(queueId, tenantId); + this.queueDescriptorCache.delete(queueId); + this.queueCooloffStates.delete(queueId); return 0; } @@ -974,12 +1147,16 @@ if (!reserved) { // Release ALL remaining messages (from index i onward) back to queue // This prevents messages from being stranded in the in-flight set + const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(tenantId); + const dispatchKey = this.keys.dispatchKey(dispatchShardId); await this.visibilityManager.releaseBatch( claimedMessages.slice(i), queueId, queueKey, queueItemsKey, - masterQueueKey + tenantQueueIndexKey, + dispatchKey, + tenantId ); // Stop processing more messages from this queue since we're at capacity break; @@ -1055,8 +1232,6 @@ */ async completeMessage(messageId: string, queueId: string): Promise<void> { const shardId = this.masterQueue.getShardForQueue(queueId); - const queueKey = this.keys.queueKey(queueId); - const masterQueueKey = this.keys.masterQueueKey(shardId); const inflightDataKey = this.keys.inflightDataKey(shardId); // Get stored message for concurrency release @@ -1076,7 +1251,7 @@ tenantId: storedMessage.tenantId, metadata: storedMessage.metadata ?? 
{}, } - : { id: queueId, tenantId: "", metadata: {} }; + : { id: queueId, tenantId: this.keys.extractTenantId(queueId), metadata: {} }; // Complete in visibility manager await this.visibilityManager.complete(messageId, queueId); @@ -1086,9 +1261,12 @@ await this.concurrencyManager.release(descriptor, messageId); } - // Update master queue if queue is now empty, and clean up caches - const removed = await this.redis.updateMasterQueueIfEmpty(masterQueueKey, queueKey, queueId); - if (removed === 1) { + // Update both old and new indexes, clean up caches if queue is empty + const { queueEmpty } = await this.#updateAllIndexesAfterDequeue( + queueId, + descriptor.tenantId + ); + if (queueEmpty) { this.queueDescriptorCache.delete(queueId); this.queueCooloffStates.delete(queueId); } @@ -1112,7 +1290,6 @@ const shardId = this.masterQueue.getShardForQueue(queueId); const queueKey = this.keys.queueKey(queueId); const queueItemsKey = this.keys.queueItemsKey(queueId); - const masterQueueKey = this.keys.masterQueueKey(shardId); const inflightDataKey = this.keys.inflightDataKey(shardId); // Get stored message for concurrency release @@ -1132,15 +1309,21 @@ tenantId: storedMessage.tenantId, metadata: storedMessage.metadata ?? {}, } - : { id: queueId, tenantId: "", metadata: {} }; + : { id: queueId, tenantId: this.keys.extractTenantId(queueId), metadata: {} }; - // Release back to queue + // Release back to queue (visibility manager updates dispatch indexes atomically) + // Dispatch shard is tenant-based, not queue-based + const dispatchShardId = this.tenantDispatch.getShardForTenant(descriptor.tenantId); + const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(descriptor.tenantId); + const dispatchKey = this.keys.dispatchKey(dispatchShardId); await this.visibilityManager.release( messageId, queueId, queueKey, queueItemsKey, - masterQueueKey, + tenantQueueIndexKey, + dispatchKey, + descriptor.tenantId, Date.now() // Put at back of queue ); @@ -1167,7 +1350,6 @@ const shardId = this.masterQueue.getShardForQueue(queueId); const queueKey = this.keys.queueKey(queueId); const queueItemsKey = this.keys.queueItemsKey(queueId); - const masterQueueKey = this.keys.masterQueueKey(shardId); const inflightDataKey = this.keys.inflightDataKey(shardId); // Get stored message @@ -1194,12 +1376,13 @@ metadata: storedMessage.metadata ?? {}, }; + const dispatchShardId = this.tenantDispatch.getShardForTenant(descriptor.tenantId); await this.#handleMessageFailure( storedMessage, queueId, queueKey, queueItemsKey, - masterQueueKey, + dispatchShardId, descriptor, error ); @@ -1214,7 +1397,7 @@ queueId: string, queueKey: string, queueItemsKey: string, - masterQueueKey: string, + dispatchShardId: number, descriptor: QueueDescriptor, error?: Error ): Promise<void> { @@ -1233,12 +1416,16 @@ // Release with delay, passing the updated message data so the Lua script // atomically writes the incremented attempt count when re-queuing. 
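+ // e.g. a message on attempt 1 with a 2s backoff is re-queued at score Date.now() + 2000 as attempt 2 (illustrative values).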
+ const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(descriptor.tenantId); + const dispatchKey = this.keys.dispatchKey(dispatchShardId); await this.visibilityManager.release( storedMessage.id, queueId, queueKey, queueItemsKey, - masterQueueKey, + tenantQueueIndexKey, + dispatchKey, + descriptor.tenantId, Date.now() + nextDelay, JSON.stringify(updatedMessage) ); @@ -1345,33 +1532,44 @@ export class FairQueue { let totalReclaimed = 0; for (let shardId = 0; shardId < this.shardCount; shardId++) { - const reclaimedMessages = await this.visibilityManager.reclaimTimedOut(shardId, (queueId) => ({ - queueKey: this.keys.queueKey(queueId), - queueItemsKey: this.keys.queueItemsKey(queueId), - masterQueueKey: this.keys.masterQueueKey(this.masterQueue.getShardForQueue(queueId)), - })); - - // Release concurrency for all reclaimed messages in a single batch - // This is critical: when a message times out, its concurrency slot must be freed - // so the message can be processed again when it's re-claimed from the queue - if (this.concurrencyManager && reclaimedMessages.length > 0) { - try { - await this.concurrencyManager.releaseBatch( - reclaimedMessages.map((msg) => ({ - queue: { - id: msg.queueId, - tenantId: msg.tenantId, - metadata: msg.metadata ?? {}, - }, - messageId: msg.messageId, - })) - ); - } catch (error) { - this.logger.error("Failed to release concurrency for reclaimed messages", { - count: reclaimedMessages.length, - error: error instanceof Error ? error.message : String(error), - }); + const reclaimedMessages = await this.visibilityManager.reclaimTimedOut(shardId, (queueId) => { + const tenantId = this.keys.extractTenantId(queueId); + const dispatchShardId = this.tenantDispatch.getShardForTenant(tenantId); + return { + queueKey: this.keys.queueKey(queueId), + queueItemsKey: this.keys.queueItemsKey(queueId), + tenantQueueIndexKey: this.keys.tenantQueueIndexKey(tenantId), + dispatchKey: this.keys.dispatchKey(dispatchShardId), + tenantId, + }; + }); + + if (reclaimedMessages.length > 0) { + // Release concurrency for all reclaimed messages in a single batch + // This is critical: when a message times out, its concurrency slot must be freed + // so the message can be processed again when it's re-claimed from the queue + if (this.concurrencyManager) { + try { + await this.concurrencyManager.releaseBatch( + reclaimedMessages.map((msg) => ({ + queue: { + id: msg.queueId, + tenantId: msg.tenantId, + metadata: msg.metadata ?? {}, + }, + messageId: msg.messageId, + })) + ); + } catch (error) { + this.logger.error("Failed to release concurrency for reclaimed messages", { + count: reclaimedMessages.length, + error: error instanceof Error ? error.message : String(error), + }); + } } + + // Dispatch indexes are updated atomically by the releaseMessage Lua script + // inside reclaimTimedOut, so no separate index update needed here. } totalReclaimed += reclaimedMessages.length; @@ -1445,6 +1643,41 @@ export class FairQueue { // Private - Helpers // ============================================================================ + /** + * Update both old master queue and new dispatch indexes after a dequeue/complete. + * Both calls are idempotent - ZREM on a non-existent member is a no-op. + * This handles the transition period where queues may exist in either or both indexes. 
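+ * e.g. a queue enqueued before the deploy exists only in the legacy master index, so the dispatch-side ZREMs are no-ops for it.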
+ */ + async #updateAllIndexesAfterDequeue( + queueId: string, + tenantId: string + ): Promise<{ queueEmpty: boolean }> { + const queueShardId = this.masterQueue.getShardForQueue(queueId); + const dispatchShardId = this.tenantDispatch.getShardForTenant(tenantId); + const queueKey = this.keys.queueKey(queueId); + const masterQueueKey = this.keys.masterQueueKey(queueShardId); + const tenantQueueIndexKey = this.keys.tenantQueueIndexKey(tenantId); + const dispatchKey = this.keys.dispatchKey(dispatchShardId); + + // Update legacy master queue (drain path, no-op if queue not there) + const removedFromMaster = await this.redis.updateMasterQueueIfEmpty( + masterQueueKey, + queueKey, + queueId + ); + + // Update new dispatch indexes + const removedFromDispatch = await this.redis.updateDispatchIndexes( + queueKey, + tenantQueueIndexKey, + dispatchKey, + queueId, + tenantId + ); + + return { queueEmpty: removedFromMaster === 1 || removedFromDispatch === 1 }; + } + #createSchedulerContext(): SchedulerContext { return { getCurrentConcurrency: async (groupName, groupId) => { @@ -1476,7 +1709,9 @@ export class FairQueue { // ============================================================================ #registerCommands(): void { - // Atomic single message enqueue with master queue update + // ---- Legacy Lua scripts (kept for drain of old master queue) ---- + + // Atomic single message enqueue with master queue update (legacy, used for drain only) this.redis.defineCommand("enqueueMessageAtomic", { numberOfKeys: 3, lua: ` @@ -1489,13 +1724,9 @@ local messageId = ARGV[2] local timestamp = tonumber(ARGV[3]) local payload = ARGV[4] --- Add to sorted set (score = timestamp) redis.call('ZADD', queueKey, timestamp, messageId) - --- Store payload in hash redis.call('HSET', queueItemsKey, messageId, payload) --- Update master queue with oldest message timestamp local oldest = redis.call('ZRANGE', queueKey, 0, 0, 'WITHSCORES') if #oldest >= 2 then redis.call('ZADD', masterQueueKey, oldest[2], queueId) @@ -1505,7 +1736,7 @@ return 1 `, }); - // Atomic batch message enqueue with master queue update + // Atomic batch message enqueue with master queue update (legacy, used for drain only) this.redis.defineCommand("enqueueBatchAtomic", { numberOfKeys: 3, lua: ` @@ -1515,20 +1746,14 @@ local masterQueueKey = KEYS[3] local queueId = ARGV[1] --- Args after queueId are triples: [messageId, timestamp, payload, ...] 
for i = 2, #ARGV, 3 do local messageId = ARGV[i] local timestamp = tonumber(ARGV[i + 1]) local payload = ARGV[i + 2] - - -- Add to sorted set redis.call('ZADD', queueKey, timestamp, messageId) - - -- Store payload in hash redis.call('HSET', queueItemsKey, messageId, payload) end --- Update master queue with oldest message timestamp local oldest = redis.call('ZRANGE', queueKey, 0, 0, 'WITHSCORES') if #oldest >= 2 then redis.call('ZADD', masterQueueKey, oldest[2], queueId) @@ -1538,7 +1763,7 @@ return (#ARGV - 1) / 3 `, }); - // Update master queue if queue is empty + // Update master queue if queue is empty (legacy, used for drain) this.redis.defineCommand("updateMasterQueueIfEmpty", { numberOfKeys: 2, lua: ` @@ -1551,7 +1776,6 @@ if count == 0 then redis.call('ZREM', masterQueueKey, queueId) return 1 else - -- Update with oldest message timestamp local oldest = redis.call('ZRANGE', queueKey, 0, 0, 'WITHSCORES') if #oldest >= 2 then redis.call('ZADD', masterQueueKey, oldest[2], queueId) @@ -1561,6 +1785,124 @@ end `, }); + // ---- New V2 Lua scripts (two-level tenant dispatch) ---- + + // Atomic single message enqueue with tenant dispatch index update + this.redis.defineCommand("enqueueMessageAtomicV2", { + numberOfKeys: 4, + lua: ` +local queueKey = KEYS[1] +local queueItemsKey = KEYS[2] +local tenantQueueIndexKey = KEYS[3] +local dispatchKey = KEYS[4] + +local queueId = ARGV[1] +local messageId = ARGV[2] +local timestamp = tonumber(ARGV[3]) +local payload = ARGV[4] +local tenantId = ARGV[5] + +-- Add to per-queue storage (same as before) +redis.call('ZADD', queueKey, timestamp, messageId) +redis.call('HSET', queueItemsKey, messageId, payload) + +-- Update tenant queue index (Level 2) with queue's oldest message +local oldest = redis.call('ZRANGE', queueKey, 0, 0, 'WITHSCORES') +if #oldest >= 2 then + redis.call('ZADD', tenantQueueIndexKey, oldest[2], queueId) +end + +-- Update dispatch index (Level 1) with tenant's oldest across all queues +local tenantOldest = redis.call('ZRANGE', tenantQueueIndexKey, 0, 0, 'WITHSCORES') +if #tenantOldest >= 2 then + redis.call('ZADD', dispatchKey, tenantOldest[2], tenantId) +end + +return 1 + `, + }); + + // Atomic batch message enqueue with tenant dispatch index update + this.redis.defineCommand("enqueueBatchAtomicV2", { + numberOfKeys: 4, + lua: ` +local queueKey = KEYS[1] +local queueItemsKey = KEYS[2] +local tenantQueueIndexKey = KEYS[3] +local dispatchKey = KEYS[4] + +local queueId = ARGV[1] +local tenantId = ARGV[2] + +-- Args after queueId and tenantId are triples: [messageId, timestamp, payload, ...] 
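+-- e.g. ARGV = { "q1", "tenantA", "m1", "1700000000000", "{...}", "m2", "1700000000001", "{...}" } (illustrative values)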
+for i = 3, #ARGV, 3 do + local messageId = ARGV[i] + local timestamp = tonumber(ARGV[i + 1]) + local payload = ARGV[i + 2] + redis.call('ZADD', queueKey, timestamp, messageId) + redis.call('HSET', queueItemsKey, messageId, payload) +end + +-- Update tenant queue index (Level 2) +local oldest = redis.call('ZRANGE', queueKey, 0, 0, 'WITHSCORES') +if #oldest >= 2 then + redis.call('ZADD', tenantQueueIndexKey, oldest[2], queueId) +end + +-- Update dispatch index (Level 1) +local tenantOldest = redis.call('ZRANGE', tenantQueueIndexKey, 0, 0, 'WITHSCORES') +if #tenantOldest >= 2 then + redis.call('ZADD', dispatchKey, tenantOldest[2], tenantId) +end + +return (#ARGV - 2) / 3 + `, + }); + + // Update tenant dispatch indexes after dequeue/complete + // Handles both queue-empty (remove from indexes) and queue-has-messages (update scores) + this.redis.defineCommand("updateDispatchIndexes", { + numberOfKeys: 3, + lua: ` +local queueKey = KEYS[1] +local tenantQueueIndexKey = KEYS[2] +local dispatchKey = KEYS[3] +local queueId = ARGV[1] +local tenantId = ARGV[2] + +local count = redis.call('ZCARD', queueKey) +if count == 0 then + -- Queue is empty: remove from tenant queue index + redis.call('ZREM', tenantQueueIndexKey, queueId) + + -- Check if tenant has any queues left + local tenantQueueCount = redis.call('ZCARD', tenantQueueIndexKey) + if tenantQueueCount == 0 then + -- No more queues: remove tenant from dispatch + redis.call('ZREM', dispatchKey, tenantId) + else + -- Update dispatch score to tenant's new oldest + local tenantOldest = redis.call('ZRANGE', tenantQueueIndexKey, 0, 0, 'WITHSCORES') + if #tenantOldest >= 2 then + redis.call('ZADD', dispatchKey, tenantOldest[2], tenantId) + end + end + return 1 +else + -- Queue still has messages: update scores + local oldest = redis.call('ZRANGE', queueKey, 0, 0, 'WITHSCORES') + if #oldest >= 2 then + redis.call('ZADD', tenantQueueIndexKey, oldest[2], queueId) + end + local tenantOldest = redis.call('ZRANGE', tenantQueueIndexKey, 0, 0, 'WITHSCORES') + if #tenantOldest >= 2 then + redis.call('ZADD', dispatchKey, tenantOldest[2], tenantId) + end + return 0 +end + `, + }); + // Register worker queue commands if enabled if (this.workerQueueManager) { this.workerQueueManager.registerCommands(this.redis); @@ -1571,6 +1913,7 @@ end // Extend Redis interface for custom commands declare module "@internal/redis" { interface RedisCommander { + // Legacy commands (kept for drain of old master queue) enqueueMessageAtomic( queueKey: string, queueItemsKey: string, masterQueueKey: string, queueId: string, messageId: string, timestamp: string, payload: string ): Promise<number>; @@ -1594,5 +1937,36 @@ declare module "@internal/redis" { queueKey: string, queueId: string ): Promise<number>; + + // V2 commands (two-level tenant dispatch) + enqueueMessageAtomicV2( + queueKey: string, + queueItemsKey: string, + tenantQueueIndexKey: string, + dispatchKey: string, + queueId: string, + messageId: string, + timestamp: string, + payload: string, + tenantId: string + ): Promise<number>; + + enqueueBatchAtomicV2( + queueKey: string, + queueItemsKey: string, + tenantQueueIndexKey: string, + dispatchKey: string, + queueId: string, + tenantId: string, + ...args: string[] + ): Promise<number>; + + updateDispatchIndexes( + queueKey: string, + tenantQueueIndexKey: string, + dispatchKey: string, + queueId: string, + tenantId: string + ): Promise<number>; } } diff --git a/packages/redis-worker/src/fair-queue/keyProducer.ts b/packages/redis-worker/src/fair-queue/keyProducer.ts index f63cdbed03e..1e1b2b42631 100644 --- a/packages/redis-worker/src/fair-queue/keyProducer.ts +++ b/packages/redis-worker/src/fair-queue/keyProducer.ts @@ 
-5,7 +5,9 @@ import type { FairQueueKeyProducer } from "./types.js"; * Uses a configurable prefix and standard key structure. * * Key structure: - * - Master queue: {prefix}:master:{shardId} + * - Master queue: {prefix}:master:{shardId} (legacy, drain-only) + * - Dispatch index: {prefix}:dispatch:{shardId} (Level 1: tenantIds) + * - Tenant queue index: {prefix}:tenantq:{tenantId} (Level 2: queueIds) * - Queue: {prefix}:queue:{queueId} * - Queue items: {prefix}:queue:{queueId}:items * - Concurrency: {prefix}:concurrency:{groupName}:{groupId} @@ -70,6 +72,18 @@ export class DefaultFairQueueKeyProducer implements FairQueueKeyProducer { return this.#buildKey("worker", consumerId); } + // ============================================================================ + // Tenant Dispatch Keys (Two-Level Index) + // ============================================================================ + + dispatchKey(shardId: number): string { + return this.#buildKey("dispatch", shardId.toString()); + } + + tenantQueueIndexKey(tenantId: string): string { + return this.#buildKey("tenantq", tenantId); + } + // ============================================================================ // Dead Letter Queue Keys // ============================================================================ diff --git a/packages/redis-worker/src/fair-queue/schedulers/drr.ts b/packages/redis-worker/src/fair-queue/schedulers/drr.ts index d06da05891f..3e05ae8a34f 100644 --- a/packages/redis-worker/src/fair-queue/schedulers/drr.ts +++ b/packages/redis-worker/src/fair-queue/schedulers/drr.ts @@ -2,6 +2,7 @@ import { createRedisClient, type Redis, type RedisOptions } from "@internal/redi import { BaseScheduler } from "../scheduler.js"; import type { DRRSchedulerConfig, + DispatchSchedulerContext, FairQueueKeyProducer, SchedulerContext, TenantQueues, @@ -132,6 +133,70 @@ export class DRRScheduler extends BaseScheduler { })); } + /** + * Select queues using the two-level tenant dispatch index. + * + * Algorithm: + * 1. ZRANGEBYSCORE on dispatch index (gets only tenants with queues - much smaller) + * 2. Add quantum to each tenant's deficit (atomically) + * 3. Check capacity as a safety net (skip tenants already at their concurrency limit) + * 4. Select tenants with deficit >= 1, sorted by deficit (highest first) + * 5. For each tenant, fetch their queues from Level 2 index + */ + async selectQueuesFromDispatch( + dispatchShardKey: string, + consumerId: string, + context: DispatchSchedulerContext + ): Promise<TenantQueues[]> { + // Level 1: Get tenants from dispatch index + const tenants = await this.#getTenantsFromDispatch(dispatchShardKey); + + if (tenants.length === 0) { + return []; + } + + const tenantIds = tenants.map((t) => t.tenantId); + + // Add quantum to all active tenants atomically (1 Lua call) + const deficits = await this.#addQuantumToTenants(tenantIds); + + // Build candidates sorted by deficit (highest first) + const candidates = tenantIds + .map((tenantId, index) => ({ tenantId, deficit: deficits[index] ?? 0 })) + .filter((t) => t.deficit >= 1); + + candidates.sort((a, b) => b.deficit - a.deficit); + + // Pick the first tenant with available capacity and fetch their queues. + // This keeps the scheduler cheap: O(1) in the common case where the + // highest-deficit tenant has capacity. The consumer loop iterates fast + // (1ms yield between rounds) so we cycle through tenants quickly. 
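+ // e.g. with quantum=10, a tenant skipped for three rounds carries deficit 30 (capped at maxDeficit) and sorts first here (illustrative numbers).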
+ for (const { tenantId, deficit } of candidates) { + const isAtCapacity = await context.isAtCapacity("tenant", tenantId); + if (isAtCapacity) continue; + + // Limit queues fetched to what the tenant can actually process this round. + // deficit = max messages this tenant should process, so no point fetching + // more queues than that (each queue yields at least 1 message). + const queueLimit = Math.ceil(deficit); + const queues = await context.getQueuesForTenant(tenantId, queueLimit); + if (queues.length > 0) { + this.logger.debug("DRR dispatch: selected tenant", { + dispatchTenants: tenants.length, + candidates: candidates.length, + selectedTenant: tenantId, + deficit, + queueLimit, + queuesReturned: queues.length, + }); + + return [{ tenantId, queues: queues.map((q) => q.queueId) }]; + } + } + + return []; + } + /** * Record that a message was processed from a tenant. * Decrements the tenant's deficit. @@ -200,6 +265,35 @@ return `${this.keys.masterQueueKey(0).split(":")[0]}:drr:deficit`; } + async #getTenantsFromDispatch( + dispatchKey: string + ): Promise<Array<{ tenantId: string; score: number }>> { + const now = Date.now(); + const results = await this.redis.zrangebyscore( + dispatchKey, + "-inf", + now, + "WITHSCORES", + "LIMIT", + 0, + this.masterQueueLimit + ); + + const tenants: Array<{ tenantId: string; score: number }> = []; + for (let i = 0; i < results.length; i += 2) { + const tenantId = results[i]; + const scoreStr = results[i + 1]; + if (tenantId && scoreStr) { + tenants.push({ + tenantId, + score: parseFloat(scoreStr), + }); + } + } + + return tenants; + } + async #getQueuesFromShard(shardKey: string): Promise<QueueWithScore[]> { const now = Date.now(); const results = await this.redis.zrangebyscore( diff --git a/packages/redis-worker/src/fair-queue/telemetry.ts b/packages/redis-worker/src/fair-queue/telemetry.ts index 0dbdcd87113..e9531fc812a 100644 --- a/packages/redis-worker/src/fair-queue/telemetry.ts +++ b/packages/redis-worker/src/fair-queue/telemetry.ts @@ -56,6 +56,7 @@ export interface FairQueueMetrics { // Observable gauges (registered with callbacks) queueLength: ObservableGauge; masterQueueLength: ObservableGauge; + dispatchLength: ObservableGauge; inflightCount: ObservableGauge; dlqLength: ObservableGauge; } @@ -250,6 +251,7 @@ export class FairQueueTelemetry { registerGaugeCallbacks(callbacks: { getQueueLength?: (queueId: string) => Promise<number>; getMasterQueueLength?: (shardId: number) => Promise<number>; + getDispatchLength?: (shardId: number) => Promise<number>; getInflightCount?: (shardId: number) => Promise<number>; getDLQLength?: (tenantId: string) => Promise<number>; shardCount?: number; }): void { @@ -273,7 +275,7 @@ }); } - // Master queue length gauge + // Legacy master queue length gauge (draining, should trend to 0) if (callbacks.getMasterQueueLength && callbacks.shardCount) { const getMasterQueueLength = callbacks.getMasterQueueLength; const shardCount = callbacks.shardCount; @@ -288,6 +290,21 @@ }); } + // Dispatch index length gauge (new two-level dispatch, tenant count per shard) + if (callbacks.getDispatchLength && callbacks.shardCount) { + const getDispatchLength = callbacks.getDispatchLength; + const shardCount = callbacks.shardCount; + + this.metrics.dispatchLength.addCallback(async (observableResult) => { + for (let shardId = 0; shardId < shardCount; shardId++) { + const length = await getDispatchLength(shardId); + observableResult.observe(length, { + [FairQueueAttributes.SHARD_ID]: shardId.toString(), + }); + } + }); + } + // 
Inflight count gauge if (callbacks.getInflightCount && callbacks.shardCount) { const getInflightCount = callbacks.getInflightCount; @@ -317,6 +334,7 @@ } }); } + } // ============================================================================ @@ -414,9 +432,13 @@ unit: "messages", }), masterQueueLength: this.meter.createObservableGauge(`${this.name}.master_queue.length`, { - description: "Number of queues in master queue shard", + description: "Number of queues in legacy master queue shard (draining)", unit: "queues", }), + dispatchLength: this.meter.createObservableGauge(`${this.name}.dispatch.length`, { + description: "Number of tenants in dispatch index shard", + unit: "tenants", + }), inflightCount: this.meter.createObservableGauge(`${this.name}.inflight.count`, { description: "Number of messages currently being processed", unit: "messages", diff --git a/packages/redis-worker/src/fair-queue/tenantDispatch.ts b/packages/redis-worker/src/fair-queue/tenantDispatch.ts new file mode 100644 index 00000000000..82663646f5a --- /dev/null +++ b/packages/redis-worker/src/fair-queue/tenantDispatch.ts @@ -0,0 +1,183 @@ +import { createRedisClient, type Redis, type RedisOptions } from "@internal/redis"; +import { jumpHash } from "@trigger.dev/core/v3/serverOnly"; +import type { FairQueueKeyProducer, QueueWithScore } from "./types.js"; + +export interface TenantDispatchOptions { + redis: RedisOptions; + keys: FairQueueKeyProducer; + shardCount: number; +} + +export interface TenantWithScore { + tenantId: string; + score: number; +} + +/** + * TenantDispatch manages the two-level tenant dispatch index. + * + * Level 1 - Dispatch Index (per shard): + * Key: {prefix}:dispatch:{shardId} + * ZSET of tenantIds scored by oldest message timestamp across their queues. + * Only tenants with queues containing messages appear here. + * + * Level 2 - Per-Tenant Queue Index: + * Key: {prefix}:tenantq:{tenantId} + * ZSET of queueIds scored by oldest message timestamp in that queue. + * + * This replaces the flat master queue for new enqueues, isolating each tenant's + * queue backlog so the scheduler iterates tenants (not queues) at Level 1. + */ +export class TenantDispatch { + private redis: Redis; + private keys: FairQueueKeyProducer; + private shardCount: number; + + constructor(private options: TenantDispatchOptions) { + this.redis = createRedisClient(options.redis); + this.keys = options.keys; + this.shardCount = Math.max(1, options.shardCount); + } + + /** + * Get the dispatch shard ID for a tenant. + * Uses jump consistent hash on the tenant ID so each tenant + * always maps to exactly one dispatch shard. + */ + getShardForTenant(tenantId: string): number { + return jumpHash(tenantId, this.shardCount); + } + + /** + * Get eligible tenants from a dispatch shard (Level 1). + * Returns tenants ordered by oldest message (lowest score first). + */ + async getTenantsFromShard( + shardId: number, + limit: number = 1000, + maxScore?: number + ): Promise<TenantWithScore[]> { + const dispatchKey = this.keys.dispatchKey(shardId); + const score = maxScore ?? 
Date.now(); + + const results = await this.redis.zrangebyscore( + dispatchKey, + "-inf", + score, + "WITHSCORES", + "LIMIT", + 0, + limit + ); + + const tenants: TenantWithScore[] = []; + for (let i = 0; i < results.length; i += 2) { + const tenantId = results[i]; + const scoreStr = results[i + 1]; + if (tenantId && scoreStr) { + tenants.push({ + tenantId, + score: parseFloat(scoreStr), + }); + } + } + + return tenants; + } + + /** + * Get queues for a specific tenant (Level 2). + * Returns queues ordered by oldest message (lowest score first). + */ + async getQueuesForTenant( + tenantId: string, + limit: number = 1000, + maxScore?: number + ): Promise<QueueWithScore[]> { + const tenantQueueKey = this.keys.tenantQueueIndexKey(tenantId); + const score = maxScore ?? Date.now(); + + const results = await this.redis.zrangebyscore( + tenantQueueKey, + "-inf", + score, + "WITHSCORES", + "LIMIT", + 0, + limit + ); + + const queues: QueueWithScore[] = []; + for (let i = 0; i < results.length; i += 2) { + const queueId = results[i]; + const scoreStr = results[i + 1]; + if (queueId && scoreStr) { + queues.push({ + queueId, + score: parseFloat(scoreStr), + tenantId, + }); + } + } + + return queues; + } + + /** + * Get the number of tenants in a dispatch shard. + */ + async getShardTenantCount(shardId: number): Promise<number> { + const dispatchKey = this.keys.dispatchKey(shardId); + return await this.redis.zcard(dispatchKey); + } + + /** + * Get total tenant count across all dispatch shards. + * Note: if shardCount changes, a tenant can transiently appear in more than one shard, so this may overcount. + */ + async getTotalTenantCount(): Promise<number> { + const counts = await Promise.all( + Array.from({ length: this.shardCount }, (_, i) => this.getShardTenantCount(i)) + ); + return counts.reduce((sum, count) => sum + count, 0); + } + + /** + * Get the number of queues for a tenant. + */ + async getTenantQueueCount(tenantId: string): Promise<number> { + const tenantQueueKey = this.keys.tenantQueueIndexKey(tenantId); + return await this.redis.zcard(tenantQueueKey); + } + + /** + * Remove a tenant from a specific dispatch shard. + */ + async removeTenantFromShard(shardId: number, tenantId: string): Promise<void> { + const dispatchKey = this.keys.dispatchKey(shardId); + await this.redis.zrem(dispatchKey, tenantId); + } + + /** + * Add a tenant to a dispatch shard with the given score. + */ + async addTenantToShard(shardId: number, tenantId: string, score: number): Promise<void> { + const dispatchKey = this.keys.dispatchKey(shardId); + await this.redis.zadd(dispatchKey, score, tenantId); + } + + /** + * Remove a queue from a tenant's queue index. + */ + async removeQueueFromTenant(tenantId: string, queueId: string): Promise<void> { + const tenantQueueKey = this.keys.tenantQueueIndexKey(tenantId); + await this.redis.zrem(tenantQueueKey, queueId); + } + + /** + * Close the Redis connection. 
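+ * (Called from FairQueue.close(); the client is not usable afterwards.)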
+ */ + async close(): Promise<void> { + await this.redis.quit(); + } +} diff --git a/packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts b/packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts index 1222bd9e4f1..4d85603b984 100644 --- a/packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts +++ b/packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts @@ -627,7 +627,9 @@ describe("Race Condition Tests", () => { const reclaimedMessages = await manager.reclaimTimedOut(0, (queueId) => ({ queueKey: keys.queueKey(queueId), queueItemsKey: keys.queueItemsKey(queueId), - masterQueueKey: keys.masterQueueKey(0), + tenantQueueIndexKey: keys.tenantQueueIndexKey(keys.extractTenantId(queueId)), + dispatchKey: keys.dispatchKey(0), + tenantId: keys.extractTenantId(queueId), })); reclaimResults.push(reclaimedMessages.length); } diff --git a/packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts b/packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts new file mode 100644 index 00000000000..feb1c93a0d1 --- /dev/null +++ b/packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts @@ -0,0 +1,1087 @@ +import { describe, expect } from "vitest"; +import { redisTest } from "@internal/testcontainers"; +import { createRedisClient } from "@internal/redis"; +import { z } from "zod"; +import { + FairQueue, + DefaultFairQueueKeyProducer, + DRRScheduler, + ExponentialBackoffRetry, + WorkerQueueManager, +} from "../index.js"; +import type { FairQueueKeyProducer, StoredMessage } from "../types.js"; +import type { RedisOptions } from "@internal/redis"; + +const TestPayloadSchema = z.object({ value: z.string() }); +type TestPayload = z.infer<typeof TestPayloadSchema>; +const TEST_WORKER_QUEUE_ID = "test-worker-queue"; + +/** + * Minimal test helper for tenant dispatch tests. + */ +class TestHelper { + public fairQueue: FairQueue; + private workerQueueManager: WorkerQueueManager; + private isRunning = false; + private abortController = new AbortController(); + private consumerLoops: Promise<void>[] = []; + private messageHandler?: (ctx: { + message: { id: string; queueId: string; payload: TestPayload; attempt: number }; + complete: () => Promise<void>; + release: () => Promise<void>; + fail: (error?: Error) => Promise<void>; + }) => Promise<void>; + + constructor( + private redisOptions: RedisOptions, + private keys: FairQueueKeyProducer, + options: { + shardCount?: number; + consumerIntervalMs?: number; + concurrencyLimit?: number; + visibilityTimeoutMs?: number; + reclaimIntervalMs?: number; + retry?: { maxAttempts: number; delayMs: number }; + } = {} + ) { + const scheduler = new DRRScheduler({ + redis: redisOptions, + keys, + quantum: 10, + maxDeficit: 100, + }); + + this.fairQueue = new FairQueue({ + redis: redisOptions, + keys, + scheduler, + payloadSchema: TestPayloadSchema, + shardCount: options.shardCount ?? 1, + consumerIntervalMs: options.consumerIntervalMs ?? 20, + visibilityTimeoutMs: options.visibilityTimeoutMs, + reclaimIntervalMs: options.reclaimIntervalMs, + startConsumers: false, + workerQueue: { resolveWorkerQueue: () => TEST_WORKER_QUEUE_ID }, + retry: options.retry + ? { + strategy: new ExponentialBackoffRetry({ + maxAttempts: options.retry.maxAttempts, + minTimeoutInMs: options.retry.delayMs, + maxTimeoutInMs: options.retry.delayMs, + factor: 1, + randomize: false, + }), + } + : undefined, + concurrencyGroups: options.concurrencyLimit + ? 
[ + { + name: "tenant", + extractGroupId: (q) => q.tenantId, + getLimit: async () => options.concurrencyLimit!, + defaultLimit: options.concurrencyLimit!, + }, + ] + : undefined, + }); + + this.workerQueueManager = new WorkerQueueManager({ + redis: redisOptions, + keys, + }); + } + + onMessage( + handler: (ctx: { + message: { id: string; queueId: string; payload: TestPayload; attempt: number }; + complete: () => Promise<void>; + release: () => Promise<void>; + fail: (error?: Error) => Promise<void>; + }) => Promise<void> + ): void { + this.messageHandler = handler; + } + + start(): void { + if (this.isRunning) return; + this.isRunning = true; + this.abortController = new AbortController(); + this.fairQueue.start(); + this.consumerLoops.push(this.#runConsumerLoop()); + } + + async stop(): Promise<void> { + if (!this.isRunning) return; + this.isRunning = false; + this.abortController.abort(); + await this.fairQueue.stop(); + await Promise.allSettled(this.consumerLoops); + this.consumerLoops = []; + } + + async close(): Promise<void> { + await this.stop(); + await this.fairQueue.close(); + await this.workerQueueManager.close(); + } + + async #runConsumerLoop(): Promise<void> { + try { + while (this.isRunning) { + if (!this.messageHandler) { + await new Promise((resolve) => setTimeout(resolve, 50)); + continue; + } + try { + const messageKey = await this.workerQueueManager.blockingPop( + TEST_WORKER_QUEUE_ID, + 1, + this.abortController.signal + ); + if (!messageKey) continue; + + const colonIndex = messageKey.indexOf(":"); + if (colonIndex === -1) continue; + + const messageId = messageKey.substring(0, colonIndex); + const queueId = messageKey.substring(colonIndex + 1); + const storedMessage = await this.fairQueue.getMessageData(messageId, queueId); + if (!storedMessage) continue; + + await this.messageHandler({ + message: { + id: storedMessage.id, + queueId: storedMessage.queueId, + payload: storedMessage.payload, + attempt: storedMessage.attempt, + }, + complete: () => this.fairQueue.completeMessage(messageId, queueId), + release: () => this.fairQueue.releaseMessage(messageId, queueId), + fail: (error?: Error) => this.fairQueue.failMessage(messageId, queueId, error), + }); + } catch { + if (!this.isRunning) break; + } + } + } catch { + // Consumer loop stopped + } + } +} + +describe("Two-Level Tenant Dispatch", () => { + describe("enqueue writes to new index only", () => { + redisTest( + "should populate dispatch and tenant queue indexes, not old master queue", + { timeout: 15000 }, + async ({ redisOptions }) => { + const keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + const redis = createRedisClient(redisOptions); + + const helper = new TestHelper(redisOptions, keys); + + // Enqueue messages to two different queues for two tenants + await helper.fairQueue.enqueue({ + queueId: "tenant:t1:queue:q1", + tenantId: "t1", + payload: { value: "msg1" }, + }); + await helper.fairQueue.enqueue({ + queueId: "tenant:t2:queue:q1", + tenantId: "t2", + payload: { value: "msg2" }, + }); + + // Check new dispatch index (Level 1): should have both tenants + const dispatchMembers = await redis.zrange(keys.dispatchKey(0), 0, -1, "WITHSCORES"); + expect(dispatchMembers.length).toBeGreaterThanOrEqual(4); // 2 tenants × [member, score] entries + + // Check tenant queue indexes (Level 2) + const t1Queues = await redis.zrange(keys.tenantQueueIndexKey("t1"), 0, -1); + expect(t1Queues).toContain("tenant:t1:queue:q1"); + + const t2Queues = await redis.zrange(keys.tenantQueueIndexKey("t2"), 0, -1); + expect(t2Queues).toContain("tenant:t2:queue:q1"); + + 
// Check old master queue: should be EMPTY (new enqueues don't write there) + const masterMembers = await redis.zrange(keys.masterQueueKey(0), 0, -1); + expect(masterMembers.length).toBe(0); + + // Per-queue storage should still work as before + const queueLength = await helper.fairQueue.getQueueLength("tenant:t1:queue:q1"); + expect(queueLength).toBe(1); + + await helper.close(); + await redis.quit(); + } + ); + + redisTest( + "should populate dispatch index correctly for batch enqueue", + { timeout: 15000 }, + async ({ redisOptions }) => { + const keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + const redis = createRedisClient(redisOptions); + + const helper = new TestHelper(redisOptions, keys); + + await helper.fairQueue.enqueueBatch({ + queueId: "tenant:t1:queue:q1", + tenantId: "t1", + messages: [ + { payload: { value: "msg1" } }, + { payload: { value: "msg2" } }, + { payload: { value: "msg3" } }, + ], + }); + + // Tenant queue index should have the queue + const t1Queues = await redis.zrange(keys.tenantQueueIndexKey("t1"), 0, -1); + expect(t1Queues).toContain("tenant:t1:queue:q1"); + + // Dispatch should have the tenant + const dispatchMembers = await redis.zrange(keys.dispatchKey(0), 0, -1); + expect(dispatchMembers).toContain("t1"); + + // Per-queue storage should have all 3 messages + const queueLength = await helper.fairQueue.getQueueLength("tenant:t1:queue:q1"); + expect(queueLength).toBe(3); + + await helper.close(); + await redis.quit(); + } + ); + }); + + describe("dispatch consumer processes messages", () => { + redisTest( + "should process messages via tenant dispatch path", + { timeout: 15000 }, + async ({ redisOptions }) => { + const keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + const processed: string[] = []; + + const helper = new TestHelper(redisOptions, keys); + + // Enqueue messages + await helper.fairQueue.enqueue({ + queueId: "tenant:t1:queue:q1", + tenantId: "t1", + payload: { value: "first" }, + }); + await helper.fairQueue.enqueue({ + queueId: "tenant:t2:queue:q1", + tenantId: "t2", + payload: { value: "second" }, + }); + + // Set up consumer + helper.onMessage(async (ctx) => { + processed.push(ctx.message.payload.value); + await ctx.complete(); + }); + + helper.start(); + + // Wait for messages to be processed + await waitFor(() => processed.length === 2, 5000); + expect(processed).toContain("first"); + expect(processed).toContain("second"); + + await helper.close(); + } + ); + }); + + describe("complete updates dispatch indexes", () => { + redisTest( + "should remove empty queue from tenant index and tenant from dispatch", + { timeout: 15000 }, + async ({ redisOptions }) => { + const keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + const redis = createRedisClient(redisOptions); + + const helper = new TestHelper(redisOptions, keys); + + // Enqueue one message + await helper.fairQueue.enqueue({ + queueId: "tenant:t1:queue:q1", + tenantId: "t1", + payload: { value: "only" }, + }); + + // Verify indexes populated + let t1Queues = await redis.zrange(keys.tenantQueueIndexKey("t1"), 0, -1); + expect(t1Queues.length).toBe(1); + let dispatchMembers = await redis.zrange(keys.dispatchKey(0), 0, -1); + expect(dispatchMembers).toContain("t1"); + + // Process and complete the message + helper.onMessage(async (ctx) => { + await ctx.complete(); + }); + helper.start(); + + // Wait for processing + await waitFor( + async () => (await helper.fairQueue.getQueueLength("tenant:t1:queue:q1")) === 0, + 5000 + ); + + // Allow index updates to 
propagate + await new Promise((resolve) => setTimeout(resolve, 200)); + + // Verify indexes cleaned up + t1Queues = await redis.zrange(keys.tenantQueueIndexKey("t1"), 0, -1); + expect(t1Queues.length).toBe(0); + dispatchMembers = await redis.zrange(keys.dispatchKey(0), 0, -1); + expect(dispatchMembers).not.toContain("t1"); + + await helper.close(); + await redis.quit(); + } + ); + + redisTest( + "should keep tenant in dispatch when other queues remain", + { timeout: 15000 }, + async ({ redisOptions }) => { + const keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + const redis = createRedisClient(redisOptions); + const processed: string[] = []; + + const helper = new TestHelper(redisOptions, keys, { concurrencyLimit: 1 }); + + // Enqueue to two queues for the same tenant + await helper.fairQueue.enqueue({ + queueId: "tenant:t1:queue:q1", + tenantId: "t1", + payload: { value: "queue1" }, + }); + await helper.fairQueue.enqueue({ + queueId: "tenant:t1:queue:q2", + tenantId: "t1", + payload: { value: "queue2" }, + }); + + // Process messages one at a time (concurrency limit 1) + helper.onMessage(async (ctx) => { + processed.push(ctx.message.payload.value); + await ctx.complete(); + }); + helper.start(); + + // Wait for first message + await waitFor(() => processed.length >= 1, 5000); + + // Tenant should still be in dispatch (has remaining queue) + const dispatchMembers = await redis.zrange(keys.dispatchKey(0), 0, -1); + // After first complete, tenant may still be in dispatch due to second queue + // (exact timing depends on consumer loop) + + // Wait for both messages + await waitFor(() => processed.length === 2, 5000); + expect(processed).toContain("queue1"); + expect(processed).toContain("queue2"); + + await helper.close(); + await redis.quit(); + } + ); + }); + + describe("legacy drain", () => { + redisTest( + "should drain pre-populated master queue alongside new dispatch", + { timeout: 20000 }, + async ({ redisOptions }) => { + const keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + const redis = createRedisClient(redisOptions); + const processed: string[] = []; + + // Simulate pre-deploy state: write directly to old master queue + queue storage + const queueId = "tenant:t1:queue:legacy"; + const queueKey = keys.queueKey(queueId); + const queueItemsKey = keys.queueItemsKey(queueId); + const masterQueueKey = keys.masterQueueKey(0); + + const timestamp = Date.now(); + const storedMessage: StoredMessage = { + id: "legacy-msg-1", + queueId, + tenantId: "t1", + payload: { value: "legacy" }, + timestamp, + attempt: 1, + }; + + // Write to per-queue storage and old master queue (simulating pre-deploy) + await redis.zadd(queueKey, timestamp, "legacy-msg-1"); + await redis.hset(queueItemsKey, "legacy-msg-1", JSON.stringify(storedMessage)); + await redis.zadd(masterQueueKey, timestamp, queueId); + + // Now create FairQueue (post-deploy) + const helper = new TestHelper(redisOptions, keys); + + // Also enqueue a new message (goes to dispatch only) + await helper.fairQueue.enqueue({ + queueId: "tenant:t2:queue:new", + tenantId: "t2", + payload: { value: "new" }, + }); + + // Verify: old message in master queue, new message in dispatch + const masterMembers = await redis.zrange(masterQueueKey, 0, -1); + expect(masterMembers).toContain(queueId); + const dispatchMembers = await redis.zrange(keys.dispatchKey(0), 0, -1); + expect(dispatchMembers).toContain("t2"); + + // Process both messages + helper.onMessage(async (ctx) => { + processed.push(ctx.message.payload.value); + await 
ctx.complete(); + }); + helper.start(); + + // Both messages should be processed (legacy from drain + new from dispatch) + await waitFor(() => processed.length === 2, 10000); + expect(processed).toContain("legacy"); + expect(processed).toContain("new"); + + // Old master queue should be empty after drain + const masterAfter = await redis.zcard(masterQueueKey); + expect(masterAfter).toBe(0); + + await helper.close(); + await redis.quit(); + } + ); + }); + + describe("DRR selectQueuesFromDispatch", () => { + redisTest( + "should select tenants from dispatch with DRR fairness", + { timeout: 15000 }, + async ({ redisOptions }) => { + const keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + const processed: Array<{ tenantId: string; value: string }> = []; + + const helper = new TestHelper(redisOptions, keys, { concurrencyLimit: 100 }); + + // Enqueue messages for multiple tenants + for (let i = 0; i < 5; i++) { + await helper.fairQueue.enqueue({ + queueId: `tenant:t1:queue:q${i}`, + tenantId: "t1", + payload: { value: `t1-${i}` }, + }); + } + for (let i = 0; i < 5; i++) { + await helper.fairQueue.enqueue({ + queueId: `tenant:t2:queue:q${i}`, + tenantId: "t2", + payload: { value: `t2-${i}` }, + }); + } + + // Process all messages + helper.onMessage(async (ctx) => { + const tenantId = ctx.message.queueId.split(":")[1]!; + processed.push({ tenantId, value: ctx.message.payload.value }); + await ctx.complete(); + }); + helper.start(); + + await waitFor(() => processed.length === 10, 10000); + + // Both tenants should have been processed + const t1Count = processed.filter((p) => p.tenantId === "t1").length; + const t2Count = processed.filter((p) => p.tenantId === "t2").length; + expect(t1Count).toBe(5); + expect(t2Count).toBe(5); + + await helper.close(); + } + ); + }); + + describe("DRR fairness across iterations", () => { + redisTest( + "should distribute processing fairly and not starve any tenant", + { timeout: 30000 }, + async ({ redisOptions }) => { + const keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + const processed: Array<{ tenantId: string; order: number }> = []; + let processOrder = 0; + + const scheduler = new DRRScheduler({ + redis: redisOptions, + keys, + quantum: 5, + maxDeficit: 20, + }); + + const fairQueue = new FairQueue({ + redis: redisOptions, + keys, + scheduler, + payloadSchema: TestPayloadSchema, + shardCount: 1, + consumerIntervalMs: 10, + startConsumers: false, + workerQueue: { resolveWorkerQueue: () => TEST_WORKER_QUEUE_ID }, + concurrencyGroups: [ + { + name: "tenant", + extractGroupId: (q) => q.tenantId, + getLimit: async () => 100, // High limit so concurrency doesn't interfere + defaultLimit: 100, + }, + ], + }); + + const workerQueueManager = new WorkerQueueManager({ + redis: redisOptions, + keys, + }); + + // Scenario: 3 tenants with very different queue counts + // t1: 50 queues (heavy hitter) + // t2: 5 queues (medium) + // t3: 1 queue with 5 messages (small) + // All should get fair service - no tenant should be starved. 
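+        //
+        // Illustrative DRR arithmetic (assuming the scheduler visits active tenants
+        // roughly round-robin): with quantum=5, each pass grants every tenant ~5
+        // sends, so after one pass the counts are about t1=5, t2=5, t3=5; t2 and
+        // t3 drain on the next pass, and the remaining passes serve only t1.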
+ + for (let i = 0; i < 50; i++) { + await fairQueue.enqueue({ + queueId: `tenant:t1:queue:q${i}`, + tenantId: "t1", + payload: { value: `t1-${i}` }, + }); + } + + for (let i = 0; i < 5; i++) { + await fairQueue.enqueue({ + queueId: `tenant:t2:queue:q${i}`, + tenantId: "t2", + payload: { value: `t2-${i}` }, + }); + } + + for (let i = 0; i < 5; i++) { + await fairQueue.enqueue({ + queueId: "tenant:t3:queue:q0", + tenantId: "t3", + payload: { value: `t3-${i}` }, + }); + } + + // Start processing + fairQueue.start(); + const abortController = new AbortController(); + + const consumerLoop = (async () => { + while (!abortController.signal.aborted) { + try { + const messageKey = await workerQueueManager.blockingPop( + TEST_WORKER_QUEUE_ID, + 1, + abortController.signal + ); + if (!messageKey) continue; + + const colonIndex = messageKey.indexOf(":"); + if (colonIndex === -1) continue; + + const messageId = messageKey.substring(0, colonIndex); + const queueId = messageKey.substring(colonIndex + 1); + const storedMessage = await fairQueue.getMessageData(messageId, queueId); + if (!storedMessage) continue; + + processed.push({ tenantId: storedMessage.tenantId, order: processOrder++ }); + await fairQueue.completeMessage(messageId, queueId); + } catch { + if (abortController.signal.aborted) break; + } + } + })(); + + // Wait for all 60 messages to be processed + await waitFor(() => processed.length === 60, 20000); + + // === Fairness assertions === + + const t1 = processed.filter((p) => p.tenantId === "t1"); + const t2 = processed.filter((p) => p.tenantId === "t2"); + const t3 = processed.filter((p) => p.tenantId === "t3"); + + // All messages processed + expect(t1.length).toBe(50); + expect(t2.length).toBe(5); + expect(t3.length).toBe(5); + + // No tenant should be starved: every tenant should have at least one + // message processed in the first 20 messages. With quantum=5 and 3 tenants, + // each should get a turn within the first few iterations. + const first20 = processed.slice(0, 20); + const tenantsInFirst20 = new Set(first20.map((p) => p.tenantId)); + expect(tenantsInFirst20.size).toBe(3); + + // t2 and t3 should finish well before t1 (they have fewer messages). + // Check that t2's last message is processed before t1's last message. + const t2LastOrder = Math.max(...t2.map((p) => p.order)); + const t1LastOrder = Math.max(...t1.map((p) => p.order)); + expect(t2LastOrder).toBeLessThan(t1LastOrder); + + // t3 should also finish before t1 + const t3LastOrder = Math.max(...t3.map((p) => p.order)); + expect(t3LastOrder).toBeLessThan(t1LastOrder); + + // Check that t1 doesn't monopolize early processing. + // In the first 15 messages, t1 should have at most 10 (with quantum=5, + // t1 gets ~5 per round, and there are 3 tenants taking turns). 
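+        // (Worst-case sketch for the bound: t1 could lead the first pass with its
+        // quantum of 5 and lead the second pass too, reaching 10 of the first 15
+        // while t2 and t3 take the remainder.)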
+        const t1InFirst15 = processed.slice(0, 15).filter((p) => p.tenantId === "t1").length;
+        expect(t1InFirst15).toBeLessThanOrEqual(10);
+
+        // Clean up
+        abortController.abort();
+        await Promise.allSettled([consumerLoop]);
+        await fairQueue.close();
+        await workerQueueManager.close();
+      }
+    );
+  });
+
+  describe("noisy neighbor isolation", () => {
+    redisTest(
+      "should not block other tenants when one tenant is at capacity",
+      { timeout: 20000 },
+      async ({ redisOptions }) => {
+        const keys = new DefaultFairQueueKeyProducer({ prefix: "test" });
+        const processed: Array<{ tenantId: string; value: string }> = [];
+
+        const scheduler = new DRRScheduler({
+          redis: redisOptions,
+          keys,
+          quantum: 10,
+          maxDeficit: 100,
+        });
+
+        const fairQueue = new FairQueue({
+          redis: redisOptions,
+          keys,
+          scheduler,
+          payloadSchema: TestPayloadSchema,
+          shardCount: 1,
+          consumerIntervalMs: 20,
+          startConsumers: false,
+          workerQueue: { resolveWorkerQueue: () => TEST_WORKER_QUEUE_ID },
+          concurrencyGroups: [
+            {
+              name: "tenant",
+              extractGroupId: (q) => q.tenantId,
+              getLimit: async (tenantId) => {
+                // t1 has very low concurrency, t2 has high
+                return tenantId === "t1" ? 1 : 100;
+              },
+              defaultLimit: 10,
+            },
+          ],
+        });
+
+        const workerQueueManager = new WorkerQueueManager({
+          redis: redisOptions,
+          keys,
+        });
+
+        // Enqueue 20 messages for t1 (noisy neighbor with concurrency 1)
+        for (let i = 0; i < 20; i++) {
+          await fairQueue.enqueue({
+            queueId: `tenant:t1:queue:q${i}`,
+            tenantId: "t1",
+            payload: { value: `t1-${i}` },
+          });
+        }
+
+        // Enqueue 3 messages for t2 (quiet tenant with high concurrency)
+        for (let i = 0; i < 3; i++) {
+          await fairQueue.enqueue({
+            queueId: `tenant:t2:queue:q${i}`,
+            tenantId: "t2",
+            payload: { value: `t2-${i}` },
+          });
+        }
+
+        // Start processing
+        fairQueue.start();
+        const abortController = new AbortController();
+
+        const consumerLoop = (async () => {
+          while (!abortController.signal.aborted) {
+            try {
+              const messageKey = await workerQueueManager.blockingPop(
+                TEST_WORKER_QUEUE_ID,
+                1,
+                abortController.signal
+              );
+              if (!messageKey) continue;
+
+              const colonIndex = messageKey.indexOf(":");
+              if (colonIndex === -1) continue;
+
+              const messageId = messageKey.substring(0, colonIndex);
+              const queueId = messageKey.substring(colonIndex + 1);
+              const storedMessage = await fairQueue.getMessageData(messageId, queueId);
+              if (!storedMessage) continue;
+
+              const tenantId = storedMessage.tenantId;
+              processed.push({ tenantId, value: storedMessage.payload.value });
+              await fairQueue.completeMessage(messageId, queueId);
+            } catch {
+              if (abortController.signal.aborted) break;
+            }
+          }
+        })();
+
+        // Wait for t2's messages to be processed (they shouldn't be blocked by t1)
+        await waitFor(
+          () => processed.filter((p) => p.tenantId === "t2").length === 3,
+          10000
+        );
+
+        const t2ProcessedCount = processed.filter((p) => p.tenantId === "t2").length;
+        expect(t2ProcessedCount).toBe(3);
+
+        // Clean up
+        abortController.abort();
+        await Promise.allSettled([consumerLoop]);
+        await fairQueue.close();
+        await workerQueueManager.close();
+      }
+    );
+  });
+
+  describe("release updates dispatch indexes", () => {
+    redisTest(
+      "should update dispatch indexes when message is released for retry",
+      { timeout: 20000 },
+      async ({ redisOptions }) => {
+        const keys = new DefaultFairQueueKeyProducer({ prefix: "test" });
+        const redis = createRedisClient(redisOptions);
+        const attempts: number[] = [];
+
+        const helper = new TestHelper(redisOptions, keys, {
+          retry: {
maxAttempts: 3, delayMs: 100 }, + }); + + await helper.fairQueue.enqueue({ + queueId: "tenant:t1:queue:q1", + tenantId: "t1", + payload: { value: "retry-me" }, + }); + + helper.onMessage(async (ctx) => { + attempts.push(ctx.message.attempt); + if (ctx.message.attempt < 2) { + // Fail on first attempt to trigger retry + await ctx.fail(new Error("transient error")); + } else { + await ctx.complete(); + } + }); + helper.start(); + + // Wait for successful processing on second attempt + await waitFor(() => attempts.length >= 2 && attempts.includes(2), 10000); + + // After retry, the message went back into the queue via releaseMessage Lua. + // Verify it was picked up again (attempt 2 processed). + expect(attempts).toContain(1); + expect(attempts).toContain(2); + + // After completion, dispatch indexes should be cleaned up + await waitFor(async () => { + const t1Queues = await redis.zcard(keys.tenantQueueIndexKey("t1")); + return t1Queues === 0; + }, 5000); + + await helper.close(); + await redis.quit(); + } + ); + }); + + describe("reclaim updates dispatch indexes", () => { + redisTest( + "should update dispatch indexes when timed-out message is reclaimed", + { timeout: 20000 }, + async ({ redisOptions }) => { + const keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + const redis = createRedisClient(redisOptions); + const processCount = { count: 0 }; + + const helper = new TestHelper(redisOptions, keys, { + visibilityTimeoutMs: 500, + reclaimIntervalMs: 200, + }); + + await helper.fairQueue.enqueue({ + queueId: "tenant:t1:queue:q1", + tenantId: "t1", + payload: { value: "reclaim-me" }, + }); + + helper.onMessage(async (ctx) => { + processCount.count++; + if (processCount.count === 1) { + // First attempt: don't complete, let it timeout and get reclaimed + await new Promise((resolve) => setTimeout(resolve, 1500)); + } else { + // Second attempt after reclaim: complete normally + await ctx.complete(); + } + }); + helper.start(); + + // Wait for message to be processed twice (once timeout, once success) + await waitFor(() => processCount.count >= 2, 10000); + + // After reclaim + re-processing + completion, indexes should be clean + await waitFor(async () => { + const t1Queues = await redis.zcard(keys.tenantQueueIndexKey("t1")); + return t1Queues === 0; + }, 5000); + + await helper.close(); + await redis.quit(); + } + ); + }); + + describe("legacy message migration via reclaim", () => { + redisTest( + "should migrate legacy master queue message to dispatch index on reclaim", + { timeout: 20000 }, + async ({ redisOptions }) => { + const keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); + const redis = createRedisClient(redisOptions); + const processed: string[] = []; + + // Simulate pre-deploy: write message to old master queue + queue storage + const queueId = "tenant:t1:queue:legacy"; + const queueKey = keys.queueKey(queueId); + const queueItemsKey = keys.queueItemsKey(queueId); + const masterQueueKey = keys.masterQueueKey(0); + + const timestamp = Date.now(); + const storedMessage: StoredMessage = { + id: "legacy-reclaim-1", + queueId, + tenantId: "t1", + payload: { value: "legacy-reclaim" }, + timestamp, + attempt: 1, + }; + + await redis.zadd(queueKey, timestamp, "legacy-reclaim-1"); + await redis.hset(queueItemsKey, "legacy-reclaim-1", JSON.stringify(storedMessage)); + await redis.zadd(masterQueueKey, timestamp, queueId); + + // Verify: message only in old master queue, not in dispatch + expect(await redis.zcard(keys.dispatchKey(0))).toBe(0); + expect(await 
redis.zcard(keys.tenantQueueIndexKey("t1"))).toBe(0);
+
+        // Create FairQueue with short visibility timeout
+        const helper = new TestHelper(redisOptions, keys, {
+          visibilityTimeoutMs: 500,
+          reclaimIntervalMs: 200,
+        });
+
+        const processCount = { count: 0 };
+        helper.onMessage(async (ctx) => {
+          processCount.count++;
+          if (processCount.count === 1) {
+            // First attempt: don't complete, let it timeout
+            // The reclaim will put it back in queue and update dispatch indexes
+            await new Promise((resolve) => setTimeout(resolve, 1500));
+          } else {
+            // Second attempt: complete
+            processed.push(ctx.message.payload.value);
+            await ctx.complete();
+          }
+        });
+        helper.start();
+
+        // Wait for the message to be processed (first via drain, then reclaimed into dispatch)
+        await waitFor(() => processed.length === 1, 15000);
+        expect(processed[0]).toBe("legacy-reclaim");
+
+        // After completion the tenant queue index should be clean. Poll rather than
+        // assert immediately: `processed` is updated before complete() finishes its
+        // index cleanup. We deliberately don't assert on the old master queue,
+        // since the legacy drain removes its entry on its own schedule.
+        await waitFor(async () => {
+          return (await redis.zcard(keys.tenantQueueIndexKey("t1"))) === 0;
+        }, 5000);
+
+        await helper.close();
+        await redis.quit();
+      }
+    );
+
+    redisTest(
+      "should migrate legacy message to dispatch index on retry failure",
+      { timeout: 20000 },
+      async ({ redisOptions }) => {
+        const keys = new DefaultFairQueueKeyProducer({ prefix: "test" });
+        const redis = createRedisClient(redisOptions);
+        const attempts: number[] = [];
+
+        // Simulate pre-deploy: write message to old master queue
+        const queueId = "tenant:t1:queue:legacy";
+        const queueKey = keys.queueKey(queueId);
+        const queueItemsKey = keys.queueItemsKey(queueId);
+        const masterQueueKey = keys.masterQueueKey(0);
+
+        const timestamp = Date.now();
+        const storedMessage: StoredMessage = {
+          id: "legacy-retry-1",
+          queueId,
+          tenantId: "t1",
+          payload: { value: "legacy-retry" },
+          timestamp,
+          attempt: 1,
+        };
+
+        await redis.zadd(queueKey, timestamp, "legacy-retry-1");
+        await redis.hset(queueItemsKey, "legacy-retry-1", JSON.stringify(storedMessage));
+        await redis.zadd(masterQueueKey, timestamp, queueId);
+
+        // Create FairQueue with retry enabled
+        const helper = new TestHelper(redisOptions, keys, {
+          retry: { maxAttempts: 3, delayMs: 100 },
+        });
+
+        helper.onMessage(async (ctx) => {
+          attempts.push(ctx.message.attempt);
+          if (ctx.message.attempt < 2) {
+            // Fail the first attempt; the retry release writes to the dispatch index
+            await ctx.fail(new Error("transient"));
+          } else {
+            await ctx.complete();
+          }
+        });
+        helper.start();
+
+        // Wait for retry to complete
+        await waitFor(() => attempts.includes(2), 10000);
+
+        // The retry release should have written to dispatch indexes.
+        // After completion, indexes should be clean.
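+        // (Per the releaseMessage Lua script, a release re-adds the queue to the
+        // tenant queue index and the tenant to the dispatch index; the complete
+        // path then prunes both once the queue is empty.)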
+        await waitFor(async () => {
+          const t1Queues = await redis.zcard(keys.tenantQueueIndexKey("t1"));
+          return t1Queues === 0;
+        }, 5000);
+
+        expect(attempts).toContain(1);
+        expect(attempts).toContain(2);
+
+        await helper.close();
+        await redis.quit();
+      }
+    );
+  });
+
+  describe("dispatch shard is tenant-based, not queue-based", () => {
+    redisTest(
+      "tenant with queues in different queue shards should appear in only one dispatch shard",
+      { timeout: 15000 },
+      async ({ redisOptions }) => {
+        const keys = new DefaultFairQueueKeyProducer({ prefix: "test" });
+        const redis = createRedisClient(redisOptions);
+        const shardCount = 2;
+
+        const helper = new TestHelper(redisOptions, keys, { shardCount });
+
+        try {
+          const tenantId = "tenant-shard-test";
+
+          // Find two queue IDs for the same tenant that hash to different queue shards
+          // by trying different queue names
+          const { MasterQueue: MQ } = await import("../masterQueue.js");
+          const mq = new MQ({ redis: redisOptions, keys, shardCount });
+          const { TenantDispatch: TD } = await import("../tenantDispatch.js");
+          const td = new TD({ redis: redisOptions, keys, shardCount });
+
+          let queueShard0: string | null = null;
+          let queueShard1: string | null = null;
+
+          for (let i = 0; i < 100; i++) {
+            const qId = `tenant:${tenantId}:queue:q${i}`;
+            const shard = mq.getShardForQueue(qId);
+            if (shard === 0 && !queueShard0) queueShard0 = qId;
+            if (shard === 1 && !queueShard1) queueShard1 = qId;
+            if (queueShard0 && queueShard1) break;
+          }
+
+          expect(queueShard0).not.toBeNull();
+          expect(queueShard1).not.toBeNull();
+
+          // Both queues belong to the same tenant, so dispatch shard should be the same
+          const expectedDispatchShard = td.getShardForTenant(tenantId);
+
+          // Enqueue to both queues
+          await helper.fairQueue.enqueue({
+            queueId: queueShard0!,
+            tenantId,
+            payload: { value: "msg-shard0" },
+          });
+          await helper.fairQueue.enqueue({
+            queueId: queueShard1!,
+            tenantId,
+            payload: { value: "msg-shard1" },
+          });
+
+          // Verify: tenant should only appear in one dispatch shard
+          const dispatch0 = await redis.zrange(keys.dispatchKey(0), 0, -1);
+          const dispatch1 = await redis.zrange(keys.dispatchKey(1), 0, -1);
+
+          const inShard0 = dispatch0.includes(tenantId);
+          const inShard1 = dispatch1.includes(tenantId);
+
+          // Tenant should appear in exactly one shard
+          expect(inShard0 !== inShard1).toBe(true);
+
+          // And it should be the expected one
+          if (expectedDispatchShard === 0) {
+            expect(inShard0).toBe(true);
+            expect(inShard1).toBe(false);
+          } else {
+            expect(inShard0).toBe(false);
+            expect(inShard1).toBe(true);
+          }
+
+          await mq.close();
+          await td.close();
+        } finally {
+          await helper.close();
+          await redis.quit();
+        }
+      }
+    );
+  });
+});
+
+// Helper to wait for a condition
+async function waitFor(
+  condition: () => boolean | Promise<boolean>,
+  timeoutMs: number = 5000,
+  intervalMs: number = 50
+): Promise<void> {
+  const start = Date.now();
+  while (Date.now() - start < timeoutMs) {
+    const result = await condition();
+    if (result) return;
+    await new Promise((resolve) => setTimeout(resolve, intervalMs));
+  }
+  throw new Error(`waitFor timed out after ${timeoutMs}ms`);
+}
diff --git a/packages/redis-worker/src/fair-queue/tests/visibility.test.ts b/packages/redis-worker/src/fair-queue/tests/visibility.test.ts
index a5685d51c9e..2ecdeb41e90 100644
--- a/packages/redis-worker/src/fair-queue/tests/visibility.test.ts
+++ b/packages/redis-worker/src/fair-queue/tests/visibility.test.ts
@@ -465,7 +465,8 @@ describe("VisibilityManager", () => {
       const queueId =
"tenant:t1:queue:release-batch"; const queueKey = keys.queueKey(queueId); const queueItemsKey = keys.queueItemsKey(queueId); - const masterQueueKey = keys.masterQueueKey(0); + const tenantQueueIndexKey = keys.tenantQueueIndexKey("t1"); + const dispatchKey = keys.dispatchKey(0); // Add messages to queue and claim them for (let i = 1; i <= 5; i++) { @@ -501,7 +502,9 @@ describe("VisibilityManager", () => { queueId, queueKey, queueItemsKey, - masterQueueKey + tenantQueueIndexKey, + dispatchKey, + "t1" ); // Verify 2 messages still in-flight @@ -539,17 +542,18 @@ describe("VisibilityManager", () => { const queueId = "tenant:t1:queue:empty-release"; const queueKey = keys.queueKey(queueId); const queueItemsKey = keys.queueItemsKey(queueId); - const masterQueueKey = keys.masterQueueKey(0); + const tenantQueueIndexKey = keys.tenantQueueIndexKey("t1"); + const dispatchKey = keys.dispatchKey(0); // Should not throw when releasing empty array - await manager.releaseBatch([], queueId, queueKey, queueItemsKey, masterQueueKey); + await manager.releaseBatch([], queueId, queueKey, queueItemsKey, tenantQueueIndexKey, dispatchKey, "t1"); await manager.close(); } ); redisTest( - "should update master queue with oldest message timestamp", + "should update dispatch indexes with oldest message timestamp", { timeout: 10000 }, async ({ redisOptions }) => { keys = new DefaultFairQueueKeyProducer({ prefix: "test" }); @@ -562,10 +566,11 @@ describe("VisibilityManager", () => { }); const redis = createRedisClient(redisOptions); - const queueId = "tenant:t1:queue:master-update"; + const queueId = "tenant:t1:queue:dispatch-update"; const queueKey = keys.queueKey(queueId); const queueItemsKey = keys.queueItemsKey(queueId); - const masterQueueKey = keys.masterQueueKey(0); + const tenantQueueIndexKey = keys.tenantQueueIndexKey("t1"); + const dispatchKey = keys.dispatchKey(0); // Add and claim messages const baseTime = Date.now(); @@ -586,11 +591,15 @@ describe("VisibilityManager", () => { const claimed = await manager.claimBatch(queueId, queueKey, queueItemsKey, "consumer-1", 3); // Release all messages back - await manager.releaseBatch(claimed, queueId, queueKey, queueItemsKey, masterQueueKey); + await manager.releaseBatch(claimed, queueId, queueKey, queueItemsKey, tenantQueueIndexKey, dispatchKey, "t1"); - // Master queue should have been updated - const masterScore = await redis.zscore(masterQueueKey, queueId); - expect(masterScore).not.toBeNull(); + // Tenant queue index should have the queue with correct score + const tenantQueueScore = await redis.zscore(tenantQueueIndexKey, queueId); + expect(tenantQueueScore).not.toBeNull(); + + // Dispatch index should have the tenant + const dispatchScore = await redis.zscore(dispatchKey, "t1"); + expect(dispatchScore).not.toBeNull(); await manager.close(); await redis.quit(); @@ -616,7 +625,8 @@ describe("VisibilityManager", () => { const queueId = "tenant:t1:queue:reclaim-test"; const queueKey = keys.queueKey(queueId); const queueItemsKey = keys.queueItemsKey(queueId); - const masterQueueKey = keys.masterQueueKey(0); + const tenantQueueIndexKey = keys.tenantQueueIndexKey("t1"); + const dispatchKey = keys.dispatchKey(0); // Add and claim a message const messageId = "reclaim-msg"; @@ -644,7 +654,9 @@ describe("VisibilityManager", () => { const reclaimedMessages = await manager.reclaimTimedOut(0, (qId) => ({ queueKey: keys.queueKey(qId), queueItemsKey: keys.queueItemsKey(qId), - masterQueueKey, + tenantQueueIndexKey: keys.tenantQueueIndexKey(keys.extractTenantId(qId)), + 
dispatchKey,
+        tenantId: keys.extractTenantId(qId),
       }));
 
       expect(reclaimedMessages).toHaveLength(1);
@@ -690,7 +702,8 @@ describe("VisibilityManager", () => {
       const queueId = "tenant:t1:queue:no-timeout";
       const queueKey = keys.queueKey(queueId);
       const queueItemsKey = keys.queueItemsKey(queueId);
-      const masterQueueKey = keys.masterQueueKey(0);
+      const tenantQueueIndexKey = keys.tenantQueueIndexKey("t1");
+      const dispatchKey = keys.dispatchKey(0);
 
       // Add and claim a message with long timeout
       const messageId = "long-timeout-msg";
@@ -712,7 +725,9 @@ describe("VisibilityManager", () => {
       const reclaimedMessages = await manager.reclaimTimedOut(0, (qId) => ({
         queueKey: keys.queueKey(qId),
         queueItemsKey: keys.queueItemsKey(qId),
-        masterQueueKey,
+        tenantQueueIndexKey: keys.tenantQueueIndexKey(keys.extractTenantId(qId)),
+        dispatchKey,
+        tenantId: keys.extractTenantId(qId),
       }));
 
       expect(reclaimedMessages).toHaveLength(0);
@@ -736,7 +751,8 @@ describe("VisibilityManager", () => {
       });
       const redis = createRedisClient(redisOptions);
 
-      const masterQueueKey = keys.masterQueueKey(0);
+      const tenantQueueIndexKey = keys.tenantQueueIndexKey("t1");
+      const dispatchKey = keys.dispatchKey(0);
 
       // Add and claim messages for two different tenants
       for (const tenant of ["t1", "t2"]) {
@@ -767,7 +783,9 @@ describe("VisibilityManager", () => {
       const reclaimedMessages = await manager.reclaimTimedOut(0, (qId) => ({
         queueKey: keys.queueKey(qId),
         queueItemsKey: keys.queueItemsKey(qId),
-        masterQueueKey,
+        tenantQueueIndexKey: keys.tenantQueueIndexKey(keys.extractTenantId(qId)),
+        dispatchKey,
+        tenantId: keys.extractTenantId(qId),
       }));
 
       expect(reclaimedMessages).toHaveLength(2);
@@ -798,7 +816,8 @@ describe("VisibilityManager", () => {
       const queueId = "tenant:t1:queue:fallback-test";
       const queueKey = keys.queueKey(queueId);
       const queueItemsKey = keys.queueItemsKey(queueId);
-      const masterQueueKey = keys.masterQueueKey(0);
+      const tenantQueueIndexKey = keys.tenantQueueIndexKey("t1");
+      const dispatchKey = keys.dispatchKey(0);
       const inflightDataKey = keys.inflightDataKey(0);
 
       // Add and claim a message
@@ -830,7 +849,9 @@ describe("VisibilityManager", () => {
       const reclaimedMessages = await manager.reclaimTimedOut(0, (qId) => ({
         queueKey: keys.queueKey(qId),
         queueItemsKey: keys.queueItemsKey(qId),
-        masterQueueKey,
+        tenantQueueIndexKey: keys.tenantQueueIndexKey(keys.extractTenantId(qId)),
+        dispatchKey,
+        tenantId: keys.extractTenantId(qId),
       }));
 
       expect(reclaimedMessages).toHaveLength(1);
diff --git a/packages/redis-worker/src/fair-queue/types.ts b/packages/redis-worker/src/fair-queue/types.ts
index 6451df1bea0..d10cad1d0d4 100644
--- a/packages/redis-worker/src/fair-queue/types.ts
+++ b/packages/redis-worker/src/fair-queue/types.ts
@@ -180,6 +180,14 @@ export interface SchedulerContext {
   getQueueDescriptor(queueId: string): QueueDescriptor;
 }
 
+/**
+ * Extended context for two-level dispatch scheduling.
+ */
+export interface DispatchSchedulerContext extends SchedulerContext {
+  /** Get queues for a specific tenant from the per-tenant queue index (Level 2) */
+  getQueuesForTenant(tenantId: string, limit?: number): Promise<string[]>;
+}
+
 /**
  * Pluggable scheduler interface for fair queue selection.
  */
@@ -199,6 +207,18 @@ export interface FairScheduler {
     context: SchedulerContext
   ): Promise<string[]>;
 
+  /**
+   * Select queues using the two-level tenant dispatch index.
+   * Level 1: reads tenantIds from dispatch shard.
+   * Level 2: reads queueIds from per-tenant index.
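+   *
+   * For example, the dispatch shard might yield ["t1", "t2"]; the scheduler then
+   * expands each tenant via context.getQueuesForTenant("t1") into concrete
+   * queueIds such as "tenant:t1:queue:q1" before applying its fairness policy.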
+   * Optional - falls back to selectQueues with flat queue list if not implemented.
+   */
+  selectQueuesFromDispatch?(
+    dispatchShardKey: string,
+    consumerId: string,
+    context: DispatchSchedulerContext
+  ): Promise<string[]>;
+
   /**
    * Called after processing a message to update scheduler state.
    * Optional - not all schedulers need to track state.
@@ -292,6 +312,12 @@ export interface FairQueueKeyProducer {
   /** Get the dead letter queue data hash key for a tenant */
   deadLetterQueueDataKey(tenantId: string): string;
 
+  // Tenant dispatch keys (two-level index)
+  /** Get the dispatch index key for a shard (Level 1: tenantIds with capacity) */
+  dispatchKey(shardId: number): string;
+  /** Get the per-tenant queue index key (Level 2: queueIds for a tenant) */
+  tenantQueueIndexKey(tenantId: string): string;
+
   // Extraction methods
   /** Extract tenant ID from a queue ID */
   extractTenantId(queueId: string): string;
diff --git a/packages/redis-worker/src/fair-queue/visibility.ts b/packages/redis-worker/src/fair-queue/visibility.ts
index 80fbf2ef004..a182a4e790d 100644
--- a/packages/redis-worker/src/fair-queue/visibility.ts
+++ b/packages/redis-worker/src/fair-queue/visibility.ts
@@ -275,7 +275,9 @@ export class VisibilityManager {
    * @param queueId - The queue ID
    * @param queueKey - The Redis key for the queue
    * @param queueItemsKey - The Redis key for the queue items hash
-   * @param masterQueueKey - The Redis key for the master queue
+   * @param tenantQueueIndexKey - The Redis key for the tenant queue index (Level 2)
+   * @param dispatchKey - The Redis key for the dispatch index (Level 1)
+   * @param tenantId - The tenant ID
    * @param score - Optional score for the message (defaults to now)
    */
   async release(
@@ -283,7 +285,9 @@ export class VisibilityManager {
     queueId: string,
     queueKey: string,
     queueItemsKey: string,
-    masterQueueKey: string,
+    tenantQueueIndexKey: string,
+    dispatchKey: string,
+    tenantId: string,
     score?: number,
     updatedData?: string
   ): Promise<void> {
@@ -297,18 +301,20 @@ export class VisibilityManager {
     // 1. Get message data from in-flight (or use updatedData if provided)
     // 2. Remove from in-flight
     // 3. Add back to queue
-    // 4. Update master queue to ensure queue is picked up
+    // 4. Update dispatch indexes to ensure queue is picked up
     await this.redis.releaseMessage(
       inflightKey,
       inflightDataKey,
       queueKey,
       queueItemsKey,
-      masterQueueKey,
+      tenantQueueIndexKey,
+      dispatchKey,
       member,
       messageId,
       messageScore.toString(),
       queueId,
-      updatedData ?? ""
+      updatedData ?? "",
"", + tenantId ); this.logger.debug("Message released", { @@ -327,7 +333,9 @@ export class VisibilityManager { * @param queueId - The queue ID * @param queueKey - The Redis key for the queue * @param queueItemsKey - The Redis key for the queue items hash - * @param masterQueueKey - The Redis key for the master queue + * @param tenantQueueIndexKey - The Redis key for the tenant queue index (Level 2) + * @param dispatchKey - The Redis key for the dispatch index (Level 1) + * @param tenantId - The tenant ID * @param score - Optional score for the messages (defaults to now) */ async releaseBatch( @@ -335,7 +343,9 @@ export class VisibilityManager { queueId: string, queueKey: string, queueItemsKey: string, - masterQueueKey: string, + tenantQueueIndexKey: string, + dispatchKey: string, + tenantId: string, score?: number ): Promise { if (messages.length === 0) { @@ -356,9 +366,11 @@ export class VisibilityManager { inflightDataKey, queueKey, queueItemsKey, - masterQueueKey, + tenantQueueIndexKey, + dispatchKey, messageScore.toString(), queueId, + tenantId, ...members, ...messageIds ); @@ -383,7 +395,9 @@ export class VisibilityManager { getQueueKeys: (queueId: string) => { queueKey: string; queueItemsKey: string; - masterQueueKey: string; + tenantQueueIndexKey: string; + dispatchKey: string; + tenantId: string; } ): Promise { const inflightKey = this.keys.inflightKey(shardId); @@ -410,7 +424,8 @@ export class VisibilityManager { continue; } const { messageId, queueId } = this.#parseMember(member); - const { queueKey, queueItemsKey, masterQueueKey } = getQueueKeys(queueId); + const { queueKey, queueItemsKey, tenantQueueIndexKey, dispatchKey, tenantId } = + getQueueKeys(queueId); try { // Get message data BEFORE releasing so we can extract tenantId for concurrency release @@ -432,12 +447,14 @@ export class VisibilityManager { inflightDataKey, queueKey, queueItemsKey, - masterQueueKey, + tenantQueueIndexKey, + dispatchKey, member, messageId, score.toString(), queueId, - "" + "", + tenantId ); // Track reclaimed message for concurrency release @@ -669,21 +686,23 @@ return results `, }); - // Atomic release: remove from in-flight, add back to queue, update master queue + // Atomic release: remove from in-flight, add back to queue, update dispatch indexes this.redis.defineCommand("releaseMessage", { - numberOfKeys: 5, + numberOfKeys: 6, lua: ` local inflightKey = KEYS[1] local inflightDataKey = KEYS[2] local queueKey = KEYS[3] local queueItemsKey = KEYS[4] -local masterQueueKey = KEYS[5] +local tenantQueueIndexKey = KEYS[5] +local dispatchKey = KEYS[6] local member = ARGV[1] local messageId = ARGV[2] local score = tonumber(ARGV[3]) local queueId = ARGV[4] local updatedData = ARGV[5] +local tenantId = ARGV[6] -- Get message data from in-flight local payload = redis.call('HGET', inflightDataKey, messageId) @@ -706,12 +725,16 @@ redis.call('HDEL', inflightDataKey, messageId) redis.call('ZADD', queueKey, score, messageId) redis.call('HSET', queueItemsKey, messageId, payload) --- Update master queue with oldest message timestamp --- This ensures delayed messages don't push the queue priority to the future --- when there are other ready messages in the queue +-- Update tenant queue index (Level 2) with queue's oldest message local oldest = redis.call('ZRANGE', queueKey, 0, 0, 'WITHSCORES') if #oldest >= 2 then - redis.call('ZADD', masterQueueKey, oldest[2], queueId) + redis.call('ZADD', tenantQueueIndexKey, oldest[2], queueId) +end + +-- Update dispatch index (Level 1) with tenant's oldest across all queues 
+local tenantOldest = redis.call('ZRANGE', tenantQueueIndexKey, 0, 0, 'WITHSCORES')
+if #tenantOldest >= 2 then
+  redis.call('ZADD', dispatchKey, tenantOldest[2], tenantId)
 end
 
 return 1
@@ -720,21 +743,23 @@ return 1
 `,
     });
 
     // Atomic batch release: release multiple messages back to queue
     this.redis.defineCommand("releaseMessageBatch", {
-      numberOfKeys: 5,
+      numberOfKeys: 6,
       lua: `
 local inflightKey = KEYS[1]
 local inflightDataKey = KEYS[2]
 local queueKey = KEYS[3]
 local queueItemsKey = KEYS[4]
-local masterQueueKey = KEYS[5]
+local tenantQueueIndexKey = KEYS[5]
+local dispatchKey = KEYS[6]
 local score = tonumber(ARGV[1])
 local queueId = ARGV[2]
+local tenantId = ARGV[3]
 
 -- Remaining args are: members..., messageIds...
 -- Calculate how many messages we have
-local numMessages = (table.getn(ARGV) - 2) / 2
-local membersStart = 3
+local numMessages = (table.getn(ARGV) - 3) / 2
+local membersStart = 4
 local messageIdsStart = membersStart + numMessages
 
 local releasedCount = 0
@@ -742,27 +767,33 @@ local releasedCount = 0
 for i = 0, numMessages - 1 do
   local member = ARGV[membersStart + i]
   local messageId = ARGV[messageIdsStart + i]
-
+
   -- Get message data from in-flight
   local payload = redis.call('HGET', inflightDataKey, messageId)
   if payload then
     -- Remove from in-flight
    redis.call('ZREM', inflightKey, member)
     redis.call('HDEL', inflightDataKey, messageId)
-
+
     -- Add back to queue
     redis.call('ZADD', queueKey, score, messageId)
     redis.call('HSET', queueItemsKey, messageId, payload)
-
+
     releasedCount = releasedCount + 1
   end
 end
 
--- Update master queue with oldest message timestamp (only once at the end)
+-- Update dispatch indexes (only once at the end)
 if releasedCount > 0 then
+  -- Update tenant queue index (Level 2)
   local oldest = redis.call('ZRANGE', queueKey, 0, 0, 'WITHSCORES')
   if #oldest >= 2 then
-    redis.call('ZADD', masterQueueKey, oldest[2], queueId)
+    redis.call('ZADD', tenantQueueIndexKey, oldest[2], queueId)
+  end
+  -- Update dispatch index (Level 1)
+  local tenantOldest = redis.call('ZRANGE', tenantQueueIndexKey, 0, 0, 'WITHSCORES')
+  if #tenantOldest >= 2 then
+    redis.call('ZADD', dispatchKey, tenantOldest[2], tenantId)
   end
 end
 
@@ -822,12 +853,14 @@ declare module "@internal/redis" {
       inflightDataKey: string,
       queueKey: string,
       queueItemsKey: string,
-      masterQueueKey: string,
+      tenantQueueIndexKey: string,
+      dispatchKey: string,
       member: string,
       messageId: string,
       score: string,
       queueId: string,
-      updatedData: string
+      updatedData: string,
+      tenantId: string
     ): Promise<number>;
 
     releaseMessageBatch(
@@ -835,9 +868,11 @@ declare module "@internal/redis" {
       inflightDataKey: string,
       queueKey: string,
       queueItemsKey: string,
-      masterQueueKey: string,
+      tenantQueueIndexKey: string,
+      dispatchKey: string,
       score: string,
       queueId: string,
+      tenantId: string,
       ...membersAndMessageIds: string[]
     ): Promise<number>;
diff --git a/scripts/enhance-release-pr.mjs b/scripts/enhance-release-pr.mjs
index af8f8333b0d..9621d2920f3 100644
--- a/scripts/enhance-release-pr.mjs
+++ b/scripts/enhance-release-pr.mjs
@@ -302,7 +302,11 @@ function formatPrBody({ version, packageEntries, serverEntries, rawBody }) {
     "These changes affect the self-hosted Docker image and Trigger.dev Cloud:"
   );
   lines.push("");
-  for (const entry of allServer) lines.push(`- ${entry.text}`);
+  for (const entry of allServer) {
+    // Indent continuation lines (two spaces, the width of "- ") so multi-line
+    // entries stay inside the list item
+    const indented = entry.text.replace(/\n/g, "\n  ");
+    lines.push(`- ${indented}`);
+  }
   lines.push("");
 }
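To make the two-level read path concrete, here is a minimal sketch of how a scheduler's `selectQueuesFromDispatch` could walk the dispatch index. It is illustrative only, not the package's DRR implementation: the local `DispatchSchedulerContext` mirror, the `tenantBatch` and `queuesPerTenant` parameters, and the plain `zrange` read are assumptions for the sketch.

```ts
import { Redis } from "ioredis";

// Local mirror of the context shape from types.ts (assumption for this sketch).
interface DispatchSchedulerContext {
  getQueuesForTenant(tenantId: string, limit?: number): Promise<string[]>;
}

// Level 1: take the oldest tenants from the dispatch shard.
// Level 2: expand each tenant into concrete queue IDs via the per-tenant index.
async function selectQueuesFromDispatch(
  redis: Redis,
  dispatchShardKey: string,
  context: DispatchSchedulerContext,
  tenantBatch = 5,
  queuesPerTenant = 2
): Promise<string[]> {
  // ZRANGE by rank returns tenants ordered by score (oldest backlog first).
  const tenantIds = await redis.zrange(dispatchShardKey, 0, tenantBatch - 1);

  const selected: string[] = [];
  for (const tenantId of tenantIds) {
    // Each tenant contributes at most `queuesPerTenant` queues per pass,
    // keeping a selection pass independent of the tenant's total queue count.
    const queues = await context.getQueuesForTenant(tenantId, queuesPerTenant);
    selected.push(...queues);
  }
  return selected;
}
```

Capping the per-tenant expansion is what preserves fairness under the tests above: a tenant with 50 queues and a tenant with 1 queue each cost roughly the same per pass.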