Using a WebSocket connection, your voice agent can interact with your custom LLM in real time, enabling dynamic, responsive conversations tailored to your specific requirements.

Sample Code

Below is sample code using FastAPI's WebSocket support and asyncio. Make sure to use the AsyncOpenAI client so that token streaming from OpenAI does not block the asyncio event loop; this ensures each message is sent over the WebSocket as soon as it is ready.

import asyncio
import json
import traceback

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from openai import AsyncOpenAI

app = FastAPI()
client = AsyncOpenAI()

@app.websocket("/llm-websocket")
async def websocket_handler(websocket: WebSocket):
    await websocket.accept()

    current_stream_id = -1  # ID of the most recent stream_request; older streams are abandoned

    # Stream one LLM response to the client; stop early if a newer stream_request arrives.
    async def stream_response(request):
        try:
            async for event in draft_response(request):
                await websocket.send_text(json.dumps(
                    {
                        "type": "stream_response",
                        "data": event
                    }
                ))
                if request['stream_id'] < current_stream_id:
                    return # Got new stream_request, abandon this task
        except Exception:
            print(traceback.format_exc(), flush=True)
    try:
        while True:
            message = await websocket.receive_text()
            request = json.loads(message)
            if request["type"] == "start_call":
                current_stream_id = request["data"]['stream_id']
                first_event = {
                    "type": "stream_response",
                    "data": {
                        "stream_id": current_stream_id,
                        "content": "How can I help you?", # Agent's first message
                        "end_of_stream": True,
                    }
                }
                await websocket.send_text(json.dumps(first_event))
            elif request["type"] == "stream_request":
                current_stream_id = request["data"]['stream_id']
                # Stream concurrently so the receive loop keeps handling new messages
                asyncio.create_task(stream_response(request["data"]))
    except WebSocketDisconnect:
        pass  # Client disconnected; nothing further to clean up

async def draft_response(request):
    # prepare_prompt builds the OpenAI chat messages for this request (see the sketch below)
    prompt = prepare_prompt(request)
    stream = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=prompt,
        stream=True,
    )

    async for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            yield {
                "stream_id": request['stream_id'],
                "content": chunk.choices[0].delta.content,
                "end_of_stream": False,
            }

    yield {
        "stream_id": request['stream_id'],
        "content": "",
        "end_of_stream": True,
    }
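
The sample calls prepare_prompt, which is not defined above. A minimal sketch of such a helper follows; it assumes the stream_request payload carries a "transcript" list of prior turns with "role" and "content" fields (these field names are an assumption, so adapt them to whatever your platform actually sends) and prepends a system prompt.

def prepare_prompt(request):
    # Hypothetical helper: convert the incoming request into OpenAI chat messages.
    # Assumes request["transcript"] is a list of {"role", "content"} turns,
    # where role is "agent" for the assistant and "user" for the caller.
    messages = [
        {
            "role": "system",
            "content": "You are a helpful voice assistant. Keep answers short and conversational.",
        }
    ]
    for turn in request.get("transcript", []):
        role = "assistant" if turn["role"] == "agent" else "user"
        messages.append({"role": role, "content": turn["content"]})
    return messages

To try the endpoint locally, you can connect with any WebSocket client and replay the two request types the handler expects. The sketch below uses the third-party websockets package and assumes the server is running on localhost:8000 (for example via uvicorn); both are illustration assumptions, not requirements of the server code.

import asyncio
import json

import websockets  # pip install websockets

async def test_call():
    async with websockets.connect("ws://localhost:8000/llm-websocket") as ws:
        # 1. start_call: the server replies with the agent's first message.
        await ws.send(json.dumps({"type": "start_call", "data": {"stream_id": 0}}))
        print(await ws.recv())

        # 2. stream_request: the server streams stream_response events
        #    until one arrives with end_of_stream set to True.
        await ws.send(json.dumps({
            "type": "stream_request",
            "data": {
                "stream_id": 1,
                "transcript": [{"role": "user", "content": "What are your opening hours?"}],
            },
        }))
        while True:
            event = json.loads(await ws.recv())
            print(event)
            if event["data"]["end_of_stream"]:
                break

asyncio.run(test_call())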