Skip to content

Commit

Permalink
[ML] Ignore unrecognized openai sse fields
Browse files Browse the repository at this point in the history
Azure / Llama sends back fields we do not expect - rewriting the parser
to better handle unknown fields (by dropping them).
  • Loading branch information
prwhelan committed Oct 14, 2024
1 parent 71274df commit 976d70e
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,6 @@ public class OpenAiStreamingProcessor extends DelegatingProcessor<Deque<ServerSe
private static final String CHOICES_FIELD = "choices";
private static final String DELTA_FIELD = "delta";
private static final String CONTENT_FIELD = "content";
private static final String FINISH_REASON_FIELD = "finish_reason";
private static final String STOP_MESSAGE = "stop";
private static final String DONE_MESSAGE = "[done]";

@Override
Expand Down Expand Up @@ -162,21 +160,27 @@ private Iterator<StreamingChatCompletionResults.Result> parse(XContentParserConf
ensureExpectedToken(XContentParser.Token.START_OBJECT, currentToken, parser);

currentToken = parser.nextToken();
if (currentToken == XContentParser.Token.END_OBJECT) {
consumeUntilObjectEnd(parser); // end choices
return ""; // stopped
}

if (currentToken == XContentParser.Token.FIELD_NAME && parser.currentName().equals(CONTENT_FIELD)) {
parser.nextToken();
} else {
positionParserAtTokenAfterField(parser, CONTENT_FIELD, FAILED_TO_FIND_FIELD_TEMPLATE);
// continue until the end of delta
while (currentToken != null && currentToken != XContentParser.Token.END_OBJECT) {
if (currentToken == XContentParser.Token.START_OBJECT || currentToken == XContentParser.Token.START_ARRAY) {
parser.skipChildren();
}

if (currentToken == XContentParser.Token.FIELD_NAME && parser.currentName().equals(CONTENT_FIELD)) {
parser.nextToken();
ensureExpectedToken(XContentParser.Token.VALUE_STRING, parser.currentToken(), parser);
var content = parser.text();
consumeUntilObjectEnd(parser); // end delta
consumeUntilObjectEnd(parser); // end choices
return content;
}

currentToken = parser.nextToken();
}
ensureExpectedToken(XContentParser.Token.VALUE_STRING, parser.currentToken(), parser);
var content = parser.text();
consumeUntilObjectEnd(parser); // end delta

consumeUntilObjectEnd(parser); // end choices
return content;
return ""; // stopped
}).stream()
.filter(Objects::nonNull)
.filter(Predicate.not(String::isEmpty))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,42 @@ public void testDoneMessageIsIgnored() throws Exception {
verify(downstream, times(0)).onNext(any());
}

public void testInitialLlamaResponseIsIgnored() throws Exception {
var item = new ArrayDeque<ServerSentEvent>();
item.offer(new ServerSentEvent(ServerSentEventField.DATA, """
{
"id":"12345",
"object":"chat.completion.chunk",
"created":123456789,
"model":"Llama-2-7b-chat",
"system_fingerprint": "123456789",
"choices":[
{
"index":0,
"delta":{
"role":"assistant"
},
"logprobs":null,
"finish_reason":null
}
]
}
"""));

var processor = new OpenAiStreamingProcessor();

Flow.Subscriber<ChunkedToXContent> downstream = mock();
processor.subscribe(downstream);

Flow.Subscription upstream = mock();
processor.onSubscribe(upstream);

processor.next(item);

verify(upstream, times(1)).request(1);
verify(downstream, times(0)).onNext(any());
}

private String toJsonString(ChunkedToXContent chunkedToXContent) throws IOException {
try (var builder = XContentFactory.jsonBuilder()) {
chunkedToXContent.toXContentChunked(EMPTY_PARAMS).forEachRemaining(xContent -> {
Expand Down

0 comments on commit 976d70e

Please sign in to comment.