LLM Streaming Responses in Python

Streaming LLM responses transforms the user experience from "wait for a complete response" to "watch tokens appear in real time." Instead of waiting 3–5 seconds for a full response to generate and return, streaming delivers tokens as they are produced, allowing you to display text progressively. This makes AI-powered applications feel more responsive and interactive, mimicking real human conversation. Streaming also enables cancellation mid-response and allows frontend code to process tokens before the entire response completes.

How Streaming Works

With streaming enabled, the API keeps the HTTP connection open and sends the response as a sequence of server-sent events (SSE), each carrying a small chunk of new text. The openai Python client handles the protocol; you iterate over the stream to process each chunk:

from openai import OpenAI

client = OpenAI()

# Create a streaming completion
stream = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[
        {"role": "user", "content": "Write a short poem about Python programming."}
    ],
    stream=True  # Enable streaming
)

# Process tokens as they arrive
for chunk in stream:
    # Each chunk is a completion delta
    if chunk.choices[0].delta.content is not None:
        print(chunk.choices[0].delta.content, end="", flush=True)

print() # Newline at the end

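The key is setting stream=True. Instead of a finished completion, the client returns a stream object that yields chunk objects as they arrive. Each chunk contains a delta (the incremental content added since the last chunk), and delta.content can be None on chunks that carry no text, which is why the loop checks it. You iterate through the chunks and print or process tokens as they arrive. The flush=True argument ensures tokens appear on the screen immediately instead of sitting in the output buffer.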
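To make the chunk-and-delta structure concrete, here is a rough sketch of the kind of work the client does for you: it reads `data:` lines from the event stream, stops at the `[DONE]` sentinel, and decodes each JSON payload. The sample events are hand-written and trimmed to the fields used here, and the helper name `iter_deltas` is hypothetical; it is not part of the openai package.

```python
import json

def iter_deltas(sse_lines):
    """Yield content deltas from raw SSE lines (simplified sketch)."""
    for line in sse_lines:
        line = line.strip()
        # Blank lines separate events; only "data:" fields matter here.
        if not line.startswith("data:"):
            continue
        payload = line[len("data:"):].strip()
        if payload == "[DONE]":  # Sentinel that marks the end of the stream
            break
        chunk = json.loads(payload)
        choices = chunk.get("choices") or []
        if not choices:
            continue
        content = choices[0].get("delta", {}).get("content")
        if content:  # Skips both missing and empty-string content
            yield content

# Hand-written sample events, trimmed to the fields used above
sample = [
    'data: {"choices": [{"delta": {"role": "assistant", "content": ""}}]}',
    "",
    'data: {"choices": [{"delta": {"content": "Hello"}}]}',
    "",
    'data: {"choices": [{"delta": {"content": ", world"}}]}',
    "",
    'data: {"choices": [{"delta": {}, "finish_reason": "stop"}]}',
    "",
    "data: [DONE]",
]

print("".join(iter_deltas(sample)))  # prints: Hello, world
```

In practice you would not parse the stream yourself; the sketch only shows why each chunk exposes a delta rather than the full text so far.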

Building a Streaming Chatbot

Streaming shines in interactive chatbots. You collect user input, stream the response, and accumulate tokens into a complete message for history:

from openai import OpenAI

client = OpenAI()

messages = [
    {
        "role": "system",
        "content": "You are a friendly Python tutor. Explain concepts clearly and concisely."
    }
]

while True:
    user_input = input("You: ")
    if user_input.lower() in ["quit", "exit"]:
        break

    # Add user message to history
    messages.append({"role": "user", "content": user_input})

    # Stream the response
    print("Assistant: ", end="", flush=True)

    full_response = ""
    with client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages,
        stream=True
    ) as stream:
        for chunk in stream:
            if chunk.choices[0].delta.content is not None:
                token = chunk.choices[0].delta.content
                full_response += token
                print(token, end="", flush=True)

    print()  # Newline after response

    # Add full response to history for next turn
    messages.append({"role": "assistant", "content": full_response})

The model streams tokens, and you accumulate them into full_response. After streaming completes, you append the full message to history for the next turn. This combines the responsiveness of streaming with the context-awareness of multi-turn conversations.
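The accumulate-then-append step can be pulled into a small helper, which also makes it testable without a network call. The sketch below is one possible shape, not the article's own code: `collect_text` and `make_chunk` are hypothetical names, and the chunks are stand-in objects built with SimpleNamespace that only imitate the `choices[0].delta.content` layout.

```python
from types import SimpleNamespace

def make_chunk(content):
    """Build a stand-in object shaped like a streaming chunk (illustration only)."""
    delta = SimpleNamespace(content=content)
    return SimpleNamespace(choices=[SimpleNamespace(delta=delta)])

def collect_text(chunks, on_token=None):
    """Join all content deltas, optionally calling on_token for each one."""
    parts = []
    for chunk in chunks:
        if not chunk.choices:  # Tolerate chunks that carry no choices
            continue
        token = chunk.choices[0].delta.content
        if token:
            parts.append(token)
            if on_token is not None:
                on_token(token)
    return "".join(parts)

# Simulated stream: two text chunks, one empty delta, one chunk without choices
fake_stream = [
    make_chunk("Lists "),
    make_chunk(None),
    make_chunk("are mutable."),
    SimpleNamespace(choices=[]),
]

messages = [{"role": "user", "content": "Are lists mutable?"}]
reply = collect_text(fake_stream)
messages.append({"role": "assistant", "content": reply})
print(messages[-1]["content"])  # prints: Lists are mutable.
```

Collecting the pieces in a list and joining once at the end avoids repeated string concatenation, and passing `on_token=lambda t: print(t, end="", flush=True)` would reproduce the live printing from the loop above.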

Processing Streams Asynchronously

For high-concurrency applications (e.g., serving multiple users), blocking a thread while you iterate over a synchronous stream is inefficient. Use async/await to handle multiple streams concurrently:

import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI() # Async client

async def stream_response(user_message):
    """Stream a single response asynchronously."""
    messages = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": user_message}
    ]

    print(f"User: {user_message}")
    print("Assistant: ", end="", flush=True)

    full_response = ""
    async with await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages,
        stream=True
    ) as stream:
        async for chunk in stream:
            if chunk.choices[0].delta.content is not None:
                token = chunk.choices[0].delta.content
                full_response += token
                print(token, end="", flush=True)

    print()  # Newline
    return full_response

async def main():
    """Simulate multiple concurrent streaming requests."""
    tasks = [
        stream_response("What is async/await?"),
        stream_response("Explain generators in Python."),
        stream_response("What is a decorator?")
    ]

    # Run all three streams concurrently
    responses = await asyncio.gather(*tasks)

    for i, response in enumerate(responses):
        print(f"\nResponse {i + 1} length: {len(response)} characters")

# Run the async main
asyncio.run(main())

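The AsyncOpenAI client allows you to stream multiple responses concurrently. Each async for loop yields control to the event loop while it waits for the next chunk, so one slow stream does not block the others. This is crucial for web servers handling many concurrent user requests. Note that because all three tasks print to the same terminal, their tokens interleave on screen; in a real server each stream would be sent to its own client connection.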
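The interleaving behavior can be observed without any API calls. The following sketch replaces the model with a fake async generator (`fake_stream`, a hypothetical stand-in) and buffers each stream's text rather than printing token by token, so the results stay readable while the streams still run concurrently.

```python
import asyncio

async def fake_stream(tokens):
    """Stand-in for an async token stream; yields control before each token."""
    for token in tokens:
        await asyncio.sleep(0)  # Let other tasks run, as a network wait would
        yield token

async def collect(label, tokens, order):
    """Consume one stream, buffering its text instead of printing it."""
    parts = []
    async for token in fake_stream(tokens):
        order.append(label)  # Record which stream produced each token
        parts.append(token)
    return "".join(parts)

async def demo():
    order = []
    results = await asyncio.gather(
        collect("A", ["async ", "is ", "cooperative"], order),
        collect("B", ["generators ", "are ", "lazy"], order),
    )
    return results, order

results, order = asyncio.run(demo())
print(results)  # One complete string per stream, in the order passed to gather
print(order)    # Labels from both streams, showing that they took turns
```

asyncio.gather returns results in the order the coroutines were passed in, regardless of which stream finishes first, so buffered output can be matched back to its request by position.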

Handling Streaming Errors and Edge Cases

Streams can fail mid-transmission (network outage, rate limit, server error). Always wrap streaming code in error handling:

from openai import OpenAI, APIError, RateLimitError
import time

client = OpenAI()

def stream_with_retry(messages, max_retries=3):
    """Stream a response with retry logic."""
    for attempt in range(max_retries):
        try:
            stream = client.chat.completions.create(
                model="gpt-4o-mini",
                messages=messages,
                stream=True
            )

            full_response = ""
            for chunk in stream:
                if chunk.choices[0].delta.content is not None:
                    token = chunk.choices[0].delta.content
                    full_response += token
                    print(token, end="", flush=True)

            print()  # Newline
            return full_response

        except RateLimitError:
            wait_time = 2 ** attempt  # Exponential backoff
            print(f"\nRate limited. Retrying in {wait_time} seconds...")
            time.sleep(wait_time)

        except APIError as e:
            print(f"\nAPI error: {e}")
            if attempt == max_retries - 1:
                raise
            time.sleep(1)

    raise RuntimeError("Failed after max retries")

# Test
messages = [
    {"role": "user", "content": "Explain Python decorators."}
]

response = stream_with_retry(messages)

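Catching RateLimitError allows you to implement exponential backoff (wait 1s, 2s, 4s). RateLimitError is a subclass of APIError, so its except clause must come first. Other APIError exceptions are retried after a one-second pause and re-raised on the final attempt. Keep in mind that a retry restarts the response from the beginning, so any tokens already printed from a failed attempt will appear again.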
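The backoff schedule itself is easy to isolate and test. Here is a minimal sketch, assuming a generic callable instead of a real API request; `backoff_delays` and `retry` are hypothetical helpers, and the added random jitter is a common refinement that the function above does not use.

```python
import random
import time

def backoff_delays(max_retries, base=1.0, cap=30.0):
    """Return the capped exponential delay for each attempt: base * 2**attempt."""
    return [min(cap, base * 2 ** attempt) for attempt in range(max_retries)]

def retry(call, max_retries=3, retry_on=(Exception,),
          sleep=time.sleep, jitter=random.random):
    """Call `call()` until it succeeds, sleeping between failed attempts."""
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    for attempt, delay in enumerate(backoff_delays(max_retries)):
        try:
            return call()
        except retry_on:
            if attempt == max_retries - 1:
                raise  # Out of attempts: surface the last error
            # Up to 1s of random jitter so many clients do not retry in lockstep
            sleep(delay + jitter())

# Simulated flaky operation: fails twice, then succeeds
attempts = {"count": 0}

def flaky():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise TimeoutError("simulated transient failure")
    return "ok"

slept = []  # Record requested delays instead of actually sleeping
result = retry(flaky, retry_on=(TimeoutError,), sleep=slept.append, jitter=lambda: 0.0)
print(result, slept)  # prints: ok [1.0, 2.0]
```

With the real client you would pass something like `retry_on=(RateLimitError,)` and a function that performs the streaming request. Injecting `sleep` and `jitter` keeps the example fast and deterministic.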

Streaming vs. Non-Streaming: Tradeoffs

Streaming improves user experience (perceived faster response) but has tradeoffs:

| Aspect | Streaming | Non-Streaming |
| --- | --- | --- |
| Perceived latency | Low (tokens appear immediately) | High (wait for complete response) |
| Time to first token (TTFT) | Low (first tokens arrive as soon as generation starts) | Same as total request time (everything arrives at once) |
| Total request time | Roughly the same | Roughly the same |
| Error handling | Mid-stream error requires fallback | Fail or retry cleanly |
| Code complexity | Higher (iterate chunks, accumulate) | Simpler (one .content field) |
| Token usage reporting | Off by default; requires stream_options={"include_usage": True} | Always returned in .usage |

Use streaming for interactive applications where perceived latency matters (chatbots, live code generation). Use non-streaming for batch processing, or whenever nothing reads the output until it is complete and simpler code and error handling matter more.
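To check the latency rows for your own workload, you can time the first token and the full response separately. The sketch below measures any iterable of tokens; `time_stream` and `slow_tokens` are hypothetical helpers, and the simulated delay is arbitrary and says nothing about real model latency.

```python
import time

def time_stream(tokens, clock=time.perf_counter):
    """Return (time_to_first_token, total_time, text) for an iterable of tokens."""
    start = clock()
    first = None
    parts = []
    for token in tokens:
        if first is None:
            first = clock() - start  # Elapsed time when the first token arrived
        parts.append(token)
    total = clock() - start
    return first, total, "".join(parts)

def slow_tokens(words, delay=0.01):
    """Stand-in for a model stream: yields one word at a time after a short delay."""
    for word in words:
        time.sleep(delay)
        yield word

ttft, total, text = time_stream(slow_tokens(["Streaming ", "feels ", "faster."]))
print(f"first token after {ttft:.3f}s, complete after {total:.3f}s: {text!r}")
```

The gap between the two numbers is the time a user spends reading partial output instead of staring at a blank screen.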

Key Takeaways

  • Enable streaming by setting stream=True in the API request.
  • Iterate through chunks using for chunk in stream and access chunk.choices[0].delta.content.
  • Always accumulate tokens into a full response string if you need the complete message later.
  • Use AsyncOpenAI and async for to stream multiple responses concurrently.
  • Handle rate limits and API errors with retry logic and exponential backoff.
  • Streaming reduces perceived latency but slightly increases code complexity.

Frequently Asked Questions

Can I get token counts from a streaming response?

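Not by default. Non-streaming requests return response.usage.prompt_tokens and response.usage.completion_tokens. For streaming, pass stream_options={"include_usage": True} in the request; the API then sends one extra final chunk whose choices list is empty and whose usage field holds the counts, so guard against an empty choices list when you enable it. Either way, the counts are only available after the response completes. Estimate them using tiktoken (described in Article 3) if you need pre-response budgeting.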
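As a sketch of how usage can be read from a stream: the Chat Completions API accepts `stream_options={"include_usage": True}`, in which case a final chunk arrives with an empty choices list and a populated usage field. The code below consumes stand-in chunks shaped that way; `read_stream_with_usage` and `content_chunk` are hypothetical helpers, and the token numbers are invented for illustration.

```python
from types import SimpleNamespace

def read_stream_with_usage(chunks):
    """Return (text, usage), taking usage from whichever chunk carries it."""
    parts, usage = [], None
    for chunk in chunks:
        if getattr(chunk, "usage", None) is not None:
            usage = chunk.usage
        # The usage chunk has no choices, so check before indexing
        if chunk.choices and chunk.choices[0].delta.content:
            parts.append(chunk.choices[0].delta.content)
    return "".join(parts), usage

def content_chunk(text):
    """Stand-in for a normal text chunk (usage is None on these)."""
    delta = SimpleNamespace(content=text)
    return SimpleNamespace(choices=[SimpleNamespace(delta=delta)], usage=None)

# Stand-in for the final chunk; the numbers are made up
usage_chunk = SimpleNamespace(
    choices=[],
    usage=SimpleNamespace(prompt_tokens=12, completion_tokens=2, total_tokens=14),
)

text, usage = read_stream_with_usage([content_chunk("Hi"), content_chunk("!"), usage_chunk])
print(text, usage.prompt_tokens, usage.completion_tokens)  # prints: Hi! 12 2
```

If the stream is interrupted before the final chunk, `usage` stays None, so callers should treat it as optional.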

What happens if the network cuts off mid-stream?

Iteration raises an exception, and the tokens accumulated up to that point remain in your variable. You can catch the exception, log the partial response, and decide whether to retry, discard, or use the partial text. Streaming does not automatically recover or resume where it left off; implement retry logic if needed.
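A minimal sketch of keeping the partial text when a stream fails part-way through. The generator below simulates a dropped connection with Python's built-in ConnectionError; the exception a real client raises will be a different type, so treat the `except` clause as a placeholder. The names `read_until_failure` and `dropped_stream` are hypothetical.

```python
def read_until_failure(tokens):
    """Consume tokens; on a connection error return what arrived plus the error."""
    parts = []
    try:
        for token in tokens:
            parts.append(token)
    except ConnectionError as exc:  # Placeholder for the client's real error type
        return "".join(parts), exc
    return "".join(parts), None

def dropped_stream():
    """Simulated stream that fails after two tokens."""
    yield "Decorators "
    yield "wrap "
    raise ConnectionError("simulated network drop")

partial, error = read_until_failure(dropped_stream())
print(repr(partial), "| error:", error)
# prints: 'Decorators wrap ' | error: simulated network drop
```

Returning the error alongside the text leaves the decision to the caller: show the partial answer, retry from scratch, or discard it.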

Can I stream to a file?

Yes. Instead of printing tokens, write them to a file handle:

with open("response.txt", "w") as f:
    for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            f.write(chunk.choices[0].delta.content)

This builds the file chunk by chunk, which is useful for capturing large responses without holding the whole text in memory. Here stream is the object returned by a create() call with stream=True.

Is streaming always faster than non-streaming?

Total request time is the same, but perceived latency is lower because tokens appear immediately. In a terminal or web UI, streaming provides a better user experience even though the server takes the same total time to generate the response.

Further Reading