feat: Support LangChain orchestration client streaming #711

Open
wants to merge 61 commits into main

Conversation

KavithaSiva
Contributor

@KavithaSiva KavithaSiva commented May 7, 2025

Context

Closes SAP/ai-sdk-js-backlog#259.

What this PR does and why it is needed

This PR introduces streaming support for the LangChain orchestration client.
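
For context, a minimal consumption sketch of the new streaming API, pieced together from the test snippets quoted later in this thread; the configuration shape shown here is an assumption, not the exact one used in the PR:

import type { AIMessageChunk } from '@langchain/core/messages';
import { OrchestrationClient } from '@sap-ai-sdk/langchain';

async function main(): Promise<void> {
  // Hypothetical orchestration configuration for illustration only.
  const client = new OrchestrationClient({
    llm: { model_name: 'gpt-4o' },
    templating: {
      template: [{ role: 'user', content: 'Tell me a short story.' }]
    }
  });

  // stream() yields AIMessageChunk objects that can be concatenated into one message.
  const stream = await client.stream([]);

  let finalOutput: AIMessageChunk | undefined;
  for await (const chunk of stream) {
    finalOutput = finalOutput ? finalOutput.concat(chunk) : chunk;
  }
  console.log(finalOutput?.content);
}

main().catch(console.error);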

@KavithaSiva KavithaSiva marked this pull request as ready for review May 12, 2025 07:57
@KavithaSiva
Contributor Author

KavithaSiva commented May 12, 2025

Adding more tests in a follow-up PR.

Contributor

@ZhongpinWang ZhongpinWang left a comment

Suggestions for improvement. Also, please implement the tests already in this PR to avoid follow-up changes in case they don't work as expected. 😄

Contributor

@ZhongpinWang ZhongpinWang left a comment

I see refactoring is needed for the big _streamResponseChunks function. It would make the code more readable and testable.

@ZhongpinWang
Contributor

I did another round of review. Other than some naming issues, the biggest (potential) issue is that the multi-choice logic is not implemented. This could be a problem in the future, and since it is already properly handled in the vanilla orchestration client, consider implementing it correctly in LangChain as well.

I want to hold off on implementing multi-choice support for now, as the orchestration service currently does not support multiple choices for streaming; it throws a 400. Moreover, in LangChain each streamed AIMessageChunk is supposed to hold the content of only one choice, so there would be multiple chunks for one streamed chunk of the orchestration service. From what I understood, the consumer is supposed to group chunks together based on the newTokenIndices.completion value.

I would prefer to do this in a separate BLI once the orchestration service starts supporting it, not in this PR.
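
To illustrate the grouping idea, a rough consumer-side sketch, assuming the token indices are forwarded as the second argument of the standard LangChain handleLLMNewToken callback (as the tests below check with { prompt: 0, completion: 0 }):

import type { OrchestrationClient } from '@sap-ai-sdk/langchain';

async function streamGroupedByChoice(client: OrchestrationClient): Promise<string[]> {
  // Collect streamed tokens per choice, keyed by newTokenIndices.completion.
  const tokensByChoice = new Map<number, string[]>();

  const stream = await client.stream([], {
    callbacks: [
      {
        handleLLMNewToken(token, idx) {
          // idx is { prompt, completion }; completion identifies the choice.
          const tokens = tokensByChoice.get(idx.completion) ?? [];
          tokens.push(token);
          tokensByChoice.set(idx.completion, tokens);
        }
      }
    ]
  });

  // Drain the stream so the callback fires for every chunk.
  for await (const _chunk of stream) {
    // no-op
  }

  return [...tokensByChoice.values()].map(tokens => tokens.join(''));
}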

It is normal to have multiple AIMessageChunks generated from one LLM chunk in the response. Each AIMessageChunk is supposed to contain only one choice along with its index information.

I don't have a strong opinion on whether we should already support n > 2 now since, as you say, we get a 400 from orchestration in that case. If we implement it now, the effort is small: add a forEach and stop using the default index 0 everywhere. I'll leave it to you to decide.
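
For concreteness, a rough sketch of what "adding a forEach and not using the default 0" could look like when mapping one streamed orchestration chunk to LangChain generation chunks; the StreamedChoice shape is hypothetical and not the actual SDK type:

import { AIMessageChunk } from '@langchain/core/messages';
import { ChatGenerationChunk } from '@langchain/core/outputs';

// Hypothetical shape of one streamed orchestration choice (illustration only).
interface StreamedChoice {
  index: number;
  delta: { content?: string };
}

// One orchestration chunk with n choices maps to n ChatGenerationChunks,
// each carrying its own completion index instead of the hardcoded 0.
function toGenerationChunks(choices: StreamedChoice[]): ChatGenerationChunk[] {
  return choices.map(
    choice =>
      new ChatGenerationChunk({
        text: choice.delta.content ?? '',
        message: new AIMessageChunk({ content: choice.delta.content ?? '' }),
        generationInfo: { prompt: 0, completion: choice.index }
      })
  );
}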

@KavithaSiva
Contributor Author

KavithaSiva commented May 20, 2025

I don't have a strong opinion on whether we should already support n > 2 now since, as you say, we get a 400 from orchestration in that case. If we implement it now, the effort is small: add a forEach and stop using the default index 0 everywhere. I'll leave it to you to decide.

I talked with Christoph, and he said multiple choices for orchestration streaming are not planned in the near future (Q2 or Q3).
You are right, the code change is easy, but testing it would mean adding imaginary responses for now.
I would feel more comfortable adding this once the service supports it too.

Contributor

@ZhongpinWang ZhongpinWang left a comment

Reviewed some tests. Please pay attention to code quality: if logic can be simplified into an equivalent form, always do that. The test scope should also be narrowed to what is needed; using resilience, for example, to test all of the streaming logic is unnecessary.

Comment on lines 167 to 168
let iterations = 0;
const maxIterations = 2;
Contributor

[req] Why add this complicated logic? Is there really a big performance difference in the test? We normally don't care about that anyway since it is just a test, unless it really takes many seconds more. Also, iterating through all the chunks may cover some other edge cases.

Contributor Author

The idea was to avoid comparing the entire streamed text, as it's big.
But I don't have a strong opinion, so I will switch to comparing the entire text instead.

Comment on lines 173 to 175
intermediateChunk = !intermediateChunk
? chunk
: intermediateChunk.concat(chunk);
Contributor

[req] Just concat all chunk content and compare the output, for the sake of code simplicity.

I also generally have the feeling that sometimes the code works even though the logic is not clean. Is the above code not equivalent to

intermediateChunk = intermediateChunk
        ? intermediateChunk.concat(chunk)
        : chunk;

And aren't we just checking the first chunk with content? Can't we check the content instead of using the maxIterations logic? (But still just loop over all chunks and compare to a snapshot.)
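
A possible shape for the simplified test (config comes from the test file quoted in this thread; the snapshot assertion is just one way to compare the full text):

it('streams the full response content', async () => {
  // Set up the streaming response mock here (e.g. the mockInference() helper discussed below).
  const client = new OrchestrationClient(config);
  const stream = await client.stream([]);

  // Concatenate every chunk's text content and compare the result once.
  const chunks: string[] = [];
  for await (const chunk of stream) {
    chunks.push(typeof chunk.content === 'string' ? chunk.content : '');
  }

  expect(chunks.join('')).toMatchSnapshot();
});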

@@ -129,4 +158,93 @@ describe('orchestration service client', () => {
'Request failed with status code 400'
);
}, 1000);

it('supports streaming responses', async () => {
mockStreamInferenceWithResilience();
Contributor

@ZhongpinWang ZhongpinWang May 20, 2025

[req] Why are we mocking responses with resilience? Why is resilience needed in this and the other streaming tests? Can we not just call mockInference()?

Contributor

I agree, maybe having one such test is enough (so we're aware of the behaviour).

Comment on lines 189 to 199
const client = new OrchestrationClient(config);
const controller = new AbortController();
const { signal } = controller;
const stream = await client.stream([], { signal });
const streamPromise = async () => {
for await (const _chunk of stream) {
controller.abort();
}
};

await expect(streamPromise()).rejects.toThrow();
Contributor

[pp] Also check the content of the thrown error.
[pp] Rename streamPromise; it is a function, not a promise.
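
A possible shape for the revised test incorporating both points; the expected error message is a placeholder, since it depends on what the client actually throws on abort:

it('aborts the stream when the signal fires', async () => {
  const client = new OrchestrationClient(config);
  const controller = new AbortController();
  const { signal } = controller;
  const stream = await client.stream([], { signal });

  // Renamed: this is a function that consumes the stream, not a promise.
  const consumeStream = async () => {
    for await (const _chunk of stream) {
      controller.abort();
    }
  };

  // Placeholder assertion on the error content; use the real abort message here.
  await expect(consumeStream()).rejects.toThrow(/abort/i);
});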


let finalOutput: AIMessageChunk | undefined;
for await (const chunk of stream) {
finalOutput = !finalOutput ? chunk : finalOutput.concat(chunk);
Contributor

Suggested change
finalOutput = !finalOutput ? chunk : finalOutput.concat(chunk);
finalOutput = finalOutput ? finalOutput.concat(chunk) : chunk;

Contributor

@ZhongpinWang ZhongpinWang May 20, 2025

But again, just concat the tool call JSON strings together and compare that. We iterate through all chunks anyway. It is worth the effort to get the content from each chunk and make sure it always works for all chunks.
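
For illustration, what concatenating the streamed tool call JSON could look like in the test; tool_call_chunks is the standard LangChain field on AIMessageChunk whose args arrive as string fragments, and the config name is a placeholder:

it('streams tool call arguments', async () => {
  // Set up the tool-call streaming mock here.
  const client = new OrchestrationClient(configWithTools);
  const stream = await client.stream([]);

  // Concatenate the streamed tool call argument fragments and compare once.
  let toolCallArgs = '';
  for await (const chunk of stream) {
    for (const toolCallChunk of chunk.tool_call_chunks ?? []) {
      toolCallArgs += toolCallChunk.args ?? '';
    }
  }

  expect(toolCallArgs).toMatchSnapshot();
});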

Contributor

@deekshas8 deekshas8 left a comment

I'm still reviewing, but this is how far I got today. I will take another look later.

// First chunk content is empty
expect(firstCallArgs[0]).toEqual('');
// Second argument should be the token indices
expect(firstCallArgs[1]).toEqual({ prompt: 0, completion: 0 });
Contributor

[q] I do not understand this, honestly. The mock handler function here doesn't seem to take any arguments. Where do they come from?
[q] Why check expect(chunks.length).toBeGreaterThan(0)? Won't it just be one? What is the point of checking this value even with respect to callbacks?

Contributor Author

First question: even though the callback function doesn't take arguments here, when the following snippet is executed in the _streamResponseChunks function, these arguments get passed to the callback for each chunk.

await runManager?.handleLLMNewToken(
  content,
  tokenIndices,
  undefined,
  undefined,
  undefined,
  { chunk: generationChunk }
);

Second question: You are right, we don't need to check this value here.
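
For reference, this is roughly how the test can capture those callback arguments with a jest mock handler; the surrounding config and mocks are the ones from the test file quoted above:

it('passes content and token indices to handleLLMNewToken', async () => {
  const handleLLMNewToken = jest.fn();
  const client = new OrchestrationClient(config);

  const stream = await client.stream([], {
    callbacks: [{ handleLLMNewToken }]
  });

  // Drain the stream so the callback fires for every chunk.
  for await (const _chunk of stream) {
    // no-op
  }

  // The first two arguments are the chunk content and the token indices.
  const [content, tokenIndices] = handleLLMNewToken.mock.calls[0];
  expect(content).toEqual('');
  expect(tokenIndices).toEqual({ prompt: 0, completion: 0 });
});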

generationInfo: { ...tokenIndices }
});

// Notify the run manager about the new token; some parameters are undefined, as they are implicitly read from the context.
Contributor

[req] Can we document this somewhere for the future? I wouldn't know why we set some values to undefined unless I read the LangChain code.

Contributor Author

But where would you want to document it? It's not something useful for the user; it's something we should probably remember while enabling other LangChain modules.

* @internal
*/
// eslint-disable-next-line @typescript-eslint/no-unused-vars
export function computeTokenIndices(chunk: OrchestrationStreamChunkResponse): {
Contributor

[q] I didn't get the point of this function if it isn't supported in its entirety. How is this helpful right now?

Contributor Author

@KavithaSiva KavithaSiva May 20, 2025

Token indices need to be set in each generated chunk, as they provide information about which prompt (in a multi-prompt scenario) and which choice (in a multi-choice scenario) the chunk belongs to.

Although we hardcode these values now, leaving the function as-is makes it easy to support multi-choice scenarios in the future.
I would also like to have separate util functions for calculating everything that is needed in the chunk.
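
For context, a sketch of what the hardcoded helper amounts to today, pieced together from the snippets in this thread; the import path and exact return type are assumptions:

import type { OrchestrationStreamChunkResponse } from '@sap-ai-sdk/orchestration';

/**
 * Compute the prompt and completion (choice) indices for a streamed chunk.
 * @internal
 */
// eslint-disable-next-line @typescript-eslint/no-unused-vars
export function computeTokenIndices(chunk: OrchestrationStreamChunkResponse): {
  prompt: number;
  completion: number;
} {
  // Hardcoded for now: orchestration streaming currently supports a single prompt
  // and a single choice. Once multi-choice streaming is available, the completion
  // index should be derived from the chunk's choice index instead.
  return { prompt: 0, completion: 0 };
}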

Contributor

OK, is this in a ticket somewhere? I would add a TODO in the comments, but also mention it in a ticket for the future.
