LLM Streaming Responses in Python
Streaming LLM responses transforms the user experience from "wait for a complete response" to "watch tokens appear in real time." Instead of waiting 3–5 seconds for a full response to generate and return, streaming delivers tokens as they are produced, allowing you to display text progressively. This makes AI-powered applications feel more responsive and interactive, mimicking real human conversation. Streaming also enables cancellation mid-response and allows frontend code to process tokens before the entire response completes.
How Streaming Works
Streaming keeps the HTTP connection open and delivers the response as server-sent events (SSE), each carrying a small chunk of output (usually one or a few tokens). The openai Python client handles the protocol; you iterate over the stream to process each chunk:
from openai import OpenAI

client = OpenAI()  # Reads OPENAI_API_KEY from the environment

# Create a streaming completion
stream = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[
        {"role": "user", "content": "Write a short poem about Python programming."}
    ],
    stream=True,  # Enable streaming
)

# Process chunks as they arrive
for chunk in stream:
    # Each chunk carries a delta: the text added since the previous chunk.
    # Some chunks have no choices or no content, so check before printing.
    if chunk.choices and chunk.choices[0].delta.content is not None:
        print(chunk.choices[0].delta.content, end="", flush=True)
print()  # Newline at the end
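The key is setting stream=True. Instead of a single completion object, the client returns an iterable stream that yields chunk objects as they arrive. Each chunk contains a delta (the incremental content added since the last chunk), and delta.content can be None, for example in the final chunk that only reports a finish reason. You iterate through the chunks and print or process the text as it arrives. The flush=True argument makes print write each piece to the screen immediately instead of leaving it in the output buffer.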
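To make the wire format concrete, here is a minimal sketch of the parsing the client does on your behalf. It assumes the response arrives as SSE lines of the form data: {json}, ending with a data: [DONE] sentinel. The sample lines are hand-written and trimmed to the fields the sketch reads, and iter_deltas is a hypothetical helper, not part of the openai package.

```python
import json


def iter_deltas(sse_lines):
    """Yield content deltas from an iterable of raw SSE lines."""
    for line in sse_lines:
        line = line.strip()
        # Blank lines separate events; only "data:" fields matter here
        if not line.startswith("data:"):
            continue
        payload = line[len("data:"):].strip()
        if payload == "[DONE]":  # Sentinel marking the end of the stream
            break
        chunk = json.loads(payload)
        choices = chunk.get("choices") or []
        if not choices:
            continue
        content = choices[0].get("delta", {}).get("content")
        if content:  # Skips missing, None, and empty-string content
            yield content


# Hand-written sample (an assumption for illustration, not captured traffic)
SAMPLE_LINES = [
    'data: {"choices": [{"index": 0, "delta": {"role": "assistant", "content": ""}}]}',
    "",
    'data: {"choices": [{"index": 0, "delta": {"content": "Hello"}}]}',
    "",
    'data: {"choices": [{"index": 0, "delta": {"content": ", world"}}]}',
    "",
    'data: {"choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]}',
    "",
    "data: [DONE]",
]

if __name__ == "__main__":
    print("".join(iter_deltas(SAMPLE_LINES)))
```

In practice you would not parse these lines yourself; the sketch only illustrates why each chunk exposes a delta rather than the full text so far.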
Building a Streaming Chatbot
Streaming shines in interactive chatbots. You collect user input, stream the response, and accumulate tokens into a complete message for history:
from openai import OpenAI

client = OpenAI()

messages = [
    {
        "role": "system",
        "content": "You are a friendly Python tutor. Explain concepts clearly and concisely."
    }
]

while True:
    user_input = input("You: ")
    if user_input.lower() in ["quit", "exit"]:
        break

    # Add user message to history
    messages.append({"role": "user", "content": user_input})

    # Stream the response
    print("Assistant: ", end="", flush=True)
    full_response = ""
    # The context manager closes the connection even if the loop exits early
    with client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages,
        stream=True,
    ) as stream:
        for chunk in stream:
            if chunk.choices and chunk.choices[0].delta.content is not None:
                token = chunk.choices[0].delta.content
                full_response += token
                print(token, end="", flush=True)
    print()  # Newline after response

    # Add full response to history for next turn
    messages.append({"role": "assistant", "content": full_response})
The model streams tokens, and you accumulate them into full_response. After streaming completes, you append the full message to history for the next turn. This combines the responsiveness of streaming with the context-awareness of multi-turn conversations.
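The accumulate-and-print loop tends to be repeated wherever a stream is consumed, so it can be factored into a helper. The following is a sketch under stated assumptions: consume_stream and fake_chunk are hypothetical names, and the stand-in chunks only mimic the chunk.choices[0].delta.content shape so the example runs without an API key.

```python
from types import SimpleNamespace


def fake_chunk(content):
    """Build a stand-in object shaped like chunk.choices[0].delta.content."""
    delta = SimpleNamespace(content=content)
    return SimpleNamespace(choices=[SimpleNamespace(delta=delta)])


def consume_stream(stream, on_token=None):
    """Collect content deltas into one string, calling on_token for each piece."""
    parts = []
    for chunk in stream:
        if not chunk.choices:  # Some chunks carry no choices at all
            continue
        token = chunk.choices[0].delta.content
        if token:  # Skips None and empty strings
            parts.append(token)
            if on_token is not None:
                on_token(token)
    return "".join(parts)


if __name__ == "__main__":
    demo = [fake_chunk(None), fake_chunk("Lists "), fake_chunk("are "), fake_chunk("mutable.")]
    history = [{"role": "user", "content": "Are lists mutable?"}]
    reply = consume_stream(demo, on_token=lambda t: print(t, end="", flush=True))
    print()
    history.append({"role": "assistant", "content": reply})
```

Collecting pieces in a list and joining once at the end avoids repeated string concatenation, and the on_token callback keeps display logic separate from accumulation.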
Processing Streams Asynchronously
For high-concurrency applications (e.g., serving multiple users), a synchronous for loop over the stream blocks its thread until the response finishes. Use async/await to handle multiple streams concurrently:
import asyncio

from openai import AsyncOpenAI

client = AsyncOpenAI()  # Async client

async def stream_response(user_message):
    """Stream a single response asynchronously."""
    messages = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": user_message},
    ]
    print(f"User: {user_message}")
    print("Assistant: ", end="", flush=True)
    full_response = ""
    async with await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages,
        stream=True,
    ) as stream:
        async for chunk in stream:
            if chunk.choices and chunk.choices[0].delta.content is not None:
                token = chunk.choices[0].delta.content
                full_response += token
                print(token, end="", flush=True)
    print()  # Newline
    return full_response

async def main():
    """Simulate multiple concurrent streaming requests."""
    tasks = [
        stream_response("What is async/await?"),
        stream_response("Explain generators in Python."),
        stream_response("What is a decorator?"),
    ]
    # Run all three streams concurrently
    responses = await asyncio.gather(*tasks)
    for i, response in enumerate(responses, start=1):
        print(f"\nResponse {i} length: {len(response)} characters")

# Run the async main
asyncio.run(main())
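The AsyncOpenAI client allows you to stream multiple responses concurrently. Each async for loop yields control to the event loop while it waits for the next chunk, so one slow stream does not block the others. This is crucial for web servers handling many concurrent user requests. Note that in this terminal demo the printed tokens from the three responses interleave; in a web server each stream would instead be written to its own client connection.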
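The concurrency behavior can be observed without calling the API. The sketch below replaces the real stream with a hypothetical fake_stream async generator that sleeps between tokens, and each consumer writes into its own buffer so the texts stay separate even though arrival order is mixed. All names here are illustrative assumptions.

```python
import asyncio


async def fake_stream(tokens, delay):
    """Stand-in for an async API stream: yields tokens with a pause between them."""
    for token in tokens:
        await asyncio.sleep(delay)  # Yields control to the event loop while "waiting"
        yield token


async def collect(label, tokens, delay, log):
    """Consume one stream into its own buffer, recording arrival order in log."""
    parts = []
    async for token in fake_stream(tokens, delay):
        parts.append(token)
        log.append(label)
    return label, "".join(parts)


async def run_demo():
    """Run two simulated streams concurrently and return their texts and arrival log."""
    log = []
    results = await asyncio.gather(
        collect("A", ["async ", "is ", "cooperative"], 0.02, log),
        collect("B", ["generators ", "yield"], 0.03, log),
    )
    return dict(results), log


if __name__ == "__main__":
    texts, order = asyncio.run(run_demo())
    print(texts)
    print("arrival order:", "".join(order))
```

Keeping a buffer per stream is one simple way to avoid the garbled output you get when several streams print to the same terminal.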
Handling Streaming Errors and Edge Cases
Streams can fail mid-transmission (network outage, rate limit, server error). Always wrap streaming code in error handling:
import time

from openai import OpenAI, APIError, RateLimitError

client = OpenAI()

def stream_with_retry(messages, max_retries=3):
    """Stream a response with retry logic."""
    for attempt in range(max_retries):
        try:
            stream = client.chat.completions.create(
                model="gpt-4o-mini",
                messages=messages,
                stream=True,
            )
            full_response = ""
            for chunk in stream:
                if chunk.choices and chunk.choices[0].delta.content is not None:
                    token = chunk.choices[0].delta.content
                    full_response += token
                    print(token, end="", flush=True)
            print()  # Newline
            return full_response
        except RateLimitError:
            # RateLimitError is a subclass of APIError, so it must be caught first
            wait_time = 2 ** attempt  # Exponential backoff
            print(f"\nRate limited. Retrying in {wait_time} seconds...")
            time.sleep(wait_time)
        except APIError as e:
            print(f"\nAPI error: {e}")
            if attempt == max_retries - 1:
                raise
            time.sleep(1)
    raise RuntimeError("Failed after max retries")

# Test
messages = [
    {"role": "user", "content": "Explain Python decorators."}
]
response = stream_with_retry(messages)
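Catching RateLimitError allows you to implement exponential backoff (wait 1s, 2s, 4s). Other APIError exceptions are retried after a short fixed delay and re-raised once the final attempt fails. Keep in mind that each retry starts a new request: if the failure happens mid-stream, tokens that were already printed will be generated and printed again from the beginning, so a UI should clear or replace the partial output before retrying.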
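The backoff logic can also be separated from the API call so it is testable on its own. The sketch below is a variation rather than the function above: it adds random jitter to each wait (a common refinement that the original code does not use), and it takes the sleep function as a parameter so tests can skip real delays. The names backoff_delays and call_with_backoff are hypothetical.

```python
import random
import time


def backoff_delays(max_retries, base=1.0, cap=30.0):
    """Return the un-jittered delay schedule: base * 2**attempt, capped at cap."""
    return [min(cap, base * 2 ** attempt) for attempt in range(max_retries)]


def call_with_backoff(fn, max_retries=3, retry_on=(Exception,), sleep=time.sleep, base=1.0):
    """Call fn(); on a retryable error wait a jittered delay, then try again."""
    delays = backoff_delays(max_retries, base=base)
    for attempt, delay in enumerate(delays):
        try:
            return fn()
        except retry_on:
            if attempt == max_retries - 1:
                raise  # Out of attempts: surface the last error
            # Full jitter: wait a random time between 0 and the scheduled delay
            sleep(random.uniform(0, delay))


if __name__ == "__main__":
    attempts = {"count": 0}

    def flaky():
        """Fail twice, then succeed (simulates a transient error)."""
        attempts["count"] += 1
        if attempts["count"] < 3:
            raise TimeoutError("simulated transient failure")
        return "ok"

    # Pass a no-op sleep so the demo finishes instantly
    print(call_with_backoff(flaky, retry_on=(TimeoutError,), sleep=lambda s: None))
```

With the real client you would pass the openai exception classes you want to retry as retry_on and wrap the request in fn; which exceptions are worth retrying is a judgment call for your application.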
Streaming vs. Non-Streaming: Tradeoffs
Streaming improves user experience (perceived faster response) but has tradeoffs:
| Aspect | Streaming | Non-Streaming |
|---|---|---|
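| Perceived latency | Low (tokens appear as they are generated) | High (wait for complete response) |
| Time to first token (TTFT) | Low (often a fraction of a second; varies with model and load) | Not observable (the first token arrives with the full response) |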
| Total request time | Same | Same |
| Error handling | Mid-stream error requires fallback | Fail or retry cleanly |
| Code complexity | Higher (iterate chunks, accumulate) | Simpler (one .content field) |
| Token usage reporting | Only on request: pass stream_options={"include_usage": True} and read the final chunk | Always returned in response.usage |
Use streaming for interactive applications where perceived latency matters (chatbots, live code generation). Use non-streaming for batch processing or when downstream code needs the complete response before it can proceed.
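The difference between time to first token and total time is easy to measure for any iterable of tokens. The sketch below uses a hypothetical simulated_stream generator with made-up delays in place of a real API call, so the numbers it prints say nothing about actual model latency; measure is likewise an illustrative helper.

```python
import time


def simulated_stream(tokens, first_delay, gap):
    """Stand-in for an API stream: a pause before the first token, then steady output."""
    time.sleep(first_delay)
    for i, token in enumerate(tokens):
        if i:
            time.sleep(gap)
        yield token


def measure(stream):
    """Return (time_to_first_token, total_time, text) for any iterable of tokens."""
    start = time.perf_counter()
    first = None
    parts = []
    for token in stream:
        if first is None:
            first = time.perf_counter() - start  # Recorded once, at the first token
        parts.append(token)
    total = time.perf_counter() - start
    return first, total, "".join(parts)


if __name__ == "__main__":
    ttft, total, text = measure(simulated_stream(["Hello", ", ", "world"], 0.05, 0.02))
    print(f"first token after {ttft:.3f}s, complete after {total:.3f}s: {text!r}")
```

Pointing measure at a real stream (after extracting the text from each chunk) gives you both figures for your own model and network conditions.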
Key Takeaways
- Enable streaming by setting stream=True in the API request.
- Iterate through chunks using for chunk in stream and access chunk.choices[0].delta.content.
- Always accumulate tokens into a full response string if you need the complete message later.
- Use AsyncOpenAI and async for to stream multiple responses concurrently.
- Handle rate limits and API errors with retry logic and exponential backoff.
- Streaming reduces perceived latency but slightly increases code complexity.
Frequently Asked Questions
Can I get token counts from a streaming response?
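Yes, but you have to ask for them. Non-streaming requests return response.usage.prompt_tokens and response.usage.completion_tokens. For streaming, pass stream_options={"include_usage": True}; the API then sends one extra final chunk whose usage field holds the totals and whose choices list is empty, so check that choices is non-empty before indexing it. Either way, counts are only available after the response completes. Estimate them using tiktoken (described in Article 3) if you need pre-response budgeting.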
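The OpenAI Chat Completions API accepts stream_options={"include_usage": True} on streaming requests, which adds a final chunk carrying usage and an empty choices list. The sketch below shows one way to read it. The chunks are stand-ins built with SimpleNamespace and the token numbers are invented, so read_stream_with_usage and fake_chunk are hypothetical helpers that only mirror the attribute names of the real objects.

```python
from types import SimpleNamespace

# With the real client, the request would look like this:
# stream = client.chat.completions.create(
#     model="gpt-4o-mini", messages=messages, stream=True,
#     stream_options={"include_usage": True},
# )


def fake_chunk(content=None, usage=None, with_choice=True):
    """Build a stand-in chunk; with_choice=False mimics the usage-only final chunk."""
    choices = []
    if with_choice:
        choices = [SimpleNamespace(delta=SimpleNamespace(content=content))]
    return SimpleNamespace(choices=choices, usage=usage)


def read_stream_with_usage(stream):
    """Return (text, usage), taking usage from whichever chunk carries it."""
    parts, usage = [], None
    for chunk in stream:
        if getattr(chunk, "usage", None) is not None:
            usage = chunk.usage
        # The usage chunk has no choices, so guard before indexing
        if chunk.choices and chunk.choices[0].delta.content:
            parts.append(chunk.choices[0].delta.content)
    return "".join(parts), usage


if __name__ == "__main__":
    invented_usage = SimpleNamespace(prompt_tokens=9, completion_tokens=3, total_tokens=12)
    demo = [
        fake_chunk("Hi"),
        fake_chunk(" there"),
        fake_chunk(None),  # Final content chunk: delta has no text
        fake_chunk(usage=invented_usage, with_choice=False),
    ]
    text, usage = read_stream_with_usage(demo)
    print(text, "| total tokens:", usage.total_tokens)
```

If usage comes back as None, the stream either ended early or the option was not set on the request.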
What happens if the network cuts off mid-stream?
The iteration raises an exception, and the tokens you accumulated up to that point remain in your variable. You can catch the exception, log the partial response, and decide whether to retry, discard, or use the partial text. Streaming does not automatically recover; implement retry logic if needed.
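A small wrapper makes the "keep what arrived" behavior explicit. This is a sketch using a simulated failure: flaky_stream and read_partial are hypothetical names, and ConnectionError stands in for whatever exception types your client actually raises mid-stream, which you should confirm for your library version and pass in through the errors parameter.

```python
def flaky_stream(tokens, fail_after):
    """Stand-in stream that raises after yielding fail_after tokens."""
    for i, token in enumerate(tokens):
        if i == fail_after:
            raise ConnectionError("simulated network drop")
        yield token


def read_partial(stream, errors=(ConnectionError,)):
    """Return (text, completed); completed is False if the stream died early."""
    parts = []
    try:
        for token in stream:
            parts.append(token)
    except errors as exc:
        print(f"stream interrupted: {exc}")
        return "".join(parts), False
    return "".join(parts), True


if __name__ == "__main__":
    text, completed = read_partial(flaky_stream(["Decorators ", "wrap ", "functions."], 2))
    print(repr(text), "completed:", completed)
```

Returning a completed flag alongside the text lets the caller decide whether to retry, show the partial answer with a warning, or discard it.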
Can I stream to a file?
Yes. Instead of printing tokens, write them to a file handle:
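# Assumes stream was created with stream=True as in the earlier examples
with open("response.txt", "w", encoding="utf-8") as f:
    for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content is not None:
            f.write(chunk.choices[0].delta.content)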
This builds a file token by token, useful for capturing large responses without buffering them in memory.
Is streaming always faster than non-streaming?
Total request time is the same, but perceived latency is lower because tokens appear immediately. In a terminal or web UI, streaming provides a better user experience even though the server takes the same total time to generate the response.