feat(concurrency): bullmq based concurrency control system #3605
icecrasher321 wants to merge 11 commits into staging
Conversation
PR Summary (Cursor Bugbot) — High Risk
Overview: Execution entrypoints are updated to enqueue via the workspace dispatch queue, and an in-process admission gate is added for external API requests.
Written by Cursor Bugbot for commit 2bf1feb.
Greptile Summary
This PR introduces a comprehensive BullMQ-based queuing and concurrency control system for workflow, webhook, and schedule executions. It replaces the previous Redis/database job-queue backends with BullMQ queues backed by Redis, and adds:

- a per-workspace fairness dispatcher (with a Lua-script-based atomic claim mechanism),
- a lease-based concurrency limit (keyed to billing plan),
- an in-process admission gate for external API requests,
- a standalone worker process to consume jobs, and
- a buffered SSE stream for non-manual executions, which now run asynchronously through the dispatch queue.

The change is substantial (~5,900 lines added) and represents a foundational infrastructure improvement for reliability and rate-limiting protection. Key changes and issues found:
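As a rough illustration of the per-workspace fairness idea described above, here is a minimal in-memory round-robin sketch (all names hypothetical; the PR keeps these structures in Redis and claims jobs atomically via a Lua script):

```typescript
// In-memory model of per-workspace fair dispatch. Each workspace gets its
// own FIFO lane; the dispatcher rotates across active workspaces round-robin
// so one busy workspace cannot starve the others. (Hypothetical names; the
// real implementation stores lanes and the rotation in Redis.)
type Job = { id: string; workspaceId: string };

class FairDispatcher {
  private lanes = new Map<string, Job[]>();
  private rotation: string[] = [];

  enqueue(job: Job): void {
    let lane = this.lanes.get(job.workspaceId);
    if (!lane) {
      lane = [];
      this.lanes.set(job.workspaceId, lane);
      this.rotation.push(job.workspaceId); // workspace becomes active
    }
    lane.push(job);
  }

  // Pop the next job, cycling across active workspaces.
  next(): Job | undefined {
    while (this.rotation.length > 0) {
      const workspaceId = this.rotation.shift()!;
      const lane = this.lanes.get(workspaceId);
      const job = lane?.shift();
      if (job) {
        if (lane!.length > 0) this.rotation.push(workspaceId); // still active
        else this.lanes.delete(workspaceId);
        return job;
      }
      this.lanes.delete(workspaceId);
    }
    return undefined;
  }
}
```

Enqueuing three jobs for workspace A and one for workspace B drains as A, B, A, A rather than A, A, A, B, which is the fairness property the dispatcher is after.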
Confidence Score: 3/5
Important Files Changed
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client
    participant RouteHandler as API Route Handler
    participant AdmissionGate as Admission Gate (in-process)
    participant Dispatcher as Workspace Dispatcher
    participant RedisStore as Redis Dispatch Store
    participant BullMQ as BullMQ Queue (Redis)
    participant Worker as BullMQ Worker Process
    participant DispatchWorker as Dispatch Worker (worker.ts)

    Client->>RouteHandler: POST /api/workflows/[id]/execute
    RouteHandler->>AdmissionGate: tryAdmit()
    alt At capacity (inflight >= MAX_INFLIGHT)
        AdmissionGate-->>RouteHandler: null
        RouteHandler-->>Client: 429 Too Many Requests
    else Admitted
        AdmissionGate-->>RouteHandler: ticket
        RouteHandler->>Dispatcher: enqueueWorkspaceDispatch(input)
        Dispatcher->>RedisStore: enqueueWorkspaceDispatchJob()
        RedisStore-->>Dispatcher: jobRecord (status=waiting)
        Dispatcher->>Dispatcher: runDispatcherLoop() [void]
        Dispatcher->>RedisStore: popNextWorkspaceId()
        Dispatcher->>RedisStore: claimWorkspaceJob() [Lua script]
        RedisStore-->>Dispatcher: {type: admitted, record, leaseId}
        Dispatcher->>BullMQ: queue.add(jobName, payload, {jobId})
        Dispatcher->>RedisStore: markDispatchJobAdmitted()
        Dispatcher-->>RouteHandler: dispatchJobId
        RouteHandler->>RouteHandler: waitForDispatchJob(id, timeout) [polls 250ms]
        Worker->>BullMQ: picks up job
        Worker->>DispatchWorker: runDispatchedJob(metadata, fn)
        DispatchWorker->>RedisStore: markDispatchJobRunning()
        DispatchWorker->>DispatchWorker: executeQueuedWorkflowJob() / executeWorkflowJob()
        DispatchWorker->>RedisStore: markDispatchJobCompleted(output)
        DispatchWorker->>RedisStore: releaseWorkspaceLease()
        DispatchWorker->>Dispatcher: wakeWorkspaceDispatcher()
        RedisStore-->>RouteHandler: record (status=completed) [via poll]
        RouteHandler->>AdmissionGate: ticket.release()
        RouteHandler-->>Client: 200 JSON result
    end
```
Last reviewed commit: be83c97
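The admission gate at the top of the diagram can be sketched as a small in-process inflight counter. `tryAdmit()` and the ticket shape come from the diagram; the `maxInflight` handling and the idempotent release are assumptions:

```typescript
// Minimal in-process admission gate: admit up to maxInflight concurrent
// requests; return null at capacity so the route handler can answer 429.
type Ticket = { release(): void };

class AdmissionGate {
  private inflight = 0;
  constructor(private readonly maxInflight: number) {}

  tryAdmit(): Ticket | null {
    if (this.inflight >= this.maxInflight) return null; // caller replies 429
    this.inflight += 1;
    let released = false;
    return {
      release: () => {
        if (released) return; // idempotent: a double release must not free two slots
        released = true;
        this.inflight -= 1;
      },
    };
  }
}
```

A route handler would call `tryAdmit()` before enqueueing and `ticket.release()` in a `finally` block so a slot is always returned, even on error paths.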
bugbot run
Cursor Bugbot has reviewed your changes and found 1 potential issue.
```ts
pipeline.set(jobKey(record.id), JSON.stringify(record), 'EX', JOB_TTL_SECONDS)
pipeline.zadd(workspaceLaneKey(record.workspaceId, record.lane), score, record.id)
pipeline.zadd(ACTIVE_WORKSPACES_KEY, 'NX', sequence, record.workspaceId)
await pipeline.exec()
```
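The snippet above batches the enqueue writes in one pipeline; the `claimWorkspaceJob()` step in the diagram additionally uses a Lua script so the limit check and the job pop happen atomically. A pure in-memory model of what that claim presumably does (all names, and the exact limit check, are assumptions):

```typescript
// In-memory model of the atomic "claim" semantics: hand out a job only if
// the workspace holds fewer leases than its concurrency limit, and record
// the lease in the same step. In the PR this is a Lua script, so the
// check-and-claim cannot race across dispatcher instances.
type ClaimResult =
  | { type: "admitted"; jobId: string; leaseId: string }
  | { type: "at-limit" }
  | { type: "empty" };

class WorkspaceClaimStore {
  private queues = new Map<string, string[]>();
  private leases = new Map<string, Set<string>>();
  private nextLease = 0;

  enqueue(workspaceId: string, jobId: string): void {
    const q = this.queues.get(workspaceId) ?? [];
    q.push(jobId);
    this.queues.set(workspaceId, q);
  }

  claim(workspaceId: string, limit: number): ClaimResult {
    const held = this.leases.get(workspaceId) ?? new Set<string>();
    if (held.size >= limit) return { type: "at-limit" }; // lease limit reached
    const jobId = this.queues.get(workspaceId)?.shift();
    if (jobId === undefined) return { type: "empty" };
    const leaseId = `lease-${this.nextLease++}`;
    held.add(leaseId);
    this.leases.set(workspaceId, held);
    return { type: "admitted", jobId, leaseId };
  }

  release(workspaceId: string, leaseId: string): void {
    this.leases.get(workspaceId)?.delete(leaseId);
  }
}
```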
Global depth counter drifts when jobs are restored
Low Severity
enqueueWorkspaceDispatchJob increments GLOBAL_DEPTH_KEY while markDispatchJobCompleted and markDispatchJobFailed decrement it. However, restoreWorkspaceDispatchJob (used by the reconciler to reset stranded jobs back to waiting) does not increment the counter, even though a restored job is active again and re-enters the queue. Since the completed/failed path already decremented the counter (or the counter was never decremented for a stranded job), each restore can cause the counter to under-count. This gradually drifts the global depth below reality until reconcileGlobalQueueDepth corrects it.
The reconciler never restores terminal jobs, so this can't happen.
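A toy model of the accounting being debated in this thread (hypothetical names, simplified behavior): a stranded job's original increment is still outstanding, so restoring it without a fresh increment keeps the counter consistent, while restoring an already-terminal job would under-count; hence the constraint that the reconciler only restores non-terminal jobs.

```typescript
// Toy model of the global depth accounting. Enqueue increments; terminal
// transitions decrement. A stranded job never reached a terminal state, so
// its decrement never ran; restoring it to "waiting" without a fresh
// increment keeps the counter equal to the number of live jobs.
type Status = "waiting" | "running" | "completed" | "failed";

class DepthModel {
  depth = 0;
  private status = new Map<string, Status>();

  enqueue(id: string): void {
    this.status.set(id, "waiting");
    this.depth += 1;
  }
  start(id: string): void {
    this.status.set(id, "running");
  }
  complete(id: string): void {
    this.status.set(id, "completed");
    this.depth -= 1;
  }
  // Reconciler path: only non-terminal (stranded) jobs may be restored, so
  // no increment is needed -- the original increment is still "open".
  restore(id: string): void {
    const s = this.status.get(id);
    if (s === "completed" || s === "failed") {
      throw new Error("reconciler must not restore terminal jobs");
    }
    this.status.set(id, "waiting");
  }
  liveJobs(): number {
    let n = 0;
    for (const s of this.status.values()) {
      if (s === "waiting" || s === "running") n += 1;
    }
    return n;
  }
}
```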


Summary
BullMQ-based concurrency control system for executions that currently run inline (manual executions excluded). Limits can be tuned based on available resources.
An overall admission gate protects against crashes caused by rate-limited downstream services.
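The route handler in the sequence diagram waits for the result by polling the dispatch-job record every 250 ms (`waitForDispatchJob`). A self-contained sketch of such a waiter, with the store accessor injected and all names assumed:

```typescript
// Polls the dispatch-job record until it reaches a terminal status or the
// timeout elapses. The store accessor is injected so the sketch stays
// self-contained; the PR presumably reads the record from Redis.
type DispatchRecord = {
  id: string;
  status: "waiting" | "running" | "completed" | "failed";
};

async function waitForDispatchJob(
  id: string,
  getJob: (id: string) => Promise<DispatchRecord | null>,
  timeoutMs: number,
  pollIntervalMs = 250,
): Promise<DispatchRecord | null> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    const record = await getJob(id);
    if (record && (record.status === "completed" || record.status === "failed")) {
      return record;
    }
    await new Promise((resolve) => setTimeout(resolve, pollIntervalMs));
  }
  return null; // caller decides how to report the timeout
}
```

On `null` the handler can release its admission ticket and surface a timeout to the client instead of holding the connection open indefinitely.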
Type of Change
Testing
Tested manually under different configurations; added an extensive test suite.
Checklist