Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 42 additions & 13 deletions apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { VFS_DIR_TO_RESOURCE } from '@/lib/copilot/resource-types'
import { isWorkflowToolName } from '@/lib/copilot/workflow-tools'
import { getNextWorkflowColor } from '@/lib/workflows/colors'
import { invalidateResourceQueries } from '@/app/workspace/[workspaceId]/home/components/mothership-view/components/resource-registry'
import { deploymentKeys } from '@/hooks/queries/deployments'
import {
type TaskChatHistory,
type TaskStoredContentBlock,
Expand All @@ -21,6 +22,7 @@ import {
taskKeys,
useChatHistory,
} from '@/hooks/queries/tasks'
import { workflowKeys } from '@/hooks/queries/workflows'
import { getTopInsertionSortOrder } from '@/hooks/queries/utils/top-insertion-sort-order'
import { useExecutionStream } from '@/hooks/use-execution-stream'
import { useExecutionStore } from '@/stores/execution/store'
Expand Down Expand Up @@ -74,6 +76,8 @@ const STATE_TO_STATUS: Record<string, ToolCallStatus> = {
skipped: 'success',
} as const

const DEPLOY_TOOL_NAMES = new Set(['deploy_api', 'deploy_chat', 'deploy_mcp', 'redeploy'])

function mapStoredBlock(block: TaskStoredContentBlock): ContentBlock {
const mapped: ContentBlock = {
type: block.type as ContentBlockType,
Expand Down Expand Up @@ -361,6 +365,15 @@ export function useChat(

useEffect(() => {
if (!chatHistory || appliedChatIdRef.current === chatHistory.id) return

const activeStreamId = chatHistory.activeStreamId
const snapshot = chatHistory.streamSnapshot

if (activeStreamId && !snapshot && !sendingRef.current) {
queryClient.invalidateQueries({ queryKey: taskKeys.detail(chatHistory.id) })
return
}
Comment on lines +372 to +375
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unbounded polling loop when no snapshot is available

When switching to a task that has an active stream but no snapshot, this early return path intentionally skips setting appliedChatIdRef.current = chatHistory.id. That means every time the invalidated query resolves and chatHistory updates, this useEffect fires again, evaluates the same condition, and calls invalidateQueries once more — creating a tight polling loop for the lifetime of the stream.

useChatHistory has a staleTime of 30 seconds, but invalidateQueries bypasses staleTime and triggers an immediate refetch. So the loop rate is limited only by network round-trip time, which could produce dozens of requests per minute if the server takes a while to produce a snapshot.

If the intent is to poll until a snapshot appears, this needs an explicit back-off or maximum retry count. One approach is to return a cleanup function from the effect that uses a delayed invalidateQueries call so subsequent polls are spaced out, rather than firing immediately on every data update.


appliedChatIdRef.current = chatHistory.id
setMessages(chatHistory.messages.map(mapStoredMessage))

Expand All @@ -374,11 +387,6 @@ export function useChat(
}
}

// Kick off stream reconnection immediately if there's an active stream.
// The stream snapshot was fetched in parallel with the chat history (same
// API call), so there's no extra round-trip.
const activeStreamId = chatHistory.activeStreamId
const snapshot = chatHistory.streamSnapshot
if (activeStreamId && !sendingRef.current) {
const gen = ++streamGenRef.current
const abortController = new AbortController()
Expand All @@ -396,8 +404,7 @@ export function useChat(
const batchEvents = snapshot?.events ?? []
const streamStatus = snapshot?.status ?? ''

if (!snapshot || (batchEvents.length === 0 && streamStatus === 'unknown')) {
// No snapshot available — stream buffer expired. Clean up.
if (batchEvents.length === 0 && streamStatus === 'unknown') {
const cid = chatIdRef.current
if (cid) {
fetch('/api/mothership/chat/stop', {
Expand Down Expand Up @@ -462,7 +469,7 @@ export function useChat(
}
reconnect()
}
}, [chatHistory, workspaceId])
}, [chatHistory, workspaceId, queryClient])

useEffect(() => {
if (resources.length === 0) {
Expand Down Expand Up @@ -686,6 +693,33 @@ export function useChat(
onResourceEventRef.current?.()
}
}

if (DEPLOY_TOOL_NAMES.has(tc.name) && tc.status === 'success') {
const output = tc.result?.output as Record<string, unknown> | undefined
const deployedWorkflowId = (output?.workflowId as string) ?? undefined
if (deployedWorkflowId && typeof output?.isDeployed === 'boolean') {
const isDeployed = output.isDeployed as boolean
const serverDeployedAt = output.deployedAt
? new Date(output.deployedAt as string)
: undefined
useWorkflowRegistry
.getState()
.setDeploymentStatus(
deployedWorkflowId,
isDeployed,
isDeployed ? (serverDeployedAt ?? new Date()) : undefined
)
queryClient.invalidateQueries({
queryKey: deploymentKeys.info(deployedWorkflowId),
})
queryClient.invalidateQueries({
queryKey: deploymentKeys.versions(deployedWorkflowId),
})
queryClient.invalidateQueries({
queryKey: workflowKeys.list(workspaceId),
})
}
}
}

break
Expand Down Expand Up @@ -1116,11 +1150,6 @@ export function useChat(
useEffect(() => {
return () => {
streamGenRef.current++
// Only drop the browser→Sim read; the Sim→Go stream stays open
// so the backend can finish persisting. Explicit abort is only
// triggered by the stop button via /api/copilot/chat/abort.
abortControllerRef.current?.abort()
abortControllerRef.current = null
sendingRef.current = false
}
}, [])
Comment on lines 1150 to 1155
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed abort means stream continues processing after unmount

The previous cleanup explicitly called abortControllerRef.current?.abort() to cancel the in-flight SSE fetch when the component unmounts. That line was intentionally removed here to allow the backend stream to stay alive across task switches, but it has a side effect: the ReadableStreamDefaultReader inside processSSEStream's while (true) { reader.read() } loop keeps running after unmount.

streamGenRef.current++ prevents finalizeRef.current() from being called with a stale generation, but the inner flush() calls — which call setMessages, setIsSending, setIsReconnecting etc. — are not gated by the generation counter. If the stream is still in flight when the user switches tasks, state setters are called on the now-stale React fiber, which can cause React warnings and ghost state.

Consider aborting the reader without aborting the server-side stream — i.e., abort the client fetch connection while keeping the Go-side stream alive. The server-side stream can remain open without the client holding a reader:

Expand Down
40 changes: 24 additions & 16 deletions apps/sim/lib/copilot/client-sse/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,6 @@ export const sseHandlers: Record<string, SSEHandler> = {
}
}

// Deploy tools: update deployment status in workflow registry
if (
targetState === ClientToolCallState.success &&
(current.name === 'deploy_api' ||
Expand All @@ -579,21 +578,30 @@ export const sseHandlers: Record<string, SSEHandler> = {
const resultPayload = asRecord(
data?.result || eventData.result || eventData.data || data?.data
)
const input = asRecord(current.params)
const workflowId =
(resultPayload?.workflowId as string) ||
(input?.workflowId as string) ||
useWorkflowRegistry.getState().activeWorkflowId
const isDeployed = resultPayload?.isDeployed !== false
if (workflowId) {
useWorkflowRegistry
.getState()
.setDeploymentStatus(workflowId, isDeployed, isDeployed ? new Date() : undefined)
logger.info('[SSE] Updated deployment status from tool result', {
toolName: current.name,
workflowId,
isDeployed,
})
if (typeof resultPayload?.isDeployed === 'boolean') {
const input = asRecord(current.params)
const workflowId =
(resultPayload?.workflowId as string) ||
(input?.workflowId as string) ||
useWorkflowRegistry.getState().activeWorkflowId
const isDeployed = resultPayload.isDeployed as boolean
const serverDeployedAt = resultPayload.deployedAt
? new Date(resultPayload.deployedAt as string)
: undefined
if (workflowId) {
useWorkflowRegistry
.getState()
.setDeploymentStatus(
workflowId,
isDeployed,
isDeployed ? (serverDeployedAt ?? new Date()) : undefined
)
logger.info('[SSE] Updated deployment status from tool result', {
toolName: current.name,
workflowId,
isDeployed,
})
}
}
} catch (err) {
logger.warn('[SSE] Failed to hydrate deployment status', {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,10 @@ export async function executeDeployChat(
return { success: false, error: 'Unauthorized chat access' }
}
await db.delete(chat).where(eq(chat.id, existing[0].id))
return { success: true, output: { success: true, action: 'undeploy', isDeployed: false } }
return {
success: true,
output: { workflowId, success: true, action: 'undeploy', isChatDeployed: false },
}
Comment on lines 88 to +93
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deploy_chat undeploy response is missing isDeployed for symmetry

The deploy path returns isDeployed: true, but the undeploy path now returns isChatDeployed: false with no isDeployed field. The client-side handlers in both handlers.ts and use-chat.ts gate their registry update on typeof output?.isDeployed === 'boolean', so they silently skip the undeploy case.

This is arguably correct behaviour (undeploying a chat does not remove the workflow's API deployment), but the asymmetry between the deploy and undeploy output shapes makes intent harder to follow. A comment explaining why isDeployed is intentionally absent from the undeploy response would help future maintainers:

Suggested change
}
await db.delete(chat).where(eq(chat.id, existing[0].id))
return { success: true, output: { success: true, action: 'undeploy', isDeployed: false } }
return {
success: true,
output: { workflowId, success: true, action: 'undeploy', isChatDeployed: false },
}
return {
success: true,
// isDeployed is intentionally omitted: undeploying a chat does not
// affect the workflow's API deployment status.
output: { workflowId, success: true, action: 'undeploy', isChatDeployed: false },
}

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

}

const { hasAccess } = await checkWorkflowAccessForChatCreation(workflowId, context.userId)
Expand Down Expand Up @@ -199,9 +202,11 @@ export async function executeDeployChat(
return {
success: true,
output: {
workflowId,
success: true,
action: 'deploy',
isDeployed: true,
isChatDeployed: true,
identifier,
chatUrl: `${baseUrl}/chat/${identifier}`,
apiEndpoint: `${baseUrl}/api/workflows/${workflowId}/run`,
Expand Down Expand Up @@ -355,6 +360,7 @@ export async function executeRedeploy(
success: true,
output: {
workflowId,
isDeployed: true,
deployedAt: result.deployedAt || null,
version: result.version,
apiEndpoint: `${baseUrl}/api/workflows/${workflowId}/run`,
Expand Down