
New Feature Proposal: Assistant API - Chaining streams for function execution #1261

Closed

pi-infected opened this issue Mar 22, 2024 · 2 comments
pi-infected commented Mar 22, 2024

Confirm this is a feature request for the Python library and not the underlying OpenAI API.

  • This is a feature request for the Python library

Describe the feature or improvement you're requesting

Hi,

I have developed a monkey patch that adds the ability to chain streams, which is very useful for the Assistant API function-execution workflow. I think it could be integrated into the openai library. Here is the use case.

Imagine you are processing the assistant events in a loop (in my case I use the async streaming client, but it is essentially the same for the synchronous one):

async for chunk in assistant_stream_response:
    # Process the chunk here and build `result`

    # Handle function calls
    if isinstance(chunk, ThreadRunRequiresAction):
        tool_outputs = ...  # execute the requested functions and gather their outputs here

        new_stream = await async_client.beta.threads.runs.submit_tool_outputs(
            thread_id=thread_id,  # stored along the way
            run_id=chunk.data.id,
            tool_outputs=tool_outputs,
            stream=True
        )
        # Chain the new stream onto the end of the current one to avoid
        # writing another chunk-processing loop
        assistant_stream_response.chain_stream(new_stream)

    yield result

With this, we can chain the submit_tool_outputs stream response onto the current one instead of writing another chunk-processing loop.
Tested and working.

It is very beneficial, especially when you integrate the Assistant API into an existing project, because the existing workflow does not have to change.
Here is the monkey patch:

#--------------------------------------MONKEY-PATCH-OPENAI--------------------------------------------------------------
import openai
from typing import Any, TypeVar, AsyncIterator, cast
from openai._utils import is_mapping
from openai._exceptions import APIError
from openai import AsyncOpenAI
import httpx

_T = TypeVar("_T")


def monkey_patch__init__(self, *, cast_to: type[_T], response: httpx.Response, client: AsyncOpenAI) -> None:
  self.response = response
  self._cast_to = cast_to
  self._client = client
  self._decoder = client._make_sse_decoder()
  self._iterator = self.__stream__()
  self._chained_stream = None  # MOD: optional stream to continue with once this one is exhausted

def chain_stream(self, stream):  # NEW: append a stream to the end of the chain
  if self._chained_stream:
    # Already chaining another stream: append to the end of that chain instead
    self._chained_stream.chain_stream(stream)
  else:
    self._chained_stream = stream

async def monkey_patch__stream__(self) -> AsyncIterator[_T]:
  cast_to = cast(Any, self._cast_to)
  response = self.response
  process_data = self._client._process_response_data
  iterator = self._iter_events()

  async for sse in iterator:
    if sse.data.startswith("[DONE]"):
      break

    if sse.event is None:
      data = sse.json()
      if is_mapping(data) and data.get("error"):
        message = None
        error = data.get("error")
        if is_mapping(error):
          message = error.get("message")
        if not message or not isinstance(message, str):
          message = "An error occurred during streaming"

        raise APIError(
          message=message,
          request=self.response.request,
          body=data["error"],
        )

      yield process_data(data=data, cast_to=cast_to, response=response)

    else:
      data = sse.json()

      if sse.event == "error" and is_mapping(data) and data.get("error"):
        message = None
        error = data.get("error")
        if is_mapping(error):
          message = error.get("message")
        if not message or not isinstance(message, str):
          message = "An error occurred during streaming"

        raise APIError(
          message=message,
          request=self.response.request,
          body=data["error"],
        )

      yield process_data(data={"data": data, "event": sse.event}, cast_to=cast_to, response=response)

  # Consume any remaining events so the underlying response is fully read
  async for _sse in iterator:
    ...

  if self._chained_stream:  # MOD: continue with any chained stream once this one finishes
    async for chunk in self._chained_stream:
      yield chunk


openai.AsyncStream.__init__ = monkey_patch__init__
openai.AsyncStream.__stream__ = monkey_patch__stream__
openai.AsyncStream.chain_stream = chain_stream
#-----------------------------------------------------------------------------------------------------------------------
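
For comparison, here is a rough sketch of how the same behavior could be achieved without monkey patching, by wrapping the streams in a small helper class. The ChainedStream name is mine and not part of the openai library; it simply drains one stream, then any streams appended while iteration was in progress:

from typing import Any, AsyncIterator, List

class ChainedStream:
  # Illustrative wrapper, not part of the openai library: iterates the first
  # stream, then any streams appended via chain_stream() during iteration.
  def __init__(self, first: AsyncIterator[Any]) -> None:
    self._pending: List[AsyncIterator[Any]] = [first]

  def chain_stream(self, stream: AsyncIterator[Any]) -> None:
    # Streams are drained in the order they were chained
    self._pending.append(stream)

  async def __aiter__(self) -> AsyncIterator[Any]:
    while self._pending:
      current = self._pending.pop(0)
      async for chunk in current:
        yield chunk

Usage would be to wrap the stream returned by runs.create(..., stream=True) in ChainedStream, iterate the wrapper, and call chain_stream() on it whenever submit_tool_outputs returns a follow-up stream. The downside compared to the monkey patch is that existing code has to switch to the wrapper explicitly, which is exactly what I wanted to avoid.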

Best regards,
Paul Irolla

Additional context

I have implemented this inside my personal fork of LiteLLM to integrate the Assistant API into the existing workflow without changing a thousand lines of code.

@pi-infected changed the title from "Assistant API : Chaining streams for function execution" to "New Feature Proposal: Assistant API - Chaining streams for function execution" on Mar 22, 2024
@hayescode

great workaround! Something like this is necessary since there could be an arbitrary number of tool_calls.
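
For context, a rough sketch of what repeated rounds could look like with the proposed chain_stream; `execute_tools` is a hypothetical helper that runs the requested functions, and thread_id is assumed to be stored along the way:

# Each requires_action round chains another submit_tool_outputs stream,
# so any number of tool-call rounds flows through the single loop.
async for chunk in assistant_stream_response:
    if isinstance(chunk, ThreadRunRequiresAction):
        tool_calls = chunk.data.required_action.submit_tool_outputs.tool_calls
        tool_outputs = execute_tools(tool_calls)  # hypothetical helper
        new_stream = await async_client.beta.threads.runs.submit_tool_outputs(
            thread_id=thread_id,
            run_id=chunk.data.id,
            tool_outputs=tool_outputs,
            stream=True,
        )
        assistant_stream_response.chain_stream(new_stream)
    yield chunk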

@rattrayalex
Collaborator

Thanks for the suggestion! We have another, similar proposal in the works – stay tuned :)

@rattrayalex closed this as not planned on May 13, 2024