
How to close a stream using chat_engine.stream_chat #13503

Open · cxycxm opened this issue May 15, 2024 · 10 comments
Labels: question (Further information is requested)

Comments

@cxycxm commented May 15, 2024

Question Validation

• I have searched both the documentation and Discord for an answer.

Question

How to close a stream using chat_engine.stream_chat

cxycxm added the question label May 15, 2024
@dosubot (bot) commented May 15, 2024

Hey @cxycxm! Nice to see you back 😊. Diving into your stream closing conundrum right now, expect an update in a flash!

@logan-markewich (Collaborator)

I actually don't think there is a way besides either exhausting the stream or never continuing it.
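
For illustration, here is a minimal sketch of those two options, assuming the usual streaming API where stream_chat returns a response exposing a response_gen generator:

response = chat_engine.stream_chat("Tell me a long story")

# Option 1: exhaust the stream.
for token in response.response_gen:
    print(token, end="")

# Option 2: never continue the stream. Break out and abandon the
# generator; note that the underlying HTTP connection is not closed,
# it is simply never read again.
for token in response.response_gen:
    if "The End" in token:  # hypothetical early-stop condition
        break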

@cxycxm (Author) commented May 15, 2024

Can the client connection of this chat_engine be actively disconnected, similar to httpclient.close()?

@logan-markewich (Collaborator)

Maybe? I think it really depends on the LLM being used too. You'd have to do some digging into the API docs there.

@dosubot (bot) commented May 15, 2024

To manage the lifecycle of a stream in chat_engine.stream_chat, you can use the reset method on the SimpleChatEngine class, which clears the chat engine's memory and prepares it for a new session. This doesn't close a stream in the conventional sense (like httpclient.close()), but it does clear the session's state.

For a more direct approach to managing stream connections, especially for starting and stopping streams, the StreamingGeneratorCallbackHandler class within llama_index/core/langchain_helpers/streaming.py provides a structured mechanism. This class uses an event-driven approach with a Queue for tokens and an Event to signal the end of the stream, allowing for graceful handling of stream lifecycle events.

Events such as StreamChatStartEvent and StreamChatEndEvent are also crucial for lifecycle management, signaling the beginning and end of a chat stream session. While these events don't execute disconnection logic themselves, they provide a framework for implementing such functionality, allowing for custom logic to be executed at the start or end of a chat session.

In summary, while there's no explicit close method for streams in chat_engine.stream_chat, using the reset method, along with lifecycle management techniques provided by StreamingGeneratorCallbackHandler and stream lifecycle events, can serve to manage stream connections effectively.
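
For reference, a minimal sketch of the reset approach (assuming SimpleChatEngine; note that reset() clears conversation memory but does not disconnect an in-flight HTTP stream):

from llama_index.core.chat_engine import SimpleChatEngine

chat_engine = SimpleChatEngine.from_defaults()

response = chat_engine.stream_chat("Hello!")
for token in response.response_gen:
    print(token, end="")

# Prepare for a new session. This clears the engine's memory only;
# it performs no disconnection of its own.
chat_engine.reset()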


@RussellLuo (Contributor) commented May 15, 2024

Maybe we can add a close method to StreamingAgentChatResponse by leveraging generator.close(), for example:

class StreamingAgentChatResponse:
    ...

    def close(self) -> None:
        if self.chat_stream is not None:
            # Here we assume that `self.chat_stream` is a generator,
            # so this will trigger a GeneratorExit in the generator function.
            self.chat_stream.close()

Then we also need to handle GeneratorExit in the relevant LLM's stream_chat() method. Taking OpenAI as an example, we might update its _stream_chat() as below:

def _stream_chat(
    self, messages: Sequence[ChatMessage], **kwargs: Any
) -> ChatResponseGen:
    ...

    def gen() -> ChatResponseGen:
        ...

        stream = client.chat.completions.create(
            messages=message_dicts,
            stream=True,
            **self._get_model_kwargs(**kwargs),
        )
        for response in stream:
            try:
                ...

                yield ChatResponse(
                    message=ChatMessage(
                        role=role,
                        content=content,
                        additional_kwargs=additional_kwargs,
                    ),
                    delta=content_delta,
                    raw=response,
                    additional_kwargs=self._get_response_token_counts(response),
                )
            except GeneratorExit:
                # Interrupt the stream by closing the connection.
                # see https://github.com/openai/openai-python/issues/969#issuecomment-1857158754
                stream.response.close()

                # Then exit gracefully.
                return

    return gen()
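
Hypothetical usage, assuming the proposed close() lands and the caller consumes the stream in the same thread (the queue/thread internals of StreamingAgentChatResponse are glossed over here):

response = chat_engine.stream_chat("Write a very long essay")
for i, token in enumerate(response.response_gen):
    print(token, end="")
    if i >= 20:  # hypothetical early-stop condition
        response.close()  # triggers GeneratorExit in the chat stream
        break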

@logan-markewich (Collaborator)

This would need to be implemented for every streaming LLM 😅

@logan-markewich (Collaborator)

But it makes sense.

@RussellLuo (Contributor)

> This would need to be implemented for every streaming LLM 😅

Yes, both the sync and async versions need to be modified. Perhaps we can leverage some decorators, although they may not be helpful in all cases.
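
One possible shape for such a decorator (purely illustrative; none of these names are existing LlamaIndex APIs, and it assumes the wrapped generator function receives the raw SDK stream as its first argument):

import functools
from typing import Any, Callable, Iterator

def close_stream_on_exit(
    gen_fn: Callable[..., Iterator[Any]],
) -> Callable[..., Iterator[Any]]:
    """Close the raw SDK stream if the wrapped generator is closed early.

    Assumes the raw stream exposes `.response.close()`, as the OpenAI
    SDK's Stream object does.
    """

    @functools.wraps(gen_fn)
    def wrapper(raw_stream: Any, *args: Any, **kwargs: Any) -> Iterator[Any]:
        try:
            yield from gen_fn(raw_stream, *args, **kwargs)
        except GeneratorExit:
            # Interrupt the underlying HTTP stream, then exit gracefully.
            raw_stream.response.close()
            return

    return wrapper

An async variant would follow the same pattern with async def, async for, and aclose().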

@cxycxm (Author) commented May 15, 2024

Thanks.
