apiserver endpoint error in session.get_task_result_stream(task_id) with client #374

Open
jdelacasa opened this issue Nov 21, 2024 · 5 comments

Comments

@jdelacasa

I'm doing something wrong but I can't figure it out. When I call session.get_task_result_stream(task_id) from my FastAPI endpoint to retrieve the streaming events, the apiserver log shows a 404 Not Found:

INFO: 10.244.2.40:57518 - "POST /sessions/create HTTP/1.1" 200 OK
INFO:llama_deploy.message_queues.base - Publishing message of type 'ag-rag-react-stream' with action 'ActionTypes.NEW_TASK' to topic 'llama_deploy.ag-rag-react-stream'
INFO: 127.0.0.1:55704 - "POST /publish/llama_deploy.ag-rag-react-stream HTTP/1.1" 200 OK
INFO: 10.244.2.40:57524 - "POST /sessions/5cfe06a0-375f-4387-a8fb-036e94957f85/tasks HTTP/1.1" 200 OK
INFO: 127.0.0.1:42934 - "POST /process_message HTTP/1.1" 200 OK
INFO:llama_deploy.message_queues.simple - Successfully published message 'ag-rag-react-stream' to consumer.
INFO: 10.244.2.40:57530 - "GET /sessions/5cfe06a0-375f-4387-a8fb-036e94957f85/tasks/821c981c-55bb-4240-9ada-7065bdd36613/result_stream HTTP/1.1" 404 Not Found
INFO: 127.0.0.1:60004 - "GET /sessions/5cfe06a0-375f-4387-a8fb-036e94957f85/state HTTP/1.1" 200 OK

Example code (FastAPI endpoint):

@workflow_router.post("/workflow/execute")
async def workflow_execute(request: Request):

    (...)

    client = llama_deploy.Client(
        api_server_url=apiserver_url,
        control_plane_url=control_plane_url,
        timeout=300,
    )
    session = await client.core.sessions.create()
    task_id = await session.run_nowait(service_name, **request_input)

    async def stream_events():
        async for event in session.get_task_result_stream(task_id):
            if "progress" in event:
                data = {
                    "choices": [
                        {
                            "delta": {"content": f"Chunk {event['progress']}"}
                        }
                    ]
                }
                yield f"data: {json.dumps(data)}\n\n"
                logger.info(f"Progress: {event['progress']}")

    return StreamingResponse(stream_events(), media_type="application/octet-stream")

=============
OUTPUT:

INFO: 10.244.1.184:57970 - "POST /workflow/execute HTTP/1.1" 200 OK
ERROR: Exception in ASGI application
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/httpx/_transports/default.py", line 69, in map_httpcore_exceptions
yield
File "/usr/local/lib/python3.11/site-packages/httpx/_transports/default.py", line 373, in handle_async_request
resp = await self._pool.handle_async_request(req)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 216, in handle_async_request
raise exc from None
File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 196, in handle_async_request
response = await connection.handle_async_request(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpcore/_async/connection.py", line 101, in handle_async_request
return await self._connection.handle_async_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpcore/_async/http11.py", line 143, in handle_async_request
raise exc
File "/usr/local/lib/python3.11/site-packages/httpcore/_async/http11.py", line 113, in handle_async_request
) = await self._receive_response_headers(**kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpcore/_async/http11.py", line 186, in _receive_response_headers
event = await self._receive_event(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpcore/_async/http11.py", line 224, in _receive_event
data = await self._network_stream.read(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpcore/_backends/anyio.py", line 32, in read
with map_exceptions(exc_map):
File "/usr/local/lib/python3.11/contextlib.py", line 158, in exit
self.gen.throw(typ, value, traceback)
File "/usr/local/lib/python3.11/site-packages/httpcore/_exceptions.py", line 14, in map_exceptions
raise to_exc(exc) from exc
httpcore.ReadTimeout

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py", line 401, in run_asgi
result = await app( # type: ignore[func-returns-value]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/uvicorn/middleware/proxy_headers.py", line 60, in call
return await self.app(scope, receive, send)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/fastapi/applications.py", line 1054, in call
await super().call(scope, receive, send)
File "/usr/local/lib/python3.11/site-packages/starlette/applications.py", line 113, in call
await self.middleware_stack(scope, receive, send)
File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 187, in call
raise exc
File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 165, in call
await self.app(scope, receive, _send)
File "/usr/local/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 62, in call
await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", line 62, in wrapped_app
raise exc
File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", line 51, in wrapped_app
await app(scope, receive, sender)
File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 715, in call
await self.middleware_stack(scope, receive, send)
File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 735, in app
await route.handle(scope, receive, send)
File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 288, in handle
await self.app(scope, receive, send)
File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 76, in app
await wrap_app_handling_exceptions(app, request)(scope, receive, send)
File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", line 62, in wrapped_app
raise exc
File "/usr/local/lib/python3.11/site-packages/starlette/_exception_handler.py", line 51, in wrapped_app
await app(scope, receive, sender)
File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 74, in app
await response(scope, receive, send)
File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 250, in call
async with anyio.create_task_group() as task_group:
File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 597, in aexit
raise exceptions[0]
File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 253, in wrap
await func()
File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 242, in stream_response
async for chunk in self.body_iterator:
File "/maarifa/maarifa-api/routes/workflow.py", line 436, in stream_events
async for event in session.get_task_result_stream(task_id):
File "/usr/local/lib/python3.11/site-packages/llama_deploy/client/models/core.py", line 128, in get_task_result_stream
async with client.stream("GET", url) as response:
File "/usr/local/lib/python3.11/contextlib.py", line 210, in aenter
return await anext(self.gen)
^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1617, in stream
response = await self.send(
^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1661, in send
response = await self._send_handling_auth(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1689, in _send_handling_auth
response = await self._send_handling_redirects(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1726, in _send_handling_redirects
response = await self._send_single_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_client.py", line 1763, in _send_single_request
response = await transport.handle_async_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/httpx/_transports/default.py", line 372, in handle_async_request
with map_httpcore_exceptions():
File "/usr/local/lib/python3.11/contextlib.py", line 158, in exit
self.gen.throw(typ, value, traceback)
File "/usr/local/lib/python3.11/site-packages/httpx/_transports/default.py", line 86, in map_httpcore_exceptions
raise mapped_exc(message) from exc
httpx.ReadTimeout

@masci masci added this to Framework Nov 21, 2024
@logan-markewich
Collaborator

@jdelacasa I think either

  1. the stream was not created yet and the code to get the stream ran too soon; you might need to retry this or add a small delay (see the retry sketch after this list), or
  2. your workflow isn't streaming any events.
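
For the first case, a minimal retry sketch, assuming the 404 surfaces as an httpx.HTTPStatusError from get_task_result_stream; the helper name and retry parameters are hypothetical:

import asyncio

import httpx


async def stream_with_retry(session, task_id, retries: int = 5, delay: float = 0.5):
    # Hypothetical helper: the task may not have started streaming yet,
    # so retry the result stream a few times before giving up on a 404.
    for attempt in range(retries):
        try:
            async for event in session.get_task_result_stream(task_id):
                yield event
            return
        except httpx.HTTPStatusError as exc:
            if exc.response.status_code == 404 and attempt < retries - 1:
                await asyncio.sleep(delay)
                continue
            raise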

@jdelacasa
Author

jdelacasa commented Nov 21, 2024

It is used constantly in the workflow:

ctx.write_event_to_stream(ProgressEvent(msg="Step one is happening"))
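
For reference, a minimal sketch of the kind of workflow step assumed here; ProgressEvent and the workflow class are user-defined, not part of llama_deploy:

from llama_index.core.workflow import (
    Context,
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
)


class ProgressEvent(Event):
    msg: str


class ExampleWorkflow(Workflow):
    @step
    async def run_step(self, ctx: Context, ev: StartEvent) -> StopEvent:
        # Events written here should show up in get_task_result_stream()
        ctx.write_event_to_stream(ProgressEvent(msg="Step one is happening"))
        return StopEvent(result="done")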

and the final result is returned to me correctly when the workflow finishes executing:

async def stream_events():

    async for event in session.get_task_result_stream(task_id):
        if event is not None:
            # if "progress" in event:
            data = {
                "choices": [
                    {
                        "delta": {"content": f"Chunk {event}"}
                    }
                ]
            }
            yield f"data: {json.dumps(data)}\n\n"
            logger.info(f"Progress: {event['progress']}")

    try:
        while True:
            final_result = await session.get_task_result(task_id)
            if final_result is not None:

                data = {
                    "choices": [
                        {
                            "delta": {"content": f"Final Result: {final_result.result}"}
                        }
                    ]
                }
                yield f"data: {json.dumps(data)}\n\n"
                # End the stream
                data = {
                    "choices": [
                        {
                            "finish_reason": "stop"
                        }
                    ]
                }
                yield f"data: {json.dumps(data)}\n\n"
                logger.info(f"Final Result: {final_result}")
                break
            await asyncio.sleep(0.3)

    except Exception as e:
        logger.error(f"Error in result: {e}")
        yield f"data: {json.dumps({'error': str(e)})}\n\n"

    finally:
        await client.core.sessions.delete(session.id)

return StreamingResponse(stream_events(), media_type="application/octet-stream")

@jdelacasa
Author

It seems that if I add a sleep before each ctx.write_event_to_stream call, the events are received on the stream correctly:

    await asyncio.sleep(0.3)
    ctx.write_event_to_stream(ProgressEvent(msg="ok")) 

@apost71
Contributor

apost71 commented Feb 7, 2025

I've been running into this same issue, and I don't think requiring a sleep is the right solution. It looks like a new httpx.AsyncClient is created, and there is no way to configure the timeout of that client because the timeout provided to the higher-level Client is not propagated through. It looks like the timeouts are only applied to the httpx requests when client.request is used. I was able to resolve my issue by updating this line to look like this:

 async with httpx.AsyncClient(timeout=self.client.timeout) as client:

Additionally, it may be nice to have the option to set more granular timeout options as defined in the httpx docs.
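
For illustration, a sketch of what those granular options look like in httpx; the values and the function are arbitrary examples, not defaults from the library:

import httpx

# A long-lived streaming response may want no read timeout at all,
# while still failing fast on connect.
stream_timeout = httpx.Timeout(connect=5.0, read=None, write=10.0, pool=5.0)


async def consume_stream(url: str):
    async with httpx.AsyncClient(timeout=stream_timeout) as client:
        async with client.stream("GET", url) as response:
            response.raise_for_status()
            async for line in response.aiter_lines():
                print(line)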

I suppose another option would be to check for read timeouts in addition to httpx.HTTPStatusError in the exception block, but I think having more fine-grained control of the timeouts would be better.

I'd be willing to put together a PR for this if that would be helpful.

@masci
Member

masci commented Feb 7, 2025

@apost71 agreed, sleeping is always a red flag; we need to fix the root cause. Using httpx directly only happens when we need to stream the response. I'm not sure we can change self.client to support that use case, but we can also go with your change and pass the timeout value down to the client. Contributions welcome!
About a finer-grained timeout: we have a bunch of places where we expect float | None. If we can make it work with something like float | httpx.Timeout | None I'd have no objections to adding support for it; many users don't care about different timeouts and are more comfortable passing just a number of seconds, so I'd like to keep that option.
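
A sketch of that backwards-compatible shape, assuming a hypothetical helper rather than the actual llama_deploy API:

import httpx


def make_http_client(timeout: float | httpx.Timeout | None = 120.0) -> httpx.AsyncClient:
    # Plain floats keep working for users who just want a number of seconds,
    # while an httpx.Timeout allows per-phase (connect/read/write/pool) control.
    return httpx.AsyncClient(timeout=timeout)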
