lanarky: bug: Expected response header Content-Type to contain 'text/event-stream', got 'text/plain'

Scenario

While running the following client, httpx raises an exception indicating that the Content-Type header is 'text/plain' instead of 'text/event-stream':

import json

import click

from lanarky.clients import StreamingClient


@click.command()
@click.option("--input", required=True)
@click.option("--stream", is_flag=True)
def main(input: str, stream: bool):
    client = StreamingClient("http://localhost:8001")
    for event in client.stream_response(
        method="POST",
        path="/chat?session_id=123",
        json={"input": input},
    ):
        print(f"{event.event}: {json.loads(event.data)['token']}", end="", flush=True)


if __name__ == "__main__":
    main()

The following versions and dependencies are used:

  • python: 3.11
  • langchain: 0.1.12
  • lanarky: 0.8.5
  • fastapi: 0.110.0
  • httpx: 0.27.0
  • httpx-sse: 0.4.0

Actual result

httpx_sse._exceptions.SSEError: Expected response header Content-Type to contain 'text/event-stream', got 'text/plain'
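
The error is raised client-side by httpx-sse, which checks that the response's Content-Type header contains 'text/event-stream' before parsing events. As a minimal sketch, one can inspect what the server actually sends back using plain httpx, bypassing httpx-sse entirely (endpoint and payload taken from the scenario above):

import httpx

with httpx.Client(base_url="http://localhost:8001") as client:
    with client.stream(
        "POST", "/chat", params={"session_id": "123"}, json={"input": "hello"}
    ) as response:
        # If the server were streaming SSE correctly, this would print
        # something containing "text/event-stream" rather than "text/plain".
        print(response.headers.get("content-type"))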

Expected result

The client should be able to send the streaming request and receive the server-sent events without errors.

Acceptance criteria

  • [ ]

Contribute

  • Yes, I can fix this bug and open a PR

About this issue

  • State: open
  • Created 4 months ago
  • Comments: 31 (10 by maintainers)

Most upvoted comments

@ajndkr I tried that too (the following code):

from typing import Annotated, Any

from fastapi import Body, Depends, Query
from kink import di
from lanarky.adapters.langchain.callbacks import TokenStreamingCallbackHandler, get_token_data
from lanarky.adapters.langchain.responses import ChainRunMode, StreamingResponse
from lanarky.adapters.langchain.routing import LangchainAPIRouter
from lanarky.events import Events

from agent.session.agent import SessionAgent

router = LangchainAPIRouter()


class VertexStreamingCallbackHandler(TokenStreamingCallbackHandler):

    async def on_chain_end(self, outputs: dict[str, Any], **kwargs: dict[str, Any]) -> None:
        # Emit a final completion event when the chain finishes without
        # streaming (e.g. when the LLM cache was used).
        if self.llm_cache_used or not self.streaming:
            if self.output_key in outputs:
                message = self._construct_message(
                    data=get_token_data(outputs[self.output_key], self.mode),
                    event=Events.COMPLETION,
                )
                await self.send(message)


@router.post(
    path="/ask",
    name="Chat Endpoint",
    description="The main endpoint to ask a question to the foundation model",
    summary="Endpoint to ask question",
    tags=["chat", "ask", "chatbot"],
)
async def ask(
    session_id: Annotated[str, Query(title="The unique ID of session", min_length=1)],
    question: Annotated[str, Body(title="The input question", min_length=1)],
    session_agent: SessionAgent = Depends(lambda: di[SessionAgent]),
):
    chatbot = session_agent.chatbot(session_id)
    return StreamingResponse(
        chain=chatbot.chain,
        run_mode=ChainRunMode.SYNC,
        config={
            "inputs": {"question": question},
            "callbacks": [VertexStreamingCallbackHandler(output_key="text")],
        },
    )
It also results in an unfinished chain, which causes the client to keep polling the server continuously.
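
For comparison, the SSE media type has to be declared explicitly on the response for httpx-sse's header check to pass. A minimal sketch with plain Starlette (not lanarky's StreamingResponse wrapper), using a hand-rolled generator as an illustrative assumption:

from starlette.responses import StreamingResponse

async def token_stream():
    # SSE wire format: "data: ..." lines terminated by a blank line.
    yield "data: hello\n\n"

# Setting media_type="text/event-stream" is what satisfies the client-side
# Content-Type check; without it the client sees a different content type.
response = StreamingResponse(token_stream(), media_type="text/event-stream")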

@ajndkr Thanks a lot for your continuous assistance! Looking forward to your further analysis 👍

@ajndkr Thanks a lot for your quick response. I strongly believe that the main issue lies in the "Attempted to access streaming response content, without having called read()." error, and we have to figure out what needs to be done there.
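
For reference, that message is httpx's ResponseNotRead error: it is raised when .content or .text is accessed on a streamed response before the body has been read. A minimal sketch reproducing it independently of lanarky (the URL is just a placeholder):

import httpx

with httpx.Client() as client:
    with client.stream("GET", "https://example.com") as response:
        # Without the read() call below, accessing response.text raises
        # httpx.ResponseNotRead: "Attempted to access streaming response
        # content, without having called `read()`."
        response.read()
        print(response.text[:40])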