Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 160 additions & 0 deletions apps/sim/app/api/jobs/[jobId]/route.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/**
* @vitest-environment node
*/
import type { NextRequest } from 'next/server'
import { beforeEach, describe, expect, it, vi } from 'vitest'

const {
mockCheckHybridAuth,
mockGetDispatchJobRecord,
mockGetJobQueue,
mockVerifyWorkflowAccess,
mockGetWorkflowById,
} = vi.hoisted(() => ({
mockCheckHybridAuth: vi.fn(),
mockGetDispatchJobRecord: vi.fn(),
mockGetJobQueue: vi.fn(),
mockVerifyWorkflowAccess: vi.fn(),
mockGetWorkflowById: vi.fn(),
}))

vi.mock('@sim/logger', () => ({
createLogger: () => ({
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
debug: vi.fn(),
}),
}))

vi.mock('@/lib/auth/hybrid', () => ({
checkHybridAuth: mockCheckHybridAuth,
}))

vi.mock('@/lib/core/async-jobs', () => ({
JOB_STATUS: {
PENDING: 'pending',
PROCESSING: 'processing',
COMPLETED: 'completed',
FAILED: 'failed',
},
getJobQueue: mockGetJobQueue,
}))

vi.mock('@/lib/core/workspace-dispatch/store', () => ({
getDispatchJobRecord: mockGetDispatchJobRecord,
}))

vi.mock('@/lib/core/utils/request', () => ({
generateRequestId: vi.fn().mockReturnValue('request-1'),
}))

vi.mock('@/socket/middleware/permissions', () => ({
verifyWorkflowAccess: mockVerifyWorkflowAccess,
}))

vi.mock('@/lib/workflows/utils', () => ({
getWorkflowById: mockGetWorkflowById,
}))

import { GET } from './route'

function createMockRequest(): NextRequest {
return {
headers: {
get: () => null,
},
} as NextRequest
}

describe('GET /api/jobs/[jobId]', () => {
beforeEach(() => {
vi.clearAllMocks()

mockCheckHybridAuth.mockResolvedValue({
success: true,
userId: 'user-1',
apiKeyType: undefined,
workspaceId: undefined,
})

mockVerifyWorkflowAccess.mockResolvedValue({ hasAccess: true })
mockGetWorkflowById.mockResolvedValue({
id: 'workflow-1',
workspaceId: 'workspace-1',
})

mockGetJobQueue.mockResolvedValue({
getJob: vi.fn().mockResolvedValue(null),
})
})

it('returns dispatcher-aware waiting status with metadata', async () => {
mockGetDispatchJobRecord.mockResolvedValue({
id: 'dispatch-1',
workspaceId: 'workspace-1',
lane: 'runtime',
queueName: 'workflow-execution',
bullmqJobName: 'workflow-execution',
bullmqPayload: {},
metadata: {
workflowId: 'workflow-1',
},
priority: 10,
status: 'waiting',
createdAt: 1000,
admittedAt: 2000,
})

const response = await GET(createMockRequest(), {
params: Promise.resolve({ jobId: 'dispatch-1' }),
})
const body = await response.json()

expect(response.status).toBe(200)
expect(body.status).toBe('waiting')
expect(body.metadata.queueName).toBe('workflow-execution')
expect(body.metadata.lane).toBe('runtime')
expect(body.metadata.workspaceId).toBe('workspace-1')
})

it('returns completed output from dispatch state', async () => {
mockGetDispatchJobRecord.mockResolvedValue({
id: 'dispatch-2',
workspaceId: 'workspace-1',
lane: 'interactive',
queueName: 'workflow-execution',
bullmqJobName: 'direct-workflow-execution',
bullmqPayload: {},
metadata: {
workflowId: 'workflow-1',
},
priority: 1,
status: 'completed',
createdAt: 1000,
startedAt: 2000,
completedAt: 7000,
output: { success: true },
})

const response = await GET(createMockRequest(), {
params: Promise.resolve({ jobId: 'dispatch-2' }),
})
const body = await response.json()

expect(response.status).toBe(200)
expect(body.status).toBe('completed')
expect(body.output).toEqual({ success: true })
expect(body.metadata.duration).toBe(5000)
})

it('returns 404 when neither dispatch nor BullMQ job exists', async () => {
mockGetDispatchJobRecord.mockResolvedValue(null)

const response = await GET(createMockRequest(), {
params: Promise.resolve({ jobId: 'missing-job' }),
})

expect(response.status).toBe(404)
})
})
56 changes: 22 additions & 34 deletions apps/sim/app/api/jobs/[jobId]/route.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import { createLogger } from '@sim/logger'
import { type NextRequest, NextResponse } from 'next/server'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { getJobQueue, JOB_STATUS } from '@/lib/core/async-jobs'
import { getJobQueue } from '@/lib/core/async-jobs'
import { generateRequestId } from '@/lib/core/utils/request'
import { presentDispatchOrJobStatus } from '@/lib/core/workspace-dispatch/status'
import { getDispatchJobRecord } from '@/lib/core/workspace-dispatch/store'
import { createErrorResponse } from '@/app/api/workflows/utils'

