-
Notifications
You must be signed in to change notification settings - Fork 3.4k
fix(stream) handle task switching #3609
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,6 +12,7 @@ import { VFS_DIR_TO_RESOURCE } from '@/lib/copilot/resource-types' | |
| import { isWorkflowToolName } from '@/lib/copilot/workflow-tools' | ||
| import { getNextWorkflowColor } from '@/lib/workflows/colors' | ||
| import { invalidateResourceQueries } from '@/app/workspace/[workspaceId]/home/components/mothership-view/components/resource-registry' | ||
| import { deploymentKeys } from '@/hooks/queries/deployments' | ||
| import { | ||
| type TaskChatHistory, | ||
| type TaskStoredContentBlock, | ||
|
|
@@ -21,6 +22,7 @@ import { | |
| taskKeys, | ||
| useChatHistory, | ||
| } from '@/hooks/queries/tasks' | ||
| import { workflowKeys } from '@/hooks/queries/workflows' | ||
| import { getTopInsertionSortOrder } from '@/hooks/queries/utils/top-insertion-sort-order' | ||
| import { useExecutionStream } from '@/hooks/use-execution-stream' | ||
| import { useExecutionStore } from '@/stores/execution/store' | ||
|
|
@@ -74,6 +76,8 @@ const STATE_TO_STATUS: Record<string, ToolCallStatus> = { | |
| skipped: 'success', | ||
| } as const | ||
|
|
||
| const DEPLOY_TOOL_NAMES = new Set(['deploy_api', 'deploy_chat', 'deploy_mcp', 'redeploy']) | ||
|
|
||
| function mapStoredBlock(block: TaskStoredContentBlock): ContentBlock { | ||
| const mapped: ContentBlock = { | ||
| type: block.type as ContentBlockType, | ||
|
|
@@ -361,6 +365,15 @@ export function useChat( | |
|
|
||
| useEffect(() => { | ||
| if (!chatHistory || appliedChatIdRef.current === chatHistory.id) return | ||
|
|
||
| const activeStreamId = chatHistory.activeStreamId | ||
| const snapshot = chatHistory.streamSnapshot | ||
|
|
||
| if (activeStreamId && !snapshot && !sendingRef.current) { | ||
| queryClient.invalidateQueries({ queryKey: taskKeys.detail(chatHistory.id) }) | ||
| return | ||
| } | ||
|
|
||
| appliedChatIdRef.current = chatHistory.id | ||
| setMessages(chatHistory.messages.map(mapStoredMessage)) | ||
|
|
||
|
|
@@ -374,11 +387,6 @@ export function useChat( | |
| } | ||
| } | ||
|
|
||
| // Kick off stream reconnection immediately if there's an active stream. | ||
| // The stream snapshot was fetched in parallel with the chat history (same | ||
| // API call), so there's no extra round-trip. | ||
| const activeStreamId = chatHistory.activeStreamId | ||
| const snapshot = chatHistory.streamSnapshot | ||
| if (activeStreamId && !sendingRef.current) { | ||
| const gen = ++streamGenRef.current | ||
| const abortController = new AbortController() | ||
|
|
@@ -396,8 +404,7 @@ export function useChat( | |
| const batchEvents = snapshot?.events ?? [] | ||
| const streamStatus = snapshot?.status ?? '' | ||
|
|
||
| if (!snapshot || (batchEvents.length === 0 && streamStatus === 'unknown')) { | ||
| // No snapshot available — stream buffer expired. Clean up. | ||
| if (batchEvents.length === 0 && streamStatus === 'unknown') { | ||
| const cid = chatIdRef.current | ||
| if (cid) { | ||
| fetch('/api/mothership/chat/stop', { | ||
|
|
@@ -462,7 +469,7 @@ export function useChat( | |
| } | ||
| reconnect() | ||
| } | ||
| }, [chatHistory, workspaceId]) | ||
| }, [chatHistory, workspaceId, queryClient]) | ||
|
|
||
| useEffect(() => { | ||
| if (resources.length === 0) { | ||
|
|
@@ -686,6 +693,33 @@ export function useChat( | |
| onResourceEventRef.current?.() | ||
| } | ||
| } | ||
|
|
||
| if (DEPLOY_TOOL_NAMES.has(tc.name) && tc.status === 'success') { | ||
| const output = tc.result?.output as Record<string, unknown> | undefined | ||
| const deployedWorkflowId = (output?.workflowId as string) ?? undefined | ||
| if (deployedWorkflowId && typeof output?.isDeployed === 'boolean') { | ||
| const isDeployed = output.isDeployed as boolean | ||
| const serverDeployedAt = output.deployedAt | ||
| ? new Date(output.deployedAt as string) | ||
| : undefined | ||
| useWorkflowRegistry | ||
| .getState() | ||
| .setDeploymentStatus( | ||
| deployedWorkflowId, | ||
| isDeployed, | ||
| isDeployed ? (serverDeployedAt ?? new Date()) : undefined | ||
| ) | ||
| queryClient.invalidateQueries({ | ||
| queryKey: deploymentKeys.info(deployedWorkflowId), | ||
| }) | ||
| queryClient.invalidateQueries({ | ||
| queryKey: deploymentKeys.versions(deployedWorkflowId), | ||
| }) | ||
| queryClient.invalidateQueries({ | ||
| queryKey: workflowKeys.list(workspaceId), | ||
| }) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| break | ||
|
|
@@ -1116,11 +1150,6 @@ export function useChat( | |
| useEffect(() => { | ||
| return () => { | ||
| streamGenRef.current++ | ||
| // Only drop the browser→Sim read; the Sim→Go stream stays open | ||
| // so the backend can finish persisting. Explicit abort is only | ||
| // triggered by the stop button via /api/copilot/chat/abort. | ||
| abortControllerRef.current?.abort() | ||
| abortControllerRef.current = null | ||
| sendingRef.current = false | ||
| } | ||
| }, []) | ||
|
Comment on lines
1150
to
1155
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed abort means stream continues processing after unmount The previous cleanup explicitly called
Consider aborting the reader without aborting the server-side stream — i.e., abort the client |
||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -87,7 +87,10 @@ export async function executeDeployChat( | |||||||||||||||||||||||||||
| return { success: false, error: 'Unauthorized chat access' } | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| await db.delete(chat).where(eq(chat.id, existing[0].id)) | ||||||||||||||||||||||||||||
| return { success: true, output: { success: true, action: 'undeploy', isDeployed: false } } | ||||||||||||||||||||||||||||
| return { | ||||||||||||||||||||||||||||
| success: true, | ||||||||||||||||||||||||||||
| output: { workflowId, success: true, action: 'undeploy', isChatDeployed: false }, | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
Comment on lines
88
to
+93
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The deploy path returns This is arguably correct behaviour (undeploying a chat does not remove the workflow's API deployment), but the asymmetry between the deploy and undeploy output shapes makes intent harder to follow. A comment explaining why
Suggested change
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time! |
||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| const { hasAccess } = await checkWorkflowAccessForChatCreation(workflowId, context.userId) | ||||||||||||||||||||||||||||
|
|
@@ -199,9 +202,11 @@ export async function executeDeployChat( | |||||||||||||||||||||||||||
| return { | ||||||||||||||||||||||||||||
| success: true, | ||||||||||||||||||||||||||||
| output: { | ||||||||||||||||||||||||||||
| workflowId, | ||||||||||||||||||||||||||||
| success: true, | ||||||||||||||||||||||||||||
| action: 'deploy', | ||||||||||||||||||||||||||||
| isDeployed: true, | ||||||||||||||||||||||||||||
| isChatDeployed: true, | ||||||||||||||||||||||||||||
| identifier, | ||||||||||||||||||||||||||||
| chatUrl: `${baseUrl}/chat/${identifier}`, | ||||||||||||||||||||||||||||
| apiEndpoint: `${baseUrl}/api/workflows/${workflowId}/run`, | ||||||||||||||||||||||||||||
|
|
@@ -355,6 +360,7 @@ export async function executeRedeploy( | |||||||||||||||||||||||||||
| success: true, | ||||||||||||||||||||||||||||
| output: { | ||||||||||||||||||||||||||||
| workflowId, | ||||||||||||||||||||||||||||
| isDeployed: true, | ||||||||||||||||||||||||||||
| deployedAt: result.deployedAt || null, | ||||||||||||||||||||||||||||
| version: result.version, | ||||||||||||||||||||||||||||
| apiEndpoint: `${baseUrl}/api/workflows/${workflowId}/run`, | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unbounded polling loop when no snapshot is available
When switching to a task that has an active stream but no snapshot, this early return path intentionally skips setting
appliedChatIdRef.current = chatHistory.id. That means every time the invalidated query resolves andchatHistoryupdates, thisuseEffectfires again, evaluates the same condition, and callsinvalidateQueriesonce more — creating a tight polling loop for the lifetime of the stream.useChatHistoryhas astaleTimeof 30 seconds, butinvalidateQueriesbypassesstaleTimeand triggers an immediate refetch. So the loop rate is limited only by network round-trip time, which could produce dozens of requests per minute if the server takes a while to produce a snapshot.If the intent is to poll until a snapshot appears, this needs an explicit back-off or maximum retry count. One approach is to return a cleanup function from the effect that uses a delayed
invalidateQueriescall so subsequent polls are spaced out, rather than firing immediately on every data update.