Using a WebSocket connection, your voice agent can interact in real time with your custom LLM, enabling dynamic, responsive conversations tailored to your specific requirements.
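Concretely, the platform holds a WebSocket open to your server and exchanges JSON messages: it sends start_call when a call begins and stream_request whenever a response is needed, and your server streams back stream_response events. The message shapes below mirror the sample code that follows (field values are illustrative):

{"type": "start_call", "data": {"stream_id": 0}}
{"type": "stream_request", "data": {"stream_id": 1}}
{"type": "stream_response", "data": {"stream_id": 1, "content": "Hello", "end_of_stream": false}}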
Sample Code
Below is sample code using a FastAPI WebSocket endpoint and asyncio. Make sure to use the AsyncOpenAI client so that token streaming from OpenAI does not block the asyncio event loop; this ensures each message is sent over the WebSocket as soon as its content arrives.
import asyncio
import json
import traceback

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from openai import AsyncOpenAI

app = FastAPI()
client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment
@app.websocket("/llm-websocket")
async def websocket_handler(websocket: WebSocket):
    await websocket.accept()
    current_stream_id = -1

    async def stream_response(request):
        try:
            async for event in draft_response(request):
                # A newer stream has superseded this one; stop sending stale events.
                if request["stream_id"] < current_stream_id:
                    return
                await websocket.send_text(json.dumps({
                    "type": "stream_response",
                    "data": event,
                }))
        except Exception:
            print(traceback.format_exc(), flush=True)
    try:
        while True:
            message = await websocket.receive_text()
            request = json.loads(message)
            if request["type"] == "start_call":
                # Greet the caller as soon as the call starts.
                current_stream_id = request["data"]["stream_id"]
                first_event = {
                    "type": "stream_response",
                    "data": {
                        "stream_id": current_stream_id,
                        "content": "How can I help you?",
                        "end_of_stream": True,
                    },
                }
                await websocket.send_text(json.dumps(first_event))
            elif request["type"] == "stream_request":
                # Track the latest stream id so stale tasks can drop out early,
                # then generate the response in a background task.
                current_stream_id = request["data"]["stream_id"]
                asyncio.create_task(stream_response(request["data"]))
    except WebSocketDisconnect:
        pass
async def draft_response(request):
    # prepare_prompt builds the OpenAI messages list from the request (see below).
    prompt = prepare_prompt(request)
    stream = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=prompt,
        stream=True,
    )
    async for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            yield {
                "stream_id": request["stream_id"],
                "content": chunk.choices[0].delta.content,
                "end_of_stream": False,
            }
    # Signal that this response is complete.
    yield {
        "stream_id": request["stream_id"],
        "content": "",
        "end_of_stream": True,
    }
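The sample calls a prepare_prompt helper that is not defined above. A minimal sketch of what it might look like, assuming the stream_request payload carries a transcript list of {role, content} turns (the transcript field and its shape are assumptions, not part of the sample):

def prepare_prompt(request):
    # Hypothetical helper: assumes request["transcript"] is a list of
    # {"role": "agent" | "user", "content": str} utterances.
    messages = [{
        "role": "system",
        "content": "You are a friendly voice agent. Keep answers short and conversational.",
    }]
    for turn in request.get("transcript", []):
        role = "assistant" if turn["role"] == "agent" else "user"
        messages.append({"role": role, "content": turn["content"]})
    return messages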
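To try the endpoint locally, run the server (e.g. uvicorn main:app, assuming the code lives in main.py) and drive it with a small client. A sketch using the third-party websockets package; the stream_request payload reuses the hypothetical transcript field from the sketch above:

import asyncio
import json

import websockets  # pip install websockets

async def main():
    async with websockets.connect("ws://localhost:8000/llm-websocket") as ws:
        # Simulate the start of a call; expect the greeting event back.
        await ws.send(json.dumps({"type": "start_call", "data": {"stream_id": 0}}))
        print(json.loads(await ws.recv()))
        # Ask for a response on stream 1 ("transcript" is an assumed field).
        await ws.send(json.dumps({
            "type": "stream_request",
            "data": {
                "stream_id": 1,
                "transcript": [{"role": "user", "content": "What are your hours?"}],
            },
        }))
        # Print streamed content until end_of_stream.
        while True:
            event = json.loads(await ws.recv())
            print(event["data"]["content"], end="", flush=True)
            if event["data"]["end_of_stream"]:
                break

asyncio.run(main())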