-
Notifications
You must be signed in to change notification settings - Fork 3
fix: handle chatcompletions streaming tool calls with no text preamble and non-zero indices #147
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: handle chatcompletions streaming tool calls with no text preamble and non-zero indices #147
Conversation
This stack of pull requests is managed by Graphite. Learn more about stacking. |
12e3143 to
b4479ce
Compare
d02fd35 to
9e54caf
Compare
dannykopping
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might've spotted a bug, PTAL
| } else { | ||
| // Injected tools mark the stream as initiated so we continue to tool invocation. | ||
| // When the provider responds with a tool call as the first chunk (no text | ||
| // preamble), no chunks are relayed to the client. Marking as initiated |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| // preamble), no chunks are relayed to the client. Marking as initiated | |
| // preamble), no chunks are relayed to the client, which marks the stream as initiated. This | |
| // Some providers may return tool calls with non-zero starting indices, | ||
| // resulting in nil entries in the array that must be removed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Move this comment above compactToolCalls to be more clear.
intercept/eventstream/eventstream.go
Outdated
| // MarkInitiated marks the stream as initiated, even if no events have been | ||
| // sent to the client yet. A stream is considered initiated when processing | ||
| // injected tool calls that don't relay chunks to the client. | ||
| func (s *EventStream) MarkInitiated() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❗I was going to suggest that the code in the control loop should reuse this method, and I think it should, but it also made me realise there might be a bug here.
When the stream is marked initiated it also sends SSE headers etc, which we won't do here.
I'm kinda surprised this works. Maybe it's working by accident? Something feels missing to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would just setting s.initiated.Store(true) without using s.initiateOnce.Do be enough and solve the header issue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not a fan of using s.initiated for marking upstream connection as established. My understanding is that it should mark downstream connection as established.
For now I guess it should be fine but it looks to me that more work should be done here to make things clearer or maybe my interpretation is wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not a fan of using s.initiated for marking upstream connection as established.
That's not what we're using this for. We're using this to mark when the first relayable chunk has been received (so we can establish the downstream stream); i.e. the response didn't end prematurely with an HTTP error from the upstream but instead we've received stream events from it.
We could be more specific here by actually validating that we've received SSE headers from the upstream, but that'd require a bit of a refactor and I don't think the lib exposes this to us IIRC. We can add this as a follow-up.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tested again, and I can confirm that with this change, we are not sending the headers. However, the client is still accepting the response (likely because it processes the body based on content format, not strictly on the Content-Type header 🤷♀️ ).
Would just setting s.initiated.Store(true) without using s.initiateOnce.Do be enough and solve the header issue?
This would fix it, because then we guarantee that s.initiateOnce.Do(func() from Start() would be invoked and the headers set.
However, I don't think this is the best approach. The problem we're trying to solve is that we received a tool call invocation, and we don't want to relay any of this to the client, so the stream has not yet been initiated. Marking s.initiated = true when we haven't actually initiated the stream doesn't seem right semantically.
What we actually want is to continue with the tool call invocation. I'd suggest going with the initial fix in ProcessRequest():
if events.IsStreaming() {
// ... existing error handling and relay logic ...
} else if toolCall == nil {
// Stream has not started and no tool call - write error response and exit.
i.writeUpstreamError(w, getErrorResponse(stream.Err()))
return stream.Err()
}
// If we have a tool call, continue to tool invocation even though stream hasn't started yet.
// Headers will be set when the subsequent response actually sends content to the client.
This keeps the semantics clear: IsStreaming() reflects whether we've actually started streaming to the client, and we only bail out early if there's no tool call to process.
@dannykopping @pawbana Wdyt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess it depends how stream initiation should be understood. I'm open to both approaches but I can see how reflecting the status of upstream stream in downstream stream would be intuitive, something like we've got response from upstream -> we can initialize downstream.
How about sending empty event (empty byte array payload) when marking stream as initiated? It will both set the
s.initiated.Store(true)and send headers?
I like this idea, I tried implementing it but there is a risk of race condition:
Send()puts event on channelStart()goroutine picks it up (channel now empty)Start()is about to sets.initiated.Store(true)but hasn't yetIsStreaming()checks - channel is empty,initiatedis still false- Returns false → early exit
This causes the tool to not be invoked. We could add a check:
if events.IsStreaming() {
// Race condition does not enter here...
} else if toolCall == nil {
i.writeUpstreamError(w, getErrorResponse(stream.Err()))
return stream.Err()
}
But then we skip stream error handling for this case which is not ideal 😕
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What we could do is set s.initiated.Store(true) (make sure IsStreaming() returns true and the stream error handling is taken care of) and send the event (make sure the headers are sent) 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch with race condition. I guess we need 2 separate booleans then.
1st -> headers where sent
2nd -> stream was initialized == upstream responded with proper streaming response
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I went with a different approach, by separating the headers sent and the events sent into 2 different operations. MarkInitiated() now marks the stream as initiated, which means it sends all the headers and starts the ping ticker. This uses initiateOnce which guarantees that we only perform these operations once, in one of the following scenarios:
- Previous assumption: The response from the AI provider contains text content before any tool call. Headers are sent when the first text chunk is processed.
- New assumption: The response from the AI provider contains only tool calls (no text content). After processing the tool calls, if there is an injected tool call, we call
events.MarkInitiated()to send the headers. Events will only be sent when the AI provider responds with text content after tool invocation (second stream).
This was addressed in commit 61ea53a
However, I'm not entirely sure of this approach, because of this comment:
aibridge/intercept/eventstream/eventstream.go
Lines 91 to 97 in bf1abce
| // Send headers for Server-Sent Event stream. | |
| // | |
| // We only send these once an event is processed because an error can occur in the upstream | |
| // request prior to the stream starting, in which case the SSE headers are inappropriate to | |
| // send to the client. | |
| // | |
| // See use of IsStreaming(). |
IIUC, these errors are HTTP errors (auth, rate limit, etc.), right? Now we have a check to only call MarkInitiated() if stream.Err() == nil, meaning we check if there are any errors with the first stream. If there are, IsStreaming() will return false and we send the error to the client as an HTTP response. Errors that appear in subsequent streams will be sent via SSE events. This is the correct flow, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC, these errors are HTTP errors (auth, rate limit, etc.), right? Now we have a check to only call
MarkInitiated()ifstream.Err() == nil, meaning we check if there are any errors with the first stream. If there are,IsStreaming()will return false and we send the error to the client as an HTTP response. Errors that appear in subsequent streams will be sent via SSE events. This is the correct flow, right?
This implementation looks correct, but the flow is a bit haphazard. It's more a critique of how the code was before than of your addition; the code could use some refactoring.
One small change we could make is to separate events.IsStreaming() into two methods: one which returns initiated value, the other which returns the number of pending events. That'd tie the logic together a bit more tightly. For example, if there's no stream error we mark the stream as initiated, and if there's an error AND the stream is not initiated AND the stream has no pending events then return an HTTP error instead of an error event.
Non-blocking suggestion, though. Feel free to run with this as-is 👍
|
|
||
| data: {"choices":[{"finish_reason":"stop","index":0,"delta":{"content":null}}],"created":1769198062,"id":"msg_vrtx_015B1npskreQgEjMrfsdjH1m","usage":{"completion_tokens":85,"prompt_tokens":25989,"prompt_tokens_details":{"cached_tokens":0},"total_tokens":26074},"model":"claude-haiku-4.5"} | ||
|
|
||
| data: [DONE] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Streaming fixtures may need 2 new lines at the end for the stream to be properly parsed by SDK. See: https://github.com/coder/aibridge/pull/123/changes#r2698610345
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch; we should add a linter for this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had no problems with the tests, but added the newline: 734ddda
intercept/eventstream/eventstream.go
Outdated
| // MarkInitiated marks the stream as initiated, even if no events have been | ||
| // sent to the client yet. A stream is considered initiated when processing | ||
| // injected tool calls that don't relay chunks to the client. | ||
| func (s *EventStream) MarkInitiated() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would just setting s.initiated.Store(true) without using s.initiateOnce.Do be enough and solve the header issue?
intercept/eventstream/eventstream.go
Outdated
| // MarkInitiated marks the stream as initiated, even if no events have been | ||
| // sent to the client yet. A stream is considered initiated when processing | ||
| // injected tool calls that don't relay chunks to the client. | ||
| func (s *EventStream) MarkInitiated() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not a fan of using s.initiated for marking upstream connection as established. My understanding is that it should mark downstream connection as established.
For now I guess it should be fine but it looks to me that more work should be done here to make things clearer or maybe my interpretation is wrong.
| actual, err := json.Marshal(invocations[0]) | ||
| require.NoError(t, err) | ||
| require.EqualValues(t, expected, actual) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing else check?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, not missing, for the non-zero index fixture expectedArgs is nil, because it doesn't include arguments (the owner parameter is optional and Coder defaults to "me", additionally providers don't always include optional arguments). The test only validates arguments for the "tool call no preamble" fixture that includes an argument.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I ment to ask if something not being set should be checked in else case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, in this if case, we only check for the tool call arguments, if the fixture only has a tool call without arguments, we don't check anything.
efa7ba4 to
734ddda
Compare
dannykopping
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
intercept/eventstream/eventstream.go
Outdated
| // MarkInitiated marks the stream as initiated, even if no events have been | ||
| // sent to the client yet. A stream is considered initiated when processing | ||
| // injected tool calls that don't relay chunks to the client. | ||
| func (s *EventStream) MarkInitiated() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC, these errors are HTTP errors (auth, rate limit, etc.), right? Now we have a check to only call
MarkInitiated()ifstream.Err() == nil, meaning we check if there are any errors with the first stream. If there are,IsStreaming()will return false and we send the error to the client as an HTTP response. Errors that appear in subsequent streams will be sent via SSE events. This is the correct flow, right?
This implementation looks correct, but the flow is a bit haphazard. It's more a critique of how the code was before than of your addition; the code could use some refactoring.
One small change we could make is to separate events.IsStreaming() into two methods: one which returns initiated value, the other which returns the number of pending events. That'd tie the logic together a bit more tightly. For example, if there's no stream error we mark the stream as initiated, and if there's an error AND the stream is not initiated AND the stream has no pending events then return an HTTP error instead of an error event.
Non-blocking suggestion, though. Feel free to run with this as-is 👍
Merge activity
|
…e and non-zero indices
…client timeout during tool invocation
8e62b06 to
83c79e4
Compare

Description
Fixes two edge cases in streaming chat completions when handling injected tool calls observed with Copilot Provider:
events.IsStreaming()returns false, causing the request to end prematurely without invoking the tool.index: 1instead ofindex: 0, the OpenAI SDK accumulator creates[nil, {tool}]. When this is appended to the messages for the next request, the provider rejects it.Changes
MarkInitiated()toEventStreamto mark the stream as active when processing injected tool calls that don't relay chunks to the clientcompactToolCalls()to remove nil/empty entries from the tool calls array before appending completions to messages