const logger = createLogger('TaskStatusAPI')
Expand All @@ -23,68 +25,54 @@ export async function GET(

const authenticatedUserId = authResult.userId

const dispatchJob = await getDispatchJobRecord(taskId)
const jobQueue = await getJobQueue()
const job = await jobQueue.getJob(taskId)
const job = dispatchJob ? null : await jobQueue.getJob(taskId)

if (!job) {
if (!job && !dispatchJob) {
return createErrorResponse('Task not found', 404)
}

if (job.metadata?.workflowId) {
const metadataToCheck = dispatchJob?.metadata ?? job?.metadata

if (metadataToCheck?.workflowId) {
const { verifyWorkflowAccess } = await import('@/socket/middleware/permissions')
const accessCheck = await verifyWorkflowAccess(
authenticatedUserId,
job.metadata.workflowId as string
metadataToCheck.workflowId as string
)
if (!accessCheck.hasAccess) {
logger.warn(`[${requestId}] Access denied to workflow ${job.metadata.workflowId}`)
logger.warn(`[${requestId}] Access denied to workflow ${metadataToCheck.workflowId}`)
return createErrorResponse('Access denied', 403)
}

if (authResult.apiKeyType === 'workspace' && authResult.workspaceId) {
const { getWorkflowById } = await import('@/lib/workflows/utils')
const workflow = await getWorkflowById(job.metadata.workflowId as string)
const workflow = await getWorkflowById(metadataToCheck.workflowId as string)
if (!workflow?.workspaceId || workflow.workspaceId !== authResult.workspaceId) {
return createErrorResponse('API key is not authorized for this workspace', 403)
}
}
} else if (job.metadata?.userId && job.metadata.userId !== authenticatedUserId) {
logger.warn(`[${requestId}] Access denied to user ${job.metadata.userId}`)
} else if (metadataToCheck?.userId && metadataToCheck.userId !== authenticatedUserId) {
logger.warn(`[${requestId}] Access denied to user ${metadataToCheck.userId}`)
return createErrorResponse('Access denied', 403)
} else if (!job.metadata?.userId && !job.metadata?.workflowId) {
} else if (!metadataToCheck?.userId && !metadataToCheck?.workflowId) {
logger.warn(`[${requestId}] Access denied to job ${taskId}`)
return createErrorResponse('Access denied', 403)
}

const mappedStatus = job.status === JOB_STATUS.PENDING ? 'queued' : job.status

const presented = presentDispatchOrJobStatus(dispatchJob, job)
const response: any = {
success: true,
taskId,
status: mappedStatus,
metadata: {
startedAt: job.startedAt,
},
}

if (job.status === JOB_STATUS.COMPLETED) {
response.output = job.output
response.metadata.completedAt = job.completedAt
if (job.startedAt && job.completedAt) {
response.metadata.duration = job.completedAt.getTime() - job.startedAt.getTime()
}
}

if (job.status === JOB_STATUS.FAILED) {
response.error = job.error
response.metadata.completedAt = job.completedAt
if (job.startedAt && job.completedAt) {
response.metadata.duration = job.completedAt.getTime() - job.startedAt.getTime()
}
status: presented.status,
metadata: presented.metadata,
}

if (job.status === JOB_STATUS.PROCESSING || job.status === JOB_STATUS.PENDING) {
response.estimatedDuration = 300000
if (presented.output !== undefined) response.output = presented.output
if (presented.error !== undefined) response.error = presented.error
if (presented.estimatedDuration !== undefined) {
response.estimatedDuration = presented.estimatedDuration
}

return NextResponse.json(response)
Expand Down
Loading
Loading