langchainjs: Streaming does not work if RemoteRunnable is part of RunnableSequence
Checked other resources
- I added a very descriptive title to this issue.
- I searched the LangChain.js documentation with the integrated search.
- I used the GitHub search to find a similar question and didn’t find it.
- I am sure that this is a bug in LangChain.js rather than my code.
- The bug is not resolved by updating to the latest stable version of LangChain (or the specific integration package).
Example Code
Langserve endpoint
```python
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from langchain_openai import ChatOpenAI
from langserve import add_routes

app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
    expose_headers=["*"],
)

OPENAI_API_KEY = "your API key"

add_routes(
    app,
    ChatOpenAI(model="gpt-3.5-turbo-1106", api_key=OPENAI_API_KEY),
    path="/models/ChatOpenAI",
)

if __name__ == "__main__":
    import uvicorn

    HTTP_PORT = 8081
    uvicorn.run(app, host="0.0.0.0", port=HTTP_PORT)
```
Python’s RemoteRunnable streams correctly
```python
from langserve.client import RemoteRunnable
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template("Tell me a joke about {topic}")
model = RemoteRunnable("http://localhost:8081/models/ChatOpenAI")
chain = prompt.pipe(model)

stream = chain.stream({"topic": "cats"})
for chunk in stream:
    print(chunk)
```
JS equivalent does not stream and just calls invoke
```javascript
import { ChatPromptTemplate } from "@langchain/core/prompts";
import { RunnableSequence } from "@langchain/core/runnables";
import { RemoteRunnable } from "langchain/runnables/remote";

const prompt = ChatPromptTemplate.fromTemplate("Tell me a joke about {topic}");
const model = new RemoteRunnable({ url: "http://localhost:8081/models/ChatOpenAI" });
const chain = RunnableSequence.from([prompt, model]);

const stream = await chain.stream({ topic: "cats" });
for await (const chunk of stream) {
  console.log(chunk);
}
```
Solo RemoteRunnable in JS streams properly
When we simplify the chain and just call .stream directly on RemoteRunnable, streaming seems to be working correctly:
```javascript
import { RemoteRunnable } from "langchain/runnables/remote";

const chain = new RemoteRunnable({ url: "http://localhost:8081/models/ChatOpenAI" });

const stream = await chain.stream({ topic: "cats" });
for await (const chunk of stream) {
  console.log(chunk);
}
```
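A possible workaround sketch, under the assumption (not verified in this issue) that the langserve route accepts a formatted prompt value as input: format the prompt locally with its own invoke call, then call .stream() directly on the RemoteRunnable so the /stream endpoint is hit:

```typescript
import { ChatPromptTemplate } from "@langchain/core/prompts";
import { RemoteRunnable } from "langchain/runnables/remote";

const prompt = ChatPromptTemplate.fromTemplate("Tell me a joke about {topic}");
const model = new RemoteRunnable({ url: "http://localhost:8081/models/ChatOpenAI" });

// Format the prompt locally, then stream the model directly, so the
// sequence's default (invoke-based) streaming path is never involved.
const promptValue = await prompt.invoke({ topic: "cats" });
const stream = await model.stream(promptValue);
for await (const chunk of stream) {
  console.log(chunk);
}
```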
Error Message and Stack Trace (if applicable)
No response
Description
I'm trying to use RemoteRunnable in place of a model in my JavaScript LCEL chains, but when I compose the RemoteRunnable with any other runnable to form a RunnableSequence, it does not stream and falls back to invoke. The equivalent code works in the Python version.
System Info
pnpm show langchain
❯ pnpm show langchain
langchain@0.1.28 | MIT | deps: 18 | versions: 259
Typescript bindings for langchain
https://github.com/langchain-ai/langchainjs/tree/main/langchain/
keywords: llm, ai, gpt3, chain, prompt, prompt engineering, chatgpt, machine learning, ml, openai, embeddings, vectorstores
dist
.tarball: https://registry.npmjs.org/langchain/-/langchain-0.1.28.tgz
.shasum: fd07a9880a68865e5c803dc01decdaa097b7c756
.integrity: sha512-LNuILtOSFptfOqDwUwQAl7B0dYrBtxzVZ6lDhc5DA8rk+pyTCy12HQO7t8bGqC8oQlEPYPKaom8VRlvfupUBTw==
.unpackedSize: 4.1 MB
dependencies:
@anthropic-ai/sdk: ^0.9.1 expr-eval: ^2.0.2 langsmith: ~0.1.7 yaml: ^2.2.1
@langchain/community: ~0.0.36 js-tiktoken: ^1.0.7 ml-distance: ^4.0.0 zod-to-json-schema: ^3.22.3
@langchain/core: ~0.1.44 js-yaml: ^4.1.0 openapi-types: ^12.1.3 zod: ^3.22.4
@langchain/openai: ~0.0.19 jsonpointer: ^5.0.1 p-retry: 4
binary-extensions: ^2.2.0 langchainhub: ~0.0.8 uuid: ^9.0.0
maintainers:
- nfcampos <nuno@boringbits.io>
- jacoblee93 <jacoblee93@gmail.com>
- davidduong <david@duong.cz>
- hwchase17 <hw.chase.17@gmail.com>
- basproul <braceasproul@gmail.com>
dist-tags:
latest: 0.1.28 next: 0.1.29-rc.0
published 3 days ago by jacoblee93 <jacoblee93@gmail.com>
Node version
❯ node --version
v21.5.0
pnpm version
❯ pnpm --version
8.14.1
About this issue
- Original URL
- State: closed
- Created 3 months ago
- Reactions: 1
- Comments: 15
The _streamIterator async generator function in the RunnableSequence class is designed to provide a default streaming implementation for runnables. This function takes an input and optional call options, invokes the runnable with these parameters, and yields the result. This is a simplistic implementation meant to be overridden by subclasses that support more sophisticated streaming output. The purpose of this function is to allow for the streaming of outputs, potentially in real time or in chunks, rather than waiting for all processing to complete before returning a result. This can be particularly useful for long-running tasks or those that generate incremental outputs.

The reason why _streamIterator might be invoking the last step instead of streaming it when a RemoteRunnable is part of a RunnableSequence could be related to how the RunnableSequence class's invoke method is implemented. The invoke method processes each step in the sequence sequentially, starting with the first step, then any middle steps, and finally the last step. Each step's output is used as the input for the next step. The final step's output is then returned as the final output of the sequence.

When a RemoteRunnable is included as part of a RunnableSequence, and the sequence is streamed using _streamIterator, the streaming behavior is determined by the default implementation of _streamIterator, which simply invokes the runnable (in this case, the entire sequence) once and yields the result. This means that the streaming functionality does not inherently break the sequence down into its constituent steps for individual streaming. Instead, it treats the sequence as a single runnable entity. Therefore, the streaming operation waits for the entire sequence to complete, including the invocation of the last step, before yielding the final result. This behavior effectively bypasses the potential for streaming intermediate outputs from each step within the sequence.

To achieve streaming of each step within a RunnableSequence, including when a RemoteRunnable is involved, the RunnableSequence class or the RemoteRunnable class would need to implement a more sophisticated streaming mechanism. This could involve overriding the _streamIterator method to handle the streaming of outputs from each step within the sequence individually, rather than treating the sequence as a single invocation. However, based on the provided context, such an implementation is not present, leading to the observed behavior where the last step is invoked and its result is streamed as a single output, rather than streaming the outputs of each step incrementally.

For more details, you can refer to the source code of the RunnableSequence class and the RemoteRunnable class in the LangChain JS codebase.

I hope this explanation helps! If you have any further questions, feel free to ask.
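To make the fallback concrete, here is a minimal sketch of that default behavior with simplified signatures (BaseRunnableSketch is an illustrative name, not LangChain.js source):

```typescript
// Sketch: stream() on the base class simply exposes _streamIterator,
// and the default _streamIterator awaits the full invoke() result and
// yields it as a single chunk. A subclass that never overrides
// _streamIterator therefore "streams" exactly one chunk, which is why
// the fallback path looks like a single /invoke call.
abstract class BaseRunnableSketch<Input, Output> {
  abstract invoke(input: Input): Promise<Output>;

  // Default: one chunk containing the complete invoke() result.
  async *_streamIterator(input: Input): AsyncGenerator<Output> {
    yield await this.invoke(input);
  }

  stream(input: Input): AsyncGenerator<Output> {
    return this._streamIterator(input);
  }
}
```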
About Dosu
This response is meant to be useful and save you time. It is not meant to be a precise solution, but rather a starting point for your own research.
Help me be more useful! Please leave a 👍 if this is helpful and 👎 if it is irrelevant.
If you want to continue the conversation, start your reply with @dosu-bot.
Based on the code and the issue you've described, it seems like the problem lies in how RunnableSequence handles streaming. In the current implementation of LangChain.js, RunnableSequence and similar constructs like RunnableRetry, RunnableWithFallbacks, etc. are designed to handle sequences of operations, potentially modifying the behavior of individual Runnable instances through decoration. These constructs focus on invoking runnables in a specific order or under specific conditions, primarily through the invoke method, and optionally through the batch method for handling multiple inputs at once.

The stream method implementation provided in the base class (from which other runnables presumably inherit) is designed to work with individual Runnable instances. It leverages an asynchronous generator (_streamIterator) to yield results as they become available. This mechanism is inherently suited to operations that can produce output incrementally before the entire input has been processed, which is a typical scenario in streaming.

However, when a RemoteRunnable is incorporated into a RunnableSequence, the sequence's invoke method orchestrates the execution flow across multiple runnables, potentially altering inputs and outputs along the way. This orchestration does not inherently support streaming because it focuses on completing each step fully before moving to the next. The sequence's design does not account for the possibility of incrementally yielding results from intermediate steps, which is essential for streaming functionality.

Moreover, RunnableSequence and similar constructs do not override or provide their own implementations of the stream method that would allow them to handle streaming appropriately across all constituent runnables. Without such an implementation, attempting to stream through a RunnableSequence defaults to the base class's streaming behavior, which is not designed to handle the complexities of a sequence of operations.

In summary, the core issue is that RunnableSequence and related constructs are not designed with streaming in mind, focusing instead on sequential or conditional execution of runnables. To enable streaming when RemoteRunnable is part of a RunnableSequence, it would be necessary to implement a custom stream method for RunnableSequence that orchestrates streaming across all constituent runnables, respecting the incremental nature of streaming and ensuring that intermediate results can be yielded as they become available.

You can find more details about the current implementation of RunnableSequence and RemoteRunnable in the LangChain.js codebase.

I hope this explanation helps clarify the issue. If you have any further questions or need more clarification, please don't hesitate to ask.
Yes, the _streamIterator async generator function in the RunnableSequence class of the LangChain.js framework does support streaming of the last step. This is evident from the way the function is implemented.

The function first constructs the finalGenerator by transforming each step one by one, starting from the first step; this is done inside a loop over the sequence's steps. After the loop, the finalGenerator is fully assembled, including the transformation of the last step. The function then starts to yield chunks from the finalGenerator.

This means that the finalGenerator will yield chunks of data as they are produced by the last step. Therefore, the _streamIterator function does support streaming of the last step.

Regarding the getCallbackManagerForConfig function, without additional context it's hard to provide a detailed explanation. However, based on its usage in the code snippet you provided, it seems to be used for managing callbacks related to the execution of the sequence. The runManager obtained from getCallbackManagerForConfig is used to handle the start and end of the sequence execution, as well as any errors that occur during the execution.

I hope this helps! If you have any more questions, feel free to ask.
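As an illustration of the finalGenerator pattern described above, here is a hedged sketch with simplified types (Step and streamSequence are illustrative names, not the actual LangChain.js source):

```typescript
// Each step exposes a transform() that consumes the previous step's
// chunk stream and produces its own.
interface Step {
  transform(generator: AsyncGenerator<unknown>): AsyncGenerator<unknown>;
}

async function* streamSequence(
  steps: Step[],
  input: unknown
): AsyncGenerator<unknown> {
  // Seed the pipeline with a generator that yields the original input once.
  async function* seed(): AsyncGenerator<unknown> {
    yield input;
  }
  // Wrap each step's transform around the previous generator; nothing
  // executes until the final generator is iterated.
  let finalGenerator: AsyncGenerator<unknown> = seed();
  for (const step of steps) {
    finalGenerator = step.transform(finalGenerator);
  }
  // Chunks from the last step are yielded as soon as they are produced.
  for await (const chunk of finalGenerator) {
    yield chunk;
  }
}
```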
The RunnableSequence class in LangChain.js is designed to execute each step in the sequence synchronously, waiting for each step to complete before moving to the next. This design does not inherently support streaming, which requires a continuous flow of data that can be processed and passed along as it becomes available.

To modify the RunnableSequence class to support streaming from the last runnable, you would need to implement a mechanism that allows the last runnable to start producing output as soon as it has enough input to do so, without waiting for all previous runnables to complete their execution. This could involve:

- Overriding the stream method in the RunnableSequence class: create a custom implementation of the stream method that specifically handles the streaming capability of the last runnable. This method would need to orchestrate the sequence such that, when it reaches the last runnable, it switches from a sequential execution model to a streaming model.
- Integrating streaming support into the sequence execution logic: modify the sequence execution logic in the invoke method to detect when it is executing the last runnable and, instead of simply awaiting its completion, set up a streaming process. This could be achieved by checking whether the last runnable implements a specific streaming interface or method and, if so, calling that method to initiate streaming.
- Handling intermediate outputs: for the last runnable to start streaming effectively, it may need partial outputs from the previous runnables. This requires a mechanism to pass these intermediate outputs to the last runnable as soon as they are available, rather than waiting for the entire sequence to execute. This could involve modifying the execution logic of the sequence to support partial outputs and passing them to the next runnable in the sequence.

Here is a conceptual example of how you might start to implement these changes:
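A hedged reconstruction of such an example, based on the description that follows (SimpleRunnable and streamFromLastStep are illustrative names, not LangChain.js API), might look like this:

```typescript
// Illustrative sketch: invoke every step except the last, then stream
// the last step if it exposes a stream() method, otherwise fall back
// to a single invoke().
interface SimpleRunnable {
  invoke(input: unknown, options?: unknown): Promise<unknown>;
  stream?(input: unknown, options?: unknown): Promise<AsyncIterable<unknown>>;
}

async function* streamFromLastStep(
  steps: SimpleRunnable[],
  input: unknown,
  options?: unknown
): AsyncGenerator<unknown> {
  // Run all but the last runnable sequentially, threading outputs through.
  let intermediate = input;
  for (const step of steps.slice(0, -1)) {
    intermediate = await step.invoke(intermediate, options);
  }
  const last = steps[steps.length - 1];
  if (typeof last.stream === "function") {
    // The last runnable supports streaming: yield chunks as they arrive.
    for await (const chunk of await last.stream(intermediate, options)) {
      yield chunk;
    }
  } else {
    // No streaming support: yield the full invoke() result once.
    yield await last.invoke(intermediate, options);
  }
}
```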
This example outlines a basic approach to modifying the RunnableSequence class to support streaming from the last runnable. It involves executing all but the last runnable in the sequence synchronously, then checking whether the last runnable supports streaming and, if so, initiating the stream. This approach assumes that the last runnable has a stream method implemented that can handle streaming output. Further modifications and error handling would likely be necessary to fully integrate this functionality into the LangChain.js framework.

Please note that this is a conceptual example and might not work out of the box. You might need to adjust it according to your specific use case and the exact implementation of the RunnableSequence and RemoteRunnable classes in your version of LangChain.js.
Thank you for providing the detailed information. It seems like you're encountering an issue with the RemoteRunnable when it's part of a RunnableSequence in LangChain.js. I'll try to provide some insights based on your questions.

Firstly, regarding the differences between Python and JavaScript in handling asynchronous execution and generator handling, you're correct that these differences might be contributing to the issue you're facing. In JavaScript, the RemoteRunnable class handles asynchronous execution primarily through its invoke, batch, stream, and streamLog methods, which are all asynchronous and return promises. This design allows for non-blocking execution and is a common pattern in JavaScript for handling asynchronous operations such as network requests. The class makes extensive use of async/await syntax for managing asynchronous code in a more synchronous-looking manner, which simplifies the handling of asynchronous operations like HTTP requests.

For generator handling, the streamLog method is an asynchronous generator function, indicated by the async and * keywords. This method leverages the for await...of loop to asynchronously iterate over a stream of log patches. Asynchronous generators in JavaScript are used to handle streams of data that arrive over time, allowing the function to yield a new value as soon as it becomes available, which makes them well suited for streaming-data scenarios like log streams.
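As a plain illustration of that pattern (not LangChain.js source; logPatches is a made-up example), an asynchronous generator yields values as they arrive, and for await...of consumes them incrementally:

```typescript
// An async generator that produces values over time; each yield hands
// a value to the consumer as soon as it is available.
async function* logPatches(): AsyncGenerator<string> {
  for (const patch of ["patch-1", "patch-2", "patch-3"]) {
    // Simulate waiting for the next chunk to arrive over the network.
    await new Promise((resolve) => setTimeout(resolve, 100));
    yield patch;
  }
}

// Consumed with for await...of: patches print one by one, not all at once.
for await (const patch of logPatches()) {
  console.log(patch);
}
```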
Compared to Python, JavaScript's approach to asynchronous execution and generator handling is quite different due to the language's inherent design and its single-threaded, event-driven nature. Python uses asyncio for asynchronous programming, which also involves an event loop but is structured differently, often using async def for asynchronous functions and await for waiting on their results. Python's generator functions use the yield keyword, similar to JavaScript, but the handling of asynchronous generators in Python is distinct and involves the async for syntax to iterate over asynchronous generator objects.

Secondly, regarding the RunnableSequence class in LangChain.js: it handles the execution of a sequence of Runnable objects by sequentially invoking each Runnable in the sequence with the output of the previous Runnable as its input. This process starts with the first Runnable in the sequence, continues through any middle Runnable objects, and ends with the last Runnable. The class supports modifying the behavior of the sequence through methods like withRetry, withConfig, and withFallbacks, allowing for retries, configuration binding, and fallback execution, respectively.

For a RemoteRunnable, or any Runnable in the sequence, the handling mechanism does not change. Each Runnable is invoked with the input it receives, which could be the original input for the first Runnable or the output from the previous Runnable in the sequence. The RunnableSequence class ensures that the execution flow is maintained, and any specific behaviors like retries or fallbacks are applied based on the configurations provided.

Unfortunately, I wasn't able to find specific discussions or documentation that explain the design decisions or problems related to this issue in LangChain.js. I also couldn't find any known issues or limitations with the RemoteRunnable or RunnableSequence classes in the LangChain.js codebase that could be causing this issue. It's possible that the issue you're facing is due to some other factors or a bug in the codebase.

I hope this information helps you understand the potential differences and behaviors of the RemoteRunnable and RunnableSequence classes in LangChain.js. If you have more information or if there's anything else you'd like to know, please feel free to share.
Based on the code you've provided and the context of your question, it seems like the issue you're encountering is due to how RunnableSequence handles the execution of each runnable in the sequence. In the current implementation, each runnable in the sequence is processed synchronously, meaning that even if the last runnable in the sequence (in this case, RemoteRunnable) supports streaming, the sequence itself does not inherently support streaming the output of that last runnable. The results are collected and returned only after the entire sequence has been processed.

This behavior differs from the Python version due to the nuances of asynchronous execution and generator handling in the two languages. Python's asyncio and generator capabilities might be leveraged in such a way that the equivalent Python code can stream outputs from a sequence more seamlessly.

To address this issue in the TypeScript implementation, modifications would need to be made to the RunnableSequence class to support streaming from the last runnable if it supports streaming. This could involve checking whether the last runnable in the sequence supports streaming and, if so, handling its output differently by yielding results as they become available instead of waiting for all runnables in the sequence to complete.

However, without specific details on the implementation of RemoteRunnable and how it is expected to integrate with streaming in the sequence, it's challenging to provide a precise solution. It's also important to note that such changes could have significant implications for the overall behavior of the RunnableSequence class and would need to be thoroughly tested to ensure they don't introduce new issues.

I hope this explanation helps clarify the issue. If you have further questions or need more details, please let me know.
@dosubot What could prevent the final step from being streamed? Let's say the last step is a RemoteRunnable, which supports streaming. I would expect it to call the /stream endpoint, but it calls /invoke.