
[Bug]: Streaming with async_response_gen incompatible with FastAPI #13495

Open
JLongley opened this issue May 14, 2024 · 6 comments
Labels
bug Something isn't working triage Issue needs to be triaged/prioritized

Comments

@JLongley

Bug Description

I have a very simple FastAPI endpoint set up to test out streaming tokens back from a context chat engine. As written, the first request correctly streams the content back, but every subsequent request gives me an asyncio error:

got Future <Future pending> attached to a different loop

The full stack trace is included below.

Version

llama-index==0.10.36, fastapi==0.104.1

Steps to Reproduce

I'm running the code below in a Docker container.

With that setup, I curl http://localhost:8000/copilot/stream_test?message=Hello and get a streamed response. If I curl the endpoint a second time, I get no response and the server outputs the stack trace shown below.

Here is my implementation:

@router.get("/stream_test")
async def stream_test(
    request: Request,
    message: str,
    chat_engine: BaseChatEngine = Depends(get_chat_engine),
):
    response = await chat_engine.astream_chat(message, [])

    # Generate a response.
    async def event_generator():
        async for token in response.async_response_gen():
            if await request.is_disconnected():
                break

            yield token

    return StreamingResponse(event_generator(), media_type="text/plain")
def get_chat_engine() -> BaseChatEngine:
    vector_store_index = VectorStoreIndex.from_documents(documents=[])

    chat_engine = vector_store_index.as_chat_engine(
        chat_mode="context",
        similarity_top_k=10,
        system_prompt="You are a helpful assistant",
    )

    return chat_engine

Relevant Logs/Tracebacks

Traceback (most recent call last):
   File "/usr/local/lib/python3.9/site-packages/uvicorn/protocols/http/h11_impl.py", line 429, in run_asgi
     result = await app(  # type: ignore[func-returns-value]
   File "/usr/local/lib/python3.9/site-packages/uvicorn/middleware/proxy_headers.py", line 78, in __call__
     return await self.app(scope, receive, send)
   File "/usr/local/lib/python3.9/site-packages/fastapi/applications.py", line 1106, in __call__
     await super().__call__(scope, receive, send)
   File "/usr/local/lib/python3.9/site-packages/starlette/applications.py", line 122, in __call__
     await self.middleware_stack(scope, receive, send)
   File "/usr/local/lib/python3.9/site-packages/starlette/middleware/errors.py", line 184, in __call__
     raise exc
   File "/usr/local/lib/python3.9/site-packages/starlette/middleware/errors.py", line 162, in __call__
     await self.app(scope, receive, _send)
   File "/shared/common/web/backend/app/middleware/rate_limit.py", line 47, in __call__
     await self.app(scope, receive, send)
   File "/usr/local/lib/python3.9/site-packages/starlette/middleware/exceptions.py", line 79, in __call__
     raise exc
   File "/usr/local/lib/python3.9/site-packages/starlette/middleware/exceptions.py", line 68, in __call__
     await self.app(scope, receive, sender)
   File "/usr/local/lib/python3.9/site-packages/fastapi/middleware/asyncexitstack.py", line 20, in __call__
     raise e
   File "/usr/local/lib/python3.9/site-packages/fastapi/middleware/asyncexitstack.py", line 17, in __call__
     await self.app(scope, receive, send)
   File "/usr/local/lib/python3.9/site-packages/starlette/routing.py", line 718, in __call__
     await route.handle(scope, receive, send)
   File "/usr/local/lib/python3.9/site-packages/starlette/routing.py", line 276, in handle
     await self.app(scope, receive, send)
   File "/usr/local/lib/python3.9/site-packages/starlette/routing.py", line 69, in app
     await response(scope, receive, send)
   File "/usr/local/lib/python3.9/site-packages/starlette/responses.py", line 277, in __call__
     await wrap(partial(self.listen_for_disconnect, receive))
   File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 597, in __aexit__
     raise exceptions[0]
   File "/usr/local/lib/python3.9/asyncio/tasks.py", line 256, in __step
     result = coro.send(None)
   File "/usr/local/lib/python3.9/site-packages/starlette/responses.py", line 273, in wrap
     await func()
   File "/usr/local/lib/python3.9/site-packages/starlette/responses.py", line 262, in stream_response
     async for chunk in self.body_iterator:
   File "/shared/common/web/backend/app/endpoints/copilot/endpoint.py", line 340, in event_generator
     async for token in response.async_response_gen():
   File "/usr/local/lib/python3.9/site-packages/llama_index/core/chat_engine/types.py", line 277, in async_response_gen
     delta = await asyncio.wait_for(self.aqueue.get(), timeout=0.1)
   File "/usr/local/lib/python3.9/asyncio/tasks.py", line 479, in wait_for
     return fut.result()
   File "/usr/local/lib/python3.9/asyncio/futures.py", line 201, in result
     raise self._exception
   File "/usr/local/lib/python3.9/asyncio/tasks.py", line 258, in __step
     result = coro.throw(exc)
   File "/usr/local/lib/python3.9/asyncio/queues.py", line 166, in get
     await getter
   File "/usr/local/lib/python3.9/asyncio/futures.py", line 284, in __await__
     yield self  # This tells Task to wait for completion.
 RuntimeError: Task <Task pending name='Task-73' coro=<Queue.get() running at /usr/local/lib/python3.9/asyncio/queues.py:166> cb=[_release_waiter(<Future pendi...k.__wakeup()]>)() at /usr/local/lib/python3.9/asyncio/tasks.py:416]> got Future <Future pending> attached to a different loop
@JLongley JLongley added bug Something isn't working triage Issue needs to be triaged/prioritized labels May 14, 2024

dosubot bot commented May 14, 2024

The error you're encountering, got Future <Future pending> attached to a different loop, typically happens when an asyncio future or task is created in one event loop and then awaited in another. This is a common issue when dealing with asynchronous operations in FastAPI, especially when integrating with external libraries or managing background tasks.
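
As a concrete illustration, the error can be reproduced outside of FastAPI entirely. The following standalone sketch (not LlamaIndex code; the behaviour described is for Python 3.9, where asyncio.Queue captures its event loop at construction) awaits a queue from a loop other than the one it was created against:

import asyncio

queue = asyncio.Queue()  # on Python 3.9 this binds to the loop returned by get_event_loop() at import time


async def wait_for_token():
    # get() on an empty queue creates a Future on the queue's original loop;
    # awaiting it from the loop created by asyncio.run() below raises
    # "got Future <Future pending> attached to a different loop".
    return await asyncio.wait_for(queue.get(), timeout=0.1)


asyncio.run(wait_for_token())  # asyncio.run() drives a new, different loop -> RuntimeError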

To address this issue, ensure that all asynchronous components and tasks are initialized and awaited within the same event loop context. This can be achieved by:

  • Using asyncio.get_event_loop() to get the current loop for creating tasks or futures, ensuring consistency across your application.
  • Initializing async components like asyncio.Queue within the async context of a request to ensure they're tied to the correct event loop (see the sketch after this list).
  • Leveraging FastAPI's dependency injection system for managing shared resources across requests, ensuring they're properly initialized within the request's event loop context.
  • For background tasks that run outside the request-response cycle, use fastapi.BackgroundTasks to ensure they execute within the correct event loop context.
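
A minimal sketch of the second and third points (TokenStream and get_token_stream are hypothetical names, not FastAPI or LlamaIndex APIs): build the queue-backed stream inside an async dependency so it is bound to the loop serving the request.

import asyncio

from fastapi import Depends, FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


class TokenStream:
    """Per-request token queue, created inside the request's running loop."""

    def __init__(self) -> None:
        # Constructed from an async dependency, so the futures created by
        # get()/put() belong to the same loop that consumes them.
        self.queue: asyncio.Queue = asyncio.Queue()

    async def put(self, token) -> None:
        await self.queue.put(token)

    async def gen(self):
        # Yield tokens until the None sentinel is seen.
        while True:
            token = await self.queue.get()
            if token is None:
                break
            yield token


async def get_token_stream() -> TokenStream:
    # Async dependency: runs on the request's event loop, so the queue is
    # bound to that loop rather than to one created at import or startup time.
    return TokenStream()


@app.get("/demo_stream")
async def demo_stream(stream: TokenStream = Depends(get_token_stream)):
    async def produce():
        for word in ("hello", " ", "world"):
            await stream.put(word)
        await stream.put(None)  # end-of-stream sentinel

    asyncio.get_running_loop().create_task(produce())  # producer and consumer share one loop
    return StreamingResponse(stream.gen(), media_type="text/plain")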

Make sure to update your libraries to the latest versions, as fixes in newer versions might resolve your issue. Additionally, consider adding logging to track event loop usage throughout your application, which can help identify where the loop mismatch occurs.
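
For the logging suggestion, a lightweight helper such as the hypothetical one below (not an existing API) can be called on both the producer and the consumer side; if the logged loop ids differ, that is where the mismatch happens:

import asyncio
import logging

logger = logging.getLogger("loop-debug")


def log_running_loop(where: str) -> None:
    # Log which event loop the current coroutine is running on.
    loop = asyncio.get_running_loop()
    logger.info("%s: running on loop id=%s (%r)", where, id(loop), loop)

# Example usage inside the issue's streaming generator:
#   async def event_generator():
#       log_running_loop("event_generator")
#       async for token in response.async_response_gen():
#           ...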

Regarding the LlamaIndex library and its integration with FastAPI, ensure that any asynchronous operations are correctly managed within FastAPI's event loop. This includes proper initialization and usage of async components within your endpoint implementations.

If the issue persists, reviewing the implementation of the chat engine and its asynchronous streaming methods might be necessary. Ensure that these methods are correctly integrated into your FastAPI application, with careful management of asyncio tasks and event loops to avoid conflicts.

This approach should help resolve the asyncio error you're facing by ensuring consistent event loop usage across your FastAPI application and its asynchronous operations.


@logan-markewich
Collaborator

logan-markewich commented May 15, 2024

@JLongley can you try updating? v0.10.37 had some updates that may have fixed this.

This script worked great for me on every request using the latest version of llama-index-core / llama-index:

from llama_index.llms.openai import OpenAI
from llama_index.core import VectorStoreIndex, Document
from llama_index.core.chat_engine.types import BaseChatEngine


llm = OpenAI()

from fastapi import FastAPI, Request, Depends
from fastapi.responses import StreamingResponse

app = FastAPI()


def get_chat_engine() -> BaseChatEngine:
    vector_store_index = VectorStoreIndex.from_documents(documents=[])

    chat_engine = vector_store_index.as_chat_engine(
        chat_mode="context",
        similarity_top_k=10,
        system_prompt="You are a helpful assistant",
    )

    return chat_engine


@app.get("/")
async def root():
    return {"message": "Hello World"}


@app.get("/stream_test")
async def stream_test(
    request: Request,
    chat_engine: BaseChatEngine = Depends(get_chat_engine),
):
    response = await chat_engine.astream_chat("Tell me a poem about raining cats and dogs!")

    # Generate a response.
    async def event_generator():
        async for token in response.async_response_gen():
            if await request.is_disconnected():
                break

            yield token

    return StreamingResponse(event_generator(), media_type="text/plain")


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, loop="asyncio")


@JLongley
Author

Thanks Logan,

I've upgraded both fastapi and llama-index to the latest versions, but I'm still seeing the same errors.

I notice that about 1 in every ~5 requests gets through without an error, but interestingly, the response arrives all at once in Postman, not one token at a time.

llama-index==0.10.37
fastapi==0.111.0

from llama_index.llms.openai import OpenAI
from llama_index.core import VectorStoreIndex
from llama_index.core.chat_engine.types import BaseChatEngine


llm = OpenAI()

from fastapi import FastAPI, Request, Depends
from fastapi.responses import StreamingResponse

app = FastAPI()

import os

def get_chat_engine() -> BaseChatEngine:
    vector_store_index = VectorStoreIndex.from_documents(documents=[])

    chat_engine = vector_store_index.as_chat_engine(
        chat_mode="context",
        similarity_top_k=10,
        system_prompt="You are a helpful assistant",
    )

    return chat_engine


@app.get("/")
async def root():
    return {"message": "Hello World"}


@app.get("/stream_test")
async def stream_test(
    request: Request,
    chat_engine: BaseChatEngine = Depends(get_chat_engine),
):
    response = await chat_engine.astream_chat("Tell me a poem about raining cats and dogs!")

    # Generate a response.
    async def event_generator():
        async for token in response.async_response_gen():
            if await request.is_disconnected():
                break

            yield token

    # # Generate a response.
    # def event_generator():
    #     for token in response.response_gen:
    #         yield token

    return StreamingResponse(event_generator(), media_type="text/plain")


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        app,
        loop="asyncio",
        host="0.0.0.0",
        port=8080,
    )

Error output:

ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/starlette/responses.py", line 265, in __call__
    await wrap(partial(self.listen_for_disconnect, receive))
  File "/usr/local/lib/python3.9/site-packages/starlette/responses.py", line 261, in wrap
    await func()
  File "/usr/local/lib/python3.9/site-packages/starlette/responses.py", line 238, in listen_for_disconnect
    message = await receive()
  File "/usr/local/lib/python3.9/site-packages/uvicorn/protocols/http/httptools_impl.py", line 597, in receive
    await self.message_event.wait()
  File "/usr/local/lib/python3.9/asyncio/locks.py", line 226, in wait
    await fut
asyncio.exceptions.CancelledError: Cancelled by cancel scope ffff92492bb0
During handling of the above exception, another exception occurred:
  + Exception Group Traceback (most recent call last):
  |   File "/usr/local/lib/python3.9/site-packages/uvicorn/protocols/http/httptools_impl.py", line 436, in run_asgi
  |     result = await app(  # type: ignore[func-returns-value]
  |   File "/usr/local/lib/python3.9/site-packages/uvicorn/middleware/proxy_headers.py", line 78, in __call__
  |     return await self.app(scope, receive, send)
  |   File "/usr/local/lib/python3.9/site-packages/fastapi/applications.py", line 1054, in __call__
  |     await super().__call__(scope, receive, send)
  |   File "/usr/local/lib/python3.9/site-packages/starlette/applications.py", line 123, in __call__
  |     await self.middleware_stack(scope, receive, send)
  |   File "/usr/local/lib/python3.9/site-packages/starlette/middleware/errors.py", line 186, in __call__
  |     raise exc
  |   File "/usr/local/lib/python3.9/site-packages/starlette/middleware/errors.py", line 164, in __call__
  |     await self.app(scope, receive, _send)
  |   File "/usr/local/lib/python3.9/site-packages/starlette/middleware/exceptions.py", line 65, in __call__
  |     await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
  |   File "/usr/local/lib/python3.9/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
  |     raise exc
  |   File "/usr/local/lib/python3.9/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
  |     await app(scope, receive, sender)
  |   File "/usr/local/lib/python3.9/site-packages/starlette/routing.py", line 756, in __call__
  |     await self.middleware_stack(scope, receive, send)
  |   File "/usr/local/lib/python3.9/site-packages/starlette/routing.py", line 776, in app
  |     await route.handle(scope, receive, send)
  |   File "/usr/local/lib/python3.9/site-packages/starlette/routing.py", line 297, in handle
  |     await self.app(scope, receive, send)
  |   File "/usr/local/lib/python3.9/site-packages/starlette/routing.py", line 77, in app
  |     await wrap_app_handling_exceptions(app, request)(scope, receive, send)
  |   File "/usr/local/lib/python3.9/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
  |     raise exc
  |   File "/usr/local/lib/python3.9/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
  |     await app(scope, receive, sender)
  |   File "/usr/local/lib/python3.9/site-packages/starlette/routing.py", line 75, in app
  |     await response(scope, receive, send)
  |   File "/usr/local/lib/python3.9/site-packages/starlette/responses.py", line 265, in __call__
  |     await wrap(partial(self.listen_for_disconnect, receive))
  |   File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 678, in __aexit__
  |     raise BaseExceptionGroup(
  | exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/usr/local/lib/python3.9/site-packages/starlette/responses.py", line 261, in wrap
    |     await func()
    |   File "/usr/local/lib/python3.9/site-packages/starlette/responses.py", line 250, in stream_response
    |     async for chunk in self.body_iterator:
    |   File "/shared/common/main.py", line 41, in event_generator
    |     async for token in response.async_response_gen():
    |   File "/usr/local/lib/python3.9/site-packages/llama_index/core/chat_engine/types.py", line 277, in async_response_gen
    |     delta = await asyncio.wait_for(self.aqueue.get(), timeout=0.1)
    |   File "/usr/local/lib/python3.9/asyncio/tasks.py", line 479, in wait_for
    |     return fut.result()
    |   File "/usr/local/lib/python3.9/asyncio/queues.py", line 166, in get
    |     await getter
    | RuntimeError: Task <Task pending name='Task-694' coro=<Queue.get() running at /usr/local/lib/python3.9/asyncio/queues.py:166> cb=[_release_waiter(<Future pendi...f9249ce20>()]>)() at /usr/local/lib/python3.9/asyncio/tasks.py:416]> got Future <Future pending> attached to a different loop
    +------------------------------------
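
As an aside, a small streaming client makes it easy to tell whether tokens actually arrive incrementally or get buffered into a single chunk (the all-at-once behaviour seen in Postman above). A sketch, assuming the httpx package (not part of the reproduction):

import time

import httpx

# Print each chunk as it arrives, with a timestamp relative to the start of the response.
with httpx.stream("GET", "http://localhost:8080/stream_test", timeout=60.0) as response:
    start = time.monotonic()
    for chunk in response.iter_text():
        print(f"{time.monotonic() - start:6.2f}s  {chunk!r}")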

@logan-markewich
Collaborator

I was running it in the browser (lol), and the streaming seemed to work fine. Let me see if I can reproduce with the above code.

@logan-markewich
Collaborator

Seems like the queue maybe needs to be initialized each time to be in the current async loop?
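
If that's the cause, one quick way to test the hypothesis is to compare the loop the queue captured at construction with the loop serving the current request. This is a hypothetical diagnostic that relies on a private attribute of asyncio.Queue (set eagerly on Python 3.9, lazily on newer versions), not something LlamaIndex exposes:

import asyncio


def check_queue_loop(queue: asyncio.Queue) -> None:
    # Compare the queue's captured loop (private attribute) with the loop
    # that is currently running; a mismatch would explain the RuntimeError above.
    bound_loop = getattr(queue, "_loop", None)
    running_loop = asyncio.get_running_loop()
    if bound_loop is not None and bound_loop is not running_loop:
        raise RuntimeError(
            f"queue bound to loop {id(bound_loop)}, but request is running on {id(running_loop)}"
        )

# e.g. call check_queue_loop(response.aqueue) (the attribute seen in the traceback)
# inside the endpoint before iterating over async_response_gen().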

@logan-markewich
Collaborator

@JLongley Hmm, I still can't reproduce
[screenshots attached]

Maybe try with a fresh venv to be sure? I copied your script above exactly, launched it, and used Postman this time. Zero requests failed 🤔
