feat(batch-queue): two-level tenant dispatch for fair queue#3133
Replace the flat master queue index with two-level tenant dispatch to fix the noisy-neighbor problem. When a tenant has many queues at capacity, the scheduler now iterates tenants (Level 1), not queues, then fetches per-tenant queues (Level 2) only for eligible tenants. Single-deploy migration: new enqueues write to the dispatch indexes only, and the consumer drains the old master queue alongside the new dispatch path until it is empty. refs TRI-7082
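The two-level selection described above can be sketched with in-memory stand-ins for the Redis sorted sets (all names here are hypothetical; the real implementation operates on Redis ZSETs across shards):

```typescript
// Hypothetical in-memory model of the two dispatch levels.
// Level 1: tenants per shard, scored by oldest message timestamp.
// Level 2: queues per tenant, scored the same way.
type Scored = { id: string; score: number };

function selectQueues(
  tenants: Scored[],                      // Level 1 index for one shard
  queuesByTenant: Map<string, Scored[]>,  // Level 2 indexes
  isAtCapacity: (tenantId: string) => boolean
): { tenantId: string; queues: string[] }[] {
  const result: { tenantId: string; queues: string[] }[] = [];
  // Iterate tenants oldest-first; a tenant at capacity is skipped without
  // ever touching its (possibly large) queue list, which is the fix for
  // the noisy-neighbor problem.
  for (const t of [...tenants].sort((a, b) => a.score - b.score)) {
    if (isAtCapacity(t.id)) continue;
    const queues = queuesByTenant.get(t.id) ?? [];
    if (queues.length > 0) {
      result.push({
        tenantId: t.id,
        queues: [...queues].sort((a, b) => a.score - b.score).map((q) => q.id),
      });
    }
  }
  return result;
}
```

The key property is that work scales with the number of tenants at Level 1, not with the total queue count of blocked tenants.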
Walkthrough

This pull request introduces a two-level tenant dispatch system to the Redis fair-queue implementation. It adds a new `TenantDispatch` class that manages dispatch indices at the shard level (Level 1: tenants per shard) and the tenant level (Level 2: queues per tenant), ordered by oldest message timestamps. The `FairQueue` is extended to support both the new dispatch-based queue selection path and a fallback mechanism for the legacy master queue drain. New atomic Lua scripts (V2 variants) update both tenant queue indices and dispatch indices during enqueue operations. The DRR scheduler gains a `selectQueuesFromDispatch` method for dispatch-based queue selection. Telemetry adds dispatch length metrics, and comprehensive tests cover enqueue behavior, dispatch processing, fairness across tenants, and legacy migration scenarios.

Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~65 minutes

Detailed notes

Key implementation areas requiring careful review: the heterogeneous nature of the change (spanning index structures, scheduling logic, atomic operations, migration concerns, and telemetry), combined with the logic density in the main index.ts file, warrants thorough review of dispatch flow correctness and state consistency across the dual-path execution model.

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 2
🧹 Nitpick comments (9)
packages/redis-worker/src/fair-queue/tenantDispatch.ts (2)
5-14: Use `type` instead of `interface` per coding guidelines.

Both `TenantDispatchOptions` and `TenantWithScore` are plain data shapes with no need for declaration merging. As per coding guidelines: "Use types over interfaces for TypeScript".

♻️ Proposed refactor

```diff
-export interface TenantDispatchOptions {
-  redis: RedisOptions;
-  keys: FairQueueKeyProducer;
-  shardCount: number;
-}
+export type TenantDispatchOptions = {
+  redis: RedisOptions;
+  keys: FairQueueKeyProducer;
+  shardCount: number;
+};

-export interface TenantWithScore {
-  tenantId: string;
-  score: number;
-}
+export type TenantWithScore = {
+  tenantId: string;
+  score: number;
+};
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/redis-worker/src/fair-queue/tenantDispatch.ts` around lines 5 - 14, Replace the two interfaces with type aliases: change the declarations for TenantDispatchOptions and TenantWithScore from interface to type (preserving their property shapes: redis: RedisOptions; keys: FairQueueKeyProducer; shardCount: number for TenantDispatchOptions, and tenantId: string; score: number for TenantWithScore) to conform to the "use types over interfaces" guideline; ensure any imports/exports and usages of TenantDispatchOptions and TenantWithScore remain unchanged.
36-40: `private options` stored but never accessed after construction.

The `private` modifier on the constructor parameter creates a `this.options` field that is never referenced; `redis`, `keys`, and `shardCount` are separately stored. Remove `private` to avoid holding the extra reference.

♻️ Proposed refactor

```diff
-  constructor(private options: TenantDispatchOptions) {
+  constructor(options: TenantDispatchOptions) {
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@packages/redis-worker/src/fair-queue/tenantDispatch.ts` around lines 36 - 40, The constructor currently declares a private field via the parameter (private options: TenantDispatchOptions) but never uses this.options; remove the unnecessary instance field by changing the constructor parameter to plain options: TenantDispatchOptions and keep assigning this.redis = createRedisClient(options.redis), this.keys = options.keys and this.shardCount = Math.max(1, options.shardCount) in the TenantDispatch constructor so the class no longer holds an unused this.options reference.

packages/redis-worker/src/fair-queue/index.ts (3)
1687-1714: `queueEmpty` logic is correct but semantically fragile.

Line 1713 determines `queueEmpty` as `removedFromMaster === 1 || removedFromDispatch === 1`. Both Lua scripts independently check `ZCARD(queueKey) == 0`, so they agree; the `||` is safe. However, the return value of `updateDispatchIndexes` is overloaded: `1` means "queue was empty AND removed from index", `0` means "queue still has messages AND scores updated". Worth a brief inline comment for future readers.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/redis-worker/src/fair-queue/index.ts` around lines 1687 - 1714, In `#updateAllIndexesAfterDequeue` explain the semantic fragility around queueEmpty: add a brief inline comment above the return in the async method `#updateAllIndexesAfterDequeue` (referencing queueKey, masterQueueKey, tenantQueueIndexKey, dispatchKey, removedFromMaster, removedFromDispatch) that clarifies that both Redis/Lua helpers (updateMasterQueueIfEmpty and updateDispatchIndexes) independently check ZCARD(queueKey)==0 and return 1 when they removed the queue from their index and 0 when the queue still has messages (updateDispatchIndexes also returns 0 when it only updated scores), and therefore using "removedFromMaster === 1 || removedFromDispatch === 1" is intentional to infer queueEmpty; keep the logic unchanged but add that comment for future maintainers.
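The overloaded return contract flagged in this nitpick can be illustrated with a small in-memory mirror of the Lua script's logic (a sketch with hypothetical names, not the actual script):

```typescript
// Hypothetical TypeScript mirror of the Lua script's return contract:
// 1 = queue was empty and its member was removed from the index,
// 0 = queue still has messages and its score was refreshed.
type SortedSet = Map<string, number>;

function updateDispatchIndex(
  queue: number[],   // message scores in the per-queue ZSET
  index: SortedSet,  // dispatch index (tenant or queue level)
  member: string
): 0 | 1 {
  if (queue.length === 0) {
    index.delete(member); // ZREM: queue drained, drop it from the index
    return 1;
  }
  index.set(member, Math.min(...queue)); // ZADD: refresh to oldest-message score
  return 0;
}
```

Since both index updates apply the same emptiness check, OR-ing their results to derive `queueEmpty` is consistent, which is why the review suggests documenting rather than changing the logic.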
846-907: Extra `ZCARD` call in `#processDispatchShard` for telemetry.

Line 854 calls `getShardTenantCount` (`ZCARD`) for the span attribute, but the scheduler's `selectQueuesFromDispatch` will also read the dispatch index. This is a minor extra Redis round-trip per iteration. Acceptable for observability, but if the dispatch index grows large or iteration frequency is high, you could pass the count back from the scheduler instead.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/redis-worker/src/fair-queue/index.ts` around lines 846 - 907, The extra ZCARD call comes from calling getShardTenantCount in `#processDispatchShard` for telemetry while scheduler.selectQueuesFromDispatch also reads the dispatch index; to remove the redundant Redis round-trip, extend the scheduler.selectQueuesFromDispatch contract to return both the TenantQueues[] and the tenant count (or include a tenantCount field in its result), update the DispatchSchedulerContext/signature used in selectQueuesFromDispatch to carry that count back, and then replace the getShardTenantCount call in `#processDispatchShard` with the count returned by selectQueuesFromDispatch (falling back to getShardTenantCount only if the scheduler result doesn't include the count). Ensure you update any callers/implementations of selectQueuesFromDispatch to populate the new count field and preserve telemetry attributes like "dispatch_size" by using the returned value.
723-733: `getTotalQueueCount` returns semantically mixed counts during migration.

The method sums dispatch tenant count + legacy queue count. The inline comment acknowledges this, but callers may not expect the inconsistency. Consider adding a doc comment on the public method noting this.

📝 Proposed doc update

```diff
 /**
  * Get total queue count across all shards.
+ * Note: During migration from legacy master queue to two-level dispatch,
+ * this returns an approximate count (dispatch tenant count + legacy queue count).
  */
 async getTotalQueueCount(): Promise<number> {
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@packages/redis-worker/src/fair-queue/index.ts` around lines 723 - 733, The public method getTotalQueueCount currently returns a mixed metric (tenant count from tenantDispatch via tenantDispatch.getTotalTenantCount() plus legacy queue count from masterQueue.getTotalQueueCount()) during migration; add a clear doc comment above getTotalQueueCount describing that it returns "dispatch tenant count + legacy draining queue count" (not a pure queue count), explain the migration caveat and expected semantics for callers, and mention when/what condition (migration phase) this behavior will change so callers know to handle the inconsistency.

packages/redis-worker/src/fair-queue/types.ts (1)
183-189: New interface `DispatchSchedulerContext`: consider using `type` per coding guidelines.

The coding guidelines prefer `type` over `interface` for TypeScript. However, since this extends `SchedulerContext`, which is already an `interface` throughout the codebase, keeping consistency here is reasonable. Flagging for awareness. As per coding guidelines: "Use types over interfaces for TypeScript".

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@packages/redis-worker/src/fair-queue/types.ts` around lines 183 - 189, Replace the exported interface DispatchSchedulerContext with a type alias to follow the "use types over interfaces" guideline: create an exported type DispatchSchedulerContext = SchedulerContext & { getQueuesForTenant(tenantId: string): Promise<QueueWithScore[]> }; keep the existing JSDoc comment and the getQueuesForTenant signature, and ensure any references to DispatchSchedulerContext remain valid (no other code changes needed unless something explicitly uses "implements" with the old interface).

packages/redis-worker/src/fair-queue/schedulers/drr.ts (2)
193-206: Sequential Level 2 Redis fetches could be parallelized.

The per-tenant queue fetch in the loop at lines 195-203 issues one Redis call per eligible tenant sequentially. Since these are independent reads, `Promise.all` would reduce total latency, consistent with how capacity checks are already parallelized at lines 163-179.

♻️ Proposed refactor

```diff
-    // Level 2: For each eligible tenant, fetch their queues
-    const result: TenantQueues[] = [];
-    for (const { tenantId } of eligibleTenants) {
-      const queues = await context.getQueuesForTenant(tenantId);
-      if (queues.length > 0) {
-        result.push({
-          tenantId,
-          queues: queues.map((q) => q.queueId),
-        });
-      }
-    }
-
-    return result;
+    // Level 2: For each eligible tenant, fetch their queues (in parallel)
+    const tenantQueueResults = await Promise.all(
+      eligibleTenants.map(async ({ tenantId }) => {
+        const queues = await context.getQueuesForTenant(tenantId);
+        return { tenantId, queues };
+      })
+    );
+
+    return tenantQueueResults
+      .filter(({ queues }) => queues.length > 0)
+      .map(({ tenantId, queues }) => ({
+        tenantId,
+        queues: queues.map((q) => q.queueId),
+      }));
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/redis-worker/src/fair-queue/schedulers/drr.ts` around lines 193 - 206, The loop that builds result by calling context.getQueuesForTenant for each eligibleTenants entry should be parallelized: replace the sequential for-await pattern with Promise.all over eligibleTenants.map(tenant => context.getQueuesForTenant(tenant.tenantId)), then iterate the resolved array to push only non-empty queues into the TenantQueues result (preserving tenantId via the original eligibleTenants index or by mapping to {tenantId, queues} tuples). Update the code that currently references eligibleTenants and result so it uses the parallelized responses and still returns the same TenantQueues[] shape.
276-303: Duplicated `ZRANGEBYSCORE` parsing logic with `#getQueuesFromShard`.

`#getTenantsFromDispatch` (lines 276-303) and `#getQueuesFromShard` (lines 305-331) share nearly identical `ZRANGEBYSCORE` + `WITHSCORES` parsing patterns. Consider extracting a shared helper to reduce duplication.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/redis-worker/src/fair-queue/schedulers/drr.ts` around lines 276 - 303, `#getTenantsFromDispatch` and `#getQueuesFromShard` duplicate the zrangebyscore WITHSCORES parsing logic; extract a shared helper (e.g., parseZRangeWithScores or zrangeWithScoresAsObjects) that calls this.redis.zrangebyscore(dispatchKey, "-inf", now, "WITHSCORES", "LIMIT", 0, this.masterQueueLimit) (or accepts limit/score bounds) and returns an Array<{ id: string; score: number }>, then replace the bodies of `#getTenantsFromDispatch` and `#getQueuesFromShard` to call that helper and map id -> tenantId/queueId respectively (reuse this.masterQueueLimit and this.redis.zrangebyscore invocation within the helper).packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts (1)
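The shared helper suggested in this prompt could look roughly like the sketch below (names are hypothetical; it assumes ioredis's flat `[member, score, member, score, ...]` reply shape for `WITHSCORES`):

```typescript
// Hypothetical shared helper for the duplicated parsing: ioredis returns
// ZRANGEBYSCORE ... WITHSCORES as a flat string array alternating
// member and score, which we pair up into typed objects.
type MemberWithScore = { id: string; score: number };

function parseZRangeWithScores(flat: string[]): MemberWithScore[] {
  const result: MemberWithScore[] = [];
  for (let i = 0; i + 1 < flat.length; i += 2) {
    result.push({ id: flat[i], score: Number(flat[i + 1]) });
  }
  return result;
}
```

Both private methods could then call this helper and map `id` to `tenantId` or `queueId` respectively.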
309-315: Timing-dependent assertion uses a bare `setTimeout` to wait for propagation.

Line 315 uses `await new Promise((resolve) => setTimeout(resolve, 200))` to wait for index updates to propagate. This is fragile: under CI load, 200ms may not be enough. Consider using `waitFor` against the dispatch index state instead, which is already used elsewhere in this file.

♻️ Proposed fix: use `waitFor` for dispatch cleanup

```diff
-    // Allow index updates to propagate
-    await new Promise((resolve) => setTimeout(resolve, 200));
-
-    // Verify indexes cleaned up
-    t1Queues = await redis.zrange(keys.tenantQueueIndexKey("t1"), 0, -1);
-    expect(t1Queues.length).toBe(0);
-    dispatchMembers = await redis.zrange(keys.dispatchKey(0), 0, -1);
-    expect(dispatchMembers).not.toContain("t1");
+    // Wait for indexes to be cleaned up
+    await waitFor(async () => {
+      const queues = await redis.zrange(keys.tenantQueueIndexKey("t1"), 0, -1);
+      const members = await redis.zrange(keys.dispatchKey(0), 0, -1);
+      return queues.length === 0 && !members.includes("t1");
+    }, 5000);
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts` around lines 309 - 315, Replace the fragile fixed 200ms sleep with a polling assertion using the existing waitFor helper: after confirming queue length is 0, call waitFor to repeatedly check the dispatch index state via the test helper (use helper.fairQueue to query the dispatch index entries for tenant t1 / queue q1) until the dispatch index is empty (increase the timeout to something like 5s); this removes the bare setTimeout and ensures the test waits for dispatch cleanup deterministically.
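For reference, a generic polling helper along these lines would replace the fixed sleep (hypothetical; the test file's existing `waitFor` may differ in signature):

```typescript
// Hypothetical polling helper: re-checks an async predicate until it
// returns true or the timeout elapses, instead of sleeping a fixed amount.
async function waitFor(
  predicate: () => Promise<boolean>,
  timeoutMs = 5000,
  intervalMs = 50
): Promise<boolean> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    if (await predicate()) return true;
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
  return false; // caller decides whether a timeout fails the test
}
```

Polling makes the test deterministic with respect to propagation latency: it passes as soon as the condition holds and only fails after the full timeout.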
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/redis-worker/src/fair-queue/index.ts`:
- Around line 974-1001: The fallback method `#fallbackDispatchToLegacyScheduler`
builds allQueues from tenantDispatch.getQueuesForTenant but then ignores it by
calling scheduler.selectQueues with keys.masterQueueKey (which reads Redis); fix
by using the collected allQueues instead of the Redis master key — either (A)
call a scheduler method that accepts an in-memory queue list (e.g.,
scheduler.selectQueuesFromDispatch or a similar API) passing allQueues, loopId
and baseContext, or (B) populate a temporary sorted set keyed by a generated
tempMasterKey with the entries from allQueues, use that temp key when calling
scheduler.selectQueues, and delete the temp key afterwards; update
`#fallbackDispatchToLegacyScheduler` to perform one of these and ensure
tenantDispatch.getQueuesForTenant and allQueues are actually consumed.
In `@packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts`:
- Line 498: Remove the unused local variable blockT1 from the test; it is
declared as "let blockT1 = true;" in tenantDispatch.test.ts but never read, so
delete that declaration (or use it if intended for a future assertion) to clean
up the test code and eliminate the unused-variable warning.
ℹ️ Review info
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
- packages/redis-worker/src/fair-queue/index.ts
- packages/redis-worker/src/fair-queue/keyProducer.ts
- packages/redis-worker/src/fair-queue/schedulers/drr.ts
- packages/redis-worker/src/fair-queue/telemetry.ts
- packages/redis-worker/src/fair-queue/tenantDispatch.ts
- packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts
- packages/redis-worker/src/fair-queue/types.ts
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (25)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (5, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (2, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (6, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (1, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (4, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (8, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (3, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (2, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (7, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (6, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (3, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (5, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (7, 8)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (8, 8)
- GitHub Check: units / webapp / 🧪 Unit Tests: Webapp (1, 8)
- GitHub Check: units / internal / 🧪 Unit Tests: Internal (4, 8)
- GitHub Check: units / packages / 🧪 Unit Tests: Packages (1, 1)
- GitHub Check: sdk-compat / Cloudflare Workers
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: sdk-compat / Node.js 20.20 (ubuntu-latest)
- GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
- GitHub Check: typecheck / typecheck
- GitHub Check: sdk-compat / Node.js 22.12 (ubuntu-latest)
- GitHub Check: sdk-compat / Deno Runtime
🧰 Additional context used
📓 Path-based instructions (10)
**/*.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
**/*.{ts,tsx}: Use types over interfaces for TypeScript
Avoid using enums; prefer string unions or const objects instead
Files:
- packages/redis-worker/src/fair-queue/keyProducer.ts
- packages/redis-worker/src/fair-queue/types.ts
- packages/redis-worker/src/fair-queue/schedulers/drr.ts
- packages/redis-worker/src/fair-queue/telemetry.ts
- packages/redis-worker/src/fair-queue/index.ts
- packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts
- packages/redis-worker/src/fair-queue/tenantDispatch.ts
**/*.{ts,tsx,js,jsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
Use function declarations instead of default exports
Files:
- packages/redis-worker/src/fair-queue/keyProducer.ts
- packages/redis-worker/src/fair-queue/types.ts
- packages/redis-worker/src/fair-queue/schedulers/drr.ts
- packages/redis-worker/src/fair-queue/telemetry.ts
- packages/redis-worker/src/fair-queue/index.ts
- packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts
- packages/redis-worker/src/fair-queue/tenantDispatch.ts
**/*.ts
📄 CodeRabbit inference engine (.cursor/rules/otel-metrics.mdc)
**/*.ts: When creating or editing OTEL metrics (counters, histograms, gauges), ensure metric attributes have low cardinality by using only enums, booleans, bounded error codes, or bounded shard IDs
Do not use high-cardinality attributes in OTEL metrics such as UUIDs/IDs (envId, userId, runId, projectId, organizationId), unbounded integers (itemCount, batchSize, retryCount), timestamps (createdAt, startTime), or free-form strings (errorMessage, taskName, queueName)
When exporting OTEL metrics via OTLP to Prometheus, be aware that the exporter automatically adds unit suffixes to metric names (e.g., 'my_duration_ms' becomes 'my_duration_ms_milliseconds', 'my_counter' becomes 'my_counter_total'). Account for these transformations when writing Grafana dashboards or Prometheus queries
Files:
- packages/redis-worker/src/fair-queue/keyProducer.ts
- packages/redis-worker/src/fair-queue/types.ts
- packages/redis-worker/src/fair-queue/schedulers/drr.ts
- packages/redis-worker/src/fair-queue/telemetry.ts
- packages/redis-worker/src/fair-queue/index.ts
- packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts
- packages/redis-worker/src/fair-queue/tenantDispatch.ts
**/*.{js,ts,jsx,tsx,json,md,yaml,yml}
📄 CodeRabbit inference engine (AGENTS.md)
Format code using Prettier before committing
Files:
- packages/redis-worker/src/fair-queue/keyProducer.ts
- packages/redis-worker/src/fair-queue/types.ts
- packages/redis-worker/src/fair-queue/schedulers/drr.ts
- packages/redis-worker/src/fair-queue/telemetry.ts
- packages/redis-worker/src/fair-queue/index.ts
- packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts
- packages/redis-worker/src/fair-queue/tenantDispatch.ts
{packages,integrations}/**/*.{ts,tsx,js}
📄 CodeRabbit inference engine (CLAUDE.md)
When modifying public packages in `packages/*` or `integrations/*`, add a changeset using `pnpm run changeset:add`
Files:
- packages/redis-worker/src/fair-queue/keyProducer.ts
- packages/redis-worker/src/fair-queue/types.ts
- packages/redis-worker/src/fair-queue/schedulers/drr.ts
- packages/redis-worker/src/fair-queue/telemetry.ts
- packages/redis-worker/src/fair-queue/index.ts
- packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts
- packages/redis-worker/src/fair-queue/tenantDispatch.ts
**/*.{ts,tsx,js}
📄 CodeRabbit inference engine (CLAUDE.md)
Import from `@trigger.dev/core` using subpaths only, never the root
Files:
- packages/redis-worker/src/fair-queue/keyProducer.ts
- packages/redis-worker/src/fair-queue/types.ts
- packages/redis-worker/src/fair-queue/schedulers/drr.ts
- packages/redis-worker/src/fair-queue/telemetry.ts
- packages/redis-worker/src/fair-queue/index.ts
- packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts
- packages/redis-worker/src/fair-queue/tenantDispatch.ts
**/{src,app}/**/*.{ts,tsx}
📄 CodeRabbit inference engine (CLAUDE.md)
**/{src,app}/**/*.{ts,tsx}: Always import Trigger.dev tasks from `@trigger.dev/sdk`. Never use `@trigger.dev/sdk/v3` or the deprecated `client.defineJob` pattern
Every Trigger.dev task must be exported and include a unique `id` string property
Files:
- packages/redis-worker/src/fair-queue/keyProducer.ts
- packages/redis-worker/src/fair-queue/types.ts
- packages/redis-worker/src/fair-queue/schedulers/drr.ts
- packages/redis-worker/src/fair-queue/telemetry.ts
- packages/redis-worker/src/fair-queue/index.ts
- packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts
- packages/redis-worker/src/fair-queue/tenantDispatch.ts
**/*.{test,spec}.{ts,tsx}
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
Use vitest for all tests in the Trigger.dev repository
Files:
packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts
**/*.test.{ts,tsx,js,jsx}
📄 CodeRabbit inference engine (AGENTS.md)
**/*.test.{ts,tsx,js,jsx}: Test files should live beside the files under test and use descriptive `describe` and `it` blocks
Tests should avoid mocks or stubs and use the helpers from `@internal/testcontainers` when Redis or Postgres are needed
Use vitest for running unit tests
Files:
packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts
**/*.test.{ts,tsx,js}
📄 CodeRabbit inference engine (CLAUDE.md)
**/*.test.{ts,tsx,js}: Never mock anything in tests - use testcontainers instead for Redis and PostgreSQL
Test files should be placed next to source files (e.g., `MyService.ts` → `MyService.test.ts`)
Use vitest exclusively for testing
Files:
packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts
🧠 Learnings (4)
📚 Learning: 2025-11-27T16:26:37.432Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .github/copilot-instructions.md:0-0
Timestamp: 2025-11-27T16:26:37.432Z
Learning: Applies to {packages/core,apps/webapp}/**/*.{ts,tsx} : Use zod for validation in packages/core and apps/webapp
Applied to files:
packages/redis-worker/src/fair-queue/index.ts
📚 Learning: 2025-11-27T16:27:35.304Z
Learnt from: CR
Repo: triggerdotdev/trigger.dev PR: 0
File: .cursor/rules/writing-tasks.mdc:0-0
Timestamp: 2025-11-27T16:27:35.304Z
Learning: Applies to **/trigger/**/*.{ts,tsx,js,jsx} : Control concurrency using the `queue` property with `concurrencyLimit` option
Applied to files:
packages/redis-worker/src/fair-queue/index.ts
📚 Learning: 2026-02-10T16:18:48.654Z
Learnt from: ericallam
Repo: triggerdotdev/trigger.dev PR: 2980
File: apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx:512-515
Timestamp: 2026-02-10T16:18:48.654Z
Learning: In apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx, environment.queueSizeLimit is a per-queue maximum that is configured at the environment level, not a shared limit across all queues. Each queue can have up to environment.queueSizeLimit items queued independently.
Applied to files:
packages/redis-worker/src/fair-queue/index.ts
📚 Learning: 2026-01-12T17:18:09.451Z
Learnt from: matt-aitken
Repo: triggerdotdev/trigger.dev PR: 2870
File: apps/webapp/app/services/redisConcurrencyLimiter.server.ts:56-66
Timestamp: 2026-01-12T17:18:09.451Z
Learning: In `apps/webapp/app/services/redisConcurrencyLimiter.server.ts`, the query concurrency limiter will not be deployed with Redis Cluster mode, so multi-key operations (keyKey and globalKey in different hash slots) are acceptable and will function correctly in standalone Redis mode.
Applied to files:
packages/redis-worker/src/fair-queue/index.ts
🧬 Code graph analysis (1)
packages/redis-worker/src/fair-queue/telemetry.ts (1)
internal-packages/tracing/src/index.ts (1)
ObservableGauge(19-19)
🔇 Additional comments (10)
packages/redis-worker/src/fair-queue/keyProducer.ts (1)
75-85: LGTM! The new dispatch key methods follow the class's established patterns cleanly and consistently.

packages/redis-worker/src/fair-queue/types.ts (1)
315-319: LGTM! The new key producer methods are well-documented and consistent with the existing interface surface.

packages/redis-worker/src/fair-queue/telemetry.ts (1)
293-306: LGTM! The new `dispatchLength` gauge follows the established pattern for `masterQueueLength` exactly, with low-cardinality shard ID attributes.

Also applies to: 438-441
packages/redis-worker/src/fair-queue/tenantDispatch.ts (1)
31-181: LGTM on the core design. The two-level index management is clean, well-documented, and the ZSET operations correctly maintain oldest-message-timestamp ordering.

packages/redis-worker/src/fair-queue/index.ts (5)
817-840: Two-path shard iteration design is well-structured. Running dispatch first, then checking the legacy queue via an O(1) ZCARD before draining, ensures efficient handling of both paths during migration. The legacy path will naturally become a no-op as the master queue empties.
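As a sketch, the dispatch-first, drain-legacy-second flow reduces to a small pure function. All names here (`dispatchPass`, `drainLegacy`, `zcard`) are illustrative stand-ins, not the PR's actual API; `zcard` plays the role of Redis's O(1) ZCARD check:

```typescript
// Sketch of the dual-path shard pass (illustrative names, not the PR's API).
// The dispatch path runs first; the legacy master queue is only drained
// when a cheap O(1) length check (Redis ZCARD) says it is non-empty.
async function processShard(
  dispatchPass: () => Promise<number>, // queues selected via tenant dispatch
  zcard: (key: string) => Promise<number>, // stand-in for Redis ZCARD
  drainLegacy: () => Promise<number>, // queues selected from the old master queue
  masterQueueKey: string
): Promise<number> {
  let selected = await dispatchPass();
  // Once the old master queue is empty, this branch never fires again,
  // so the legacy path naturally becomes a no-op after migration.
  if ((await zcard(masterQueueKey)) > 0) {
    selected += await drainLegacy();
  }
  return selected;
}
```

Because the guard is a single ZCARD, the steady-state cost of keeping the legacy path around is one O(1) round trip per shard pass.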
189-193: LGTM! The TenantDispatch wiring is consistent with the MasterQueue initialization pattern.
319-330: LGTM on V2 enqueue wiring. The atomic V2 call correctly passes all 4 keys and 5 args matching the Lua script's expectations.
1559-1608: Dispatch index updates on reclaim are correctly deduplicated per tenant:queue pair.

The `updatedQueues` set at line 1584 prevents redundant `updateDispatchIndexes` calls when multiple messages are reclaimed from the same queue. Error handling is per-queue, so one failure won't block others. Good design.
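The dedup-with-error-isolation pattern described here can be illustrated with a minimal sketch. The names and shapes below (`reclaimBatch`, the `updateDispatchIndexes` callback) are hypothetical, not the PR's actual reclaim code:

```typescript
// Sketch of per-reclaim deduplication with per-queue error isolation.
// `updateDispatchIndexes` is a hypothetical callback standing in for the
// real index update; names and shapes are assumptions, not the PR's API.
async function reclaimBatch(
  messages: Array<{ tenantId: string; queueId: string; score: number }>,
  updateDispatchIndexes: (tenantId: string, queueId: string, score: number) => Promise<void>
): Promise<{ updated: string[]; failed: string[] }> {
  const updatedQueues = new Set<string>(); // dedup per tenant:queue pair
  const failed: string[] = [];
  for (const msg of messages) {
    const pair = `${msg.tenantId}:${msg.queueId}`;
    if (updatedQueues.has(pair)) continue; // skip redundant index writes
    try {
      await updateDispatchIndexes(msg.tenantId, msg.queueId, msg.score);
      updatedQueues.add(pair);
    } catch {
      failed.push(pair); // one queue's failure must not block the others
    }
  }
  return { updated: [...updatedQueues], failed };
}
```

The per-pair `try`/`catch` is what makes a single failing tenant:queue update non-fatal for the rest of the batch.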
1826-1858: No action needed — the Redis Cluster cross-slot concern does not apply to this deployment.

The redis-worker is deployed with standalone Redis (not Cluster mode), so multi-key operations across different hash slots are fully supported. The Lua script's 4 keys (`queueKey`, `queueItemsKey`, `tenantQueueIndexKey`, `dispatchKey`) are not constrained by slot availability in standalone mode.

Likely an incorrect or invalid review comment.
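To make the ordering guarantees these comments rely on concrete, here is a minimal in-memory sketch of the two-level index (Level 1: tenants per shard, Level 2: queues per tenant), assuming "oldest enqueue timestamp wins" score updates in the spirit of `ZADD ... LT`. All class and method names are illustrative, not the PR's API:

```typescript
// In-memory sketch of the two-level dispatch index. Each index mimics a
// Redis ZSET whose score is the oldest pending message's enqueue timestamp.
class SortedIndex {
  private scores = new Map<string, number>();
  // Mimics `ZADD key LT score member`: only a lower (older) score wins.
  addLT(member: string, score: number): void {
    const current = this.scores.get(member);
    if (current === undefined || score < current) this.scores.set(member, score);
  }
  // Mimics `ZRANGE key 0 -1 WITHSCORES`: ascending by score.
  entries(): Array<[string, number]> {
    return [...this.scores.entries()].sort((a, b) => a[1] - b[1]);
  }
}

class TenantDispatchSketch {
  private shards = new Map<number, SortedIndex>(); // Level 1: tenants per shard
  private tenants = new Map<string, SortedIndex>(); // Level 2: queues per tenant

  enqueue(shardId: number, tenantId: string, queueId: string, enqueuedAt: number): void {
    const shard = this.shards.get(shardId) ?? new SortedIndex();
    shard.addLT(tenantId, enqueuedAt);
    this.shards.set(shardId, shard);

    const tenant = this.tenants.get(tenantId) ?? new SortedIndex();
    tenant.addLT(queueId, enqueuedAt);
    this.tenants.set(tenantId, tenant);
  }

  tenantsForShard(shardId: number): Array<[string, number]> {
    return this.shards.get(shardId)?.entries() ?? [];
  }

  queuesForTenant(tenantId: string): Array<[string, number]> {
    return this.tenants.get(tenantId)?.entries() ?? [];
  }
}
```

A scheduler iterating `tenantsForShard` sees tenants ordered by their oldest pending message and only pays for `queuesForTenant` on tenants it actually selects, which is the core of the noisy-neighbor fix.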
packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts (1)
155-155: No action needed. `redisTest` from `@internal/testcontainers` correctly wraps vitest's `test` function with Redis context fixtures, so the missing `it` import is intentional and follows the project's testing patterns per the coding guidelines.
```typescript
  async #fallbackDispatchToLegacyScheduler(
    loopId: string,
    shardId: number,
    context: DispatchSchedulerContext,
    parentSpan?: Span
  ): Promise<TenantQueues[]> {
    // Get tenants from dispatch
    const tenants = await this.tenantDispatch.getTenantsFromShard(shardId);
    if (tenants.length === 0) return [];

    // For each tenant, get their queues and build a flat list
    const allQueues: QueueWithScore[] = [];
    for (const { tenantId } of tenants) {
      const queues = await this.tenantDispatch.getQueuesForTenant(tenantId);
      allQueues.push(...queues);
    }

    if (allQueues.length === 0) return [];

    // Use the base scheduler context (without dispatch methods)
    const baseContext = this.#createSchedulerContext();

    // We need a temporary master queue key for the scheduler. Rather than
    // writing to Redis, we'll group the queues by tenant ourselves and
    // apply the same logic as the legacy selectQueues.
    const masterQueueKey = this.keys.masterQueueKey(shardId);
    return this.scheduler.selectQueues(masterQueueKey, loopId, baseContext);
  }
```
Bug: `#fallbackDispatchToLegacyScheduler` computes `allQueues` from dispatch but never uses it.

Lines 985-989 fetch per-tenant queues from the dispatch index into `allQueues`, but line 1000 calls `this.scheduler.selectQueues(masterQueueKey, ...)`, which reads from the legacy master queue key in Redis and ignores the fetched data entirely. Since new enqueues no longer write to the master queue, this fallback returns `[]` for all new messages.

The DRR scheduler implements `selectQueuesFromDispatch`, so this path may not be hit today, but it's a latent correctness bug for any scheduler that doesn't.
🐛 Proposed fix — populate a temporary sorted set or restructure the fallback

```diff
 async #fallbackDispatchToLegacyScheduler(
   loopId: string,
   shardId: number,
   context: DispatchSchedulerContext,
   parentSpan?: Span
 ): Promise<TenantQueues[]> {
   // Get tenants from dispatch
   const tenants = await this.tenantDispatch.getTenantsFromShard(shardId);
   if (tenants.length === 0) return [];

   // For each tenant, get their queues and build a flat list
   const allQueues: QueueWithScore[] = [];
   for (const { tenantId } of tenants) {
     const queues = await this.tenantDispatch.getQueuesForTenant(tenantId);
     allQueues.push(...queues);
   }

   if (allQueues.length === 0) return [];

-  // Use the base scheduler context (without dispatch methods)
-  const baseContext = this.#createSchedulerContext();
-
-  // We need a temporary master queue key for the scheduler. Rather than
-  // writing to Redis, we'll group the queues by tenant ourselves and
-  // apply the same logic as the legacy selectQueues.
-  const masterQueueKey = this.keys.masterQueueKey(shardId);
-  return this.scheduler.selectQueues(masterQueueKey, loopId, baseContext);
+  // Group fetched queues by tenant and return directly
+  // (bypassing selectQueues since we already have the queue list)
+  const byTenant = new Map<string, string[]>();
+  for (const q of allQueues) {
+    const existing = byTenant.get(q.tenantId) ?? [];
+    existing.push(q.queueId);
+    byTenant.set(q.tenantId, existing);
+  }
+
+  const result: TenantQueues[] = [];
+  for (const [tenantId, queues] of byTenant) {
+    if (this.concurrencyManager) {
+      const atCapacity = await this.concurrencyManager.isAtCapacity("tenant", tenantId);
+      if (atCapacity) continue;
+    }
+    result.push({ tenantId, queues });
+  }
+  return result;
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@packages/redis-worker/src/fair-queue/index.ts` around lines 974 - 1001, The
fallback method `#fallbackDispatchToLegacyScheduler` builds allQueues from
tenantDispatch.getQueuesForTenant but then ignores it by calling
scheduler.selectQueues with keys.masterQueueKey (which reads Redis); fix by
using the collected allQueues instead of the Redis master key — either (A) call
a scheduler method that accepts an in-memory queue list (e.g.,
scheduler.selectQueuesFromDispatch or a similar API) passing allQueues, loopId
and baseContext, or (B) populate a temporary sorted set keyed by a generated
tempMasterKey with the entries from allQueues, use that temp key when calling
scheduler.selectQueues, and delete the temp key afterwards; update
`#fallbackDispatchToLegacyScheduler` to perform one of these and ensure
tenantDispatch.getQueuesForTenant and allQueues are actually consumed.
```typescript
async ({ redisOptions }) => {
  const keys = new DefaultFairQueueKeyProducer({ prefix: "test" });
  const processed: Array<{ tenantId: string; value: string }> = [];
  let blockT1 = true;
```
Unused variable `blockT1`.

`blockT1` is declared and set to `true` on line 498 but is never read anywhere in the test. It looks like a leftover from an earlier iteration of the test logic.
🧹 Remove unused variable

```diff
   const processed: Array<{ tenantId: string; value: string }> = [];
-  let blockT1 = true;
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```suggestion
const processed: Array<{ tenantId: string; value: string }> = [];
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@packages/redis-worker/src/fair-queue/tests/tenantDispatch.test.ts` at line
498, Remove the unused local variable blockT1 from the test; it is declared as
"let blockT1 = true;" in tenantDispatch.test.ts but never read, so delete that
declaration (or use it if intended for a future assertion) to clean up the test
code and eliminate the unused-variable warning.