Cost Control and Rate Limiting

LLM API costs can spiral if unchecked. A single poorly designed application generating thousands of requests daily can incur hundreds of dollars in charges. Cost control requires three strategies: token budgeting (estimating and capping input/output tokens), rate limiting (constraining request frequency), and caching (reusing responses for identical queries). Understanding token costs and implementing these guardrails is essential before deploying to production.

Understanding Token Costs and Pricing

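Every API request is billed per token, with separate rates for input and output. As of June 2026, OpenAI pricing for GPT-4o-mini is approximately $0.075 per 1 million input tokens and $0.30 per 1 million output tokens (check the current pricing page before relying on these figures). A request with 1,000 input tokens and 500 output tokens therefore costs roughly (1,000 / 1,000,000) × $0.075 + (500 / 1,000,000) × $0.30 = $0.000075 + $0.00015 = $0.000225. For high-volume applications, these fractions add up. Always estimate token costs before deploying: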

import tiktoken

encoding = tiktoken.encoding_for_model("gpt-4o-mini")

# Prices in USD per 1 million tokens (approximate; see the text above)
INPUT_COST_PER_MILLION = 0.075
OUTPUT_COST_PER_MILLION = 0.30

# Sample request
messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Explain machine learning in 100 words."},
]

# Estimate input tokens from the message text only. The API adds a few
# formatting tokens per message, so the billed count is slightly higher.
input_tokens = sum(len(encoding.encode(m["content"])) for m in messages)
estimated_output_tokens = 120  # Rough guess for a 100-word answer

# Cost = (tokens / 1,000,000) * price per million tokens
input_cost = (input_tokens / 1_000_000) * INPUT_COST_PER_MILLION
output_cost = (estimated_output_tokens / 1_000_000) * OUTPUT_COST_PER_MILLION

print(f"Estimated cost: ${input_cost + output_cost:.6f}")
print(f"For 1000 requests: ${(input_cost + output_cost) * 1000:.2f}")

If your application makes 10,000 requests per day and each costs $0.0003, the daily cost is $3 and the annual cost is roughly $1,095. Savings from tighter prompts scale with volume: a 20% reduction in per-request cost saves about $219 a year here, and ten times that at 100,000 requests per day.
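The projection above can be written as a small helper so that different volumes and prices are easy to compare. This is a minimal sketch; the function names `project_annual_cost` and `annual_savings` are illustrative, and the calculation assumes a steady daily volume over a 365-day year.

```python
def project_annual_cost(requests_per_day, cost_per_request, days_per_year=365):
    """Return the projected yearly spend in USD for a steady request volume."""
    return requests_per_day * cost_per_request * days_per_year


def annual_savings(requests_per_day, cost_per_request, reduction_fraction):
    """Return yearly savings if each request becomes cheaper by reduction_fraction."""
    return project_annual_cost(requests_per_day, cost_per_request) * reduction_fraction


baseline = project_annual_cost(10_000, 0.0003)
saved = annual_savings(10_000, 0.0003, 0.20)
print(f"Baseline: ${baseline:,.2f} per year")
print(f"20% cheaper requests save: ${saved:,.2f} per year")
```

Real traffic is rarely flat, so treat the result as an order-of-magnitude estimate rather than a forecast.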

Token Budgeting: Setting Hard Limits

Before processing user requests, check if you have budget remaining. Implement a token counter that tracks cumulative usage and rejects requests that would exceed limits:

from openai import OpenAI
import tiktoken

client = OpenAI()
encoding = tiktoken.encoding_for_model("gpt-4o-mini")

class TokenBudget:
    def __init__(self, max_tokens_per_day=100_000):
        self.max_tokens = max_tokens_per_day
        # Note: nothing here resets the counter. A real daily budget must
        # zero used_tokens when the day rolls over.
        self.used_tokens = 0

    def estimate_request(self, messages, max_output_tokens=1000):
        """Estimate tokens for a request without sending it."""
        input_tokens = sum(len(encoding.encode(m["content"])) for m in messages)
        # Worst case: assume the model uses its full output allowance
        return input_tokens + max_output_tokens

    def can_afford(self, messages, max_output_tokens=1000):
        """Check if request fits within remaining budget."""
        estimate = self.estimate_request(messages, max_output_tokens)
        return self.used_tokens + estimate <= self.max_tokens

    def record_usage(self, actual_prompt_tokens, actual_completion_tokens):
        """Record actual token usage after API call."""
        used = actual_prompt_tokens + actual_completion_tokens
        self.used_tokens += used
        print(f"Used {used} tokens. "
              f"Remaining: {self.budget_remaining()} / {self.max_tokens}")

    def budget_remaining(self):
        return self.max_tokens - self.used_tokens

# Usage
budget = TokenBudget(max_tokens_per_day=10_000)
MAX_OUTPUT_TOKENS = 500  # Use the same cap for the estimate and the request

messages = [
    {"role": "user", "content": "Explain Python decorators."}
]

if budget.can_afford(messages, max_output_tokens=MAX_OUTPUT_TOKENS):
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages,
        max_tokens=MAX_OUTPUT_TOKENS
    )

    budget.record_usage(
        response.usage.prompt_tokens,
        response.usage.completion_tokens
    )

    print(response.choices[0].message.content)
else:
    print(f"Budget exceeded. Remaining: {budget.budget_remaining()} tokens")

Tracking usage prevents surprise bills. If you hit limits, you can pause processing, alert users, or degrade service gracefully.
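A per-day budget also needs to start over each day. One possible way to do that, sketched below, is to remember the date the counter belongs to and clear it when the date changes. The class name `DailyTokenBudget`, the `try_reserve` method, and the injectable `today` parameter are assumptions made for this illustration; the sketch keeps state in memory only, so it does not survive a restart.

```python
import datetime


class DailyTokenBudget:
    """Token budget that resets when the calendar day changes."""

    def __init__(self, max_tokens_per_day, today=datetime.date.today):
        self.max_tokens = max_tokens_per_day
        self.used_tokens = 0
        self._today = today            # injectable so the reset can be tested
        self._current_day = today()

    def _roll_over_if_new_day(self):
        """Zero the counter the first time it is touched on a new day."""
        day = self._today()
        if day != self._current_day:
            self._current_day = day
            self.used_tokens = 0

    def try_reserve(self, estimated_tokens):
        """Reserve tokens if they fit; return True on success."""
        self._roll_over_if_new_day()
        if self.used_tokens + estimated_tokens > self.max_tokens:
            return False
        self.used_tokens += estimated_tokens
        return True

    def remaining(self):
        self._roll_over_if_new_day()
        return self.max_tokens - self.used_tokens


budget = DailyTokenBudget(max_tokens_per_day=10_000)
print(budget.try_reserve(6_000))   # fits
print(budget.try_reserve(6_000))   # would exceed the daily cap
print(budget.remaining())
```

Reserving the estimate up front, rather than recording usage afterwards, keeps several pending requests from each passing the check and jointly overshooting the limit.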

Rate Limiting: Throttling Request Frequency

Rate limiting caps the number of requests per time interval. OpenAI enforces server-side rate limits; client-side rate limiting helps you stay under them:

import time
from openai import OpenAI, RateLimitError

client = OpenAI()

class RateLimiter:
    def __init__(self, requests_per_minute=60):
        self.requests_per_minute = requests_per_minute
        self.min_interval = 60.0 / requests_per_minute  # Seconds between requests
        self.last_request_time = None  # No request sent yet

    def wait_if_needed(self):
        """Block until enough time has passed since the last request."""
        if self.last_request_time is not None:
            # monotonic() is unaffected by system clock adjustments
            elapsed = time.monotonic() - self.last_request_time
            if elapsed < self.min_interval:
                time.sleep(self.min_interval - elapsed)
        self.last_request_time = time.monotonic()

limiter = RateLimiter(requests_per_minute=60)

messages = [
    {"role": "user", "content": "What is Python?"}
]

for i in range(5):
    limiter.wait_if_needed()

    try:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages
        )
        print(f"Request {i + 1} succeeded")
    except RateLimitError:
        # The server still rejected the request; pause before continuing
        print(f"Rate limited on request {i + 1}")
        time.sleep(30)

This ensures requests are spaced at least min_interval seconds apart. For 60 RPM, that is 1 request per second.
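The spacing logic can be checked without waiting in real time by passing the clock and sleep functions in as parameters. The sketch below is one way to do this; `SpacedLimiter` and `FakeClock` are hypothetical names introduced here, not part of any library.

```python
import time


class SpacedLimiter:
    """Enforce a minimum gap between calls; clock and sleep are injectable."""

    def __init__(self, requests_per_minute, clock=time.monotonic, sleep=time.sleep):
        self.min_interval = 60.0 / requests_per_minute
        self._clock = clock
        self._sleep = sleep
        self._last = None  # no request sent yet

    def wait_if_needed(self):
        """Sleep just long enough to keep the minimum gap; return the wait."""
        waited = 0.0
        if self._last is not None:
            remaining = self.min_interval - (self._clock() - self._last)
            if remaining > 0:
                self._sleep(remaining)
                waited = remaining
        self._last = self._clock()
        return waited


class FakeClock:
    """Simulated time source: sleeping simply advances the clock."""

    def __init__(self):
        self.now = 0.0

    def time(self):
        return self.now

    def sleep(self, seconds):
        self.now += seconds


fake = FakeClock()
limiter = SpacedLimiter(requests_per_minute=120, clock=fake.time, sleep=fake.sleep)
waits = [limiter.wait_if_needed() for _ in range(3)]
print(waits)      # the first call never waits
print(fake.now)   # simulated seconds elapsed
```

With the default arguments the class uses the real clock, so the same code serves both production and tests. It is not thread-safe; concurrent callers would need a lock around `wait_if_needed`.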

Exponential Backoff: Handling Rate Limit Errors

When the API returns a rate limit error, exponential backoff retries the request after increasingly long waits. It complements client-side throttling because it reacts to what the server actually reports rather than to a fixed estimate of the limit:

import time
from openai import (
    OpenAI,
    RateLimitError,
    APIConnectionError,
    InternalServerError,
)

client = OpenAI()

# Errors worth retrying. Authentication or bad-request errors are not
# included because repeating the same request will not fix them.
RETRYABLE_ERRORS = (RateLimitError, APIConnectionError, InternalServerError)

def api_call_with_backoff(messages, max_retries=5):
    """Make an API call, retrying transient failures with exponential backoff."""
    for attempt in range(max_retries):
        try:
            return client.chat.completions.create(
                model="gpt-4o-mini",
                messages=messages,
                timeout=10  # 10-second timeout
            )

        except RETRYABLE_ERRORS as e:
            if attempt == max_retries - 1:
                raise  # Out of retries: surface the last error
            wait_time = 2 ** attempt  # 1, 2, 4, 8 seconds
            print(f"{type(e).__name__}. Retrying in {wait_time} seconds...")
            time.sleep(wait_time)

    raise RuntimeError("max_retries must be at least 1")

# Usage
messages = [{"role": "user", "content": "Hello!"}]
response = api_call_with_backoff(messages)
print(response.choices[0].message.content)

OpenAI's rate limit guidance recommends retrying with exponential backoff. It avoids hammering a rate-limited API and gives the server time to recover.
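A common refinement is to add random jitter and an upper cap to the wait, so that many clients that were rejected at the same moment do not all retry at the same moment. The sketch below shows one variant, in which the wait is drawn uniformly between zero and the capped exponential bound. The function name `backoff_delay` and the default `base` and `cap` values are assumptions chosen for illustration.

```python
import random


def backoff_delay(attempt, base=1.0, cap=30.0, rng=random):
    """Return a randomized wait in seconds for a zero-based retry attempt.

    The upper bound doubles each attempt (base * 2**attempt) up to cap;
    the actual wait is drawn uniformly between 0 and that bound.
    """
    upper = min(cap, base * (2 ** attempt))
    return rng.uniform(0, upper)


rng = random.Random(0)  # seeded only so the demo is repeatable
for attempt in range(6):
    bound = min(30.0, 2 ** attempt)
    delay = backoff_delay(attempt, rng=rng)
    print(f"attempt {attempt}: bound {bound:.0f}s, chosen wait {delay:.2f}s")
```

In a retry loop, `time.sleep(backoff_delay(attempt))` would take the place of a fixed `2 ** attempt` wait.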

Caching: Reduce Redundant Requests

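Identical requests rarely need to be sent twice. Model output is not guaranteed to be identical across calls, but for most queries a stored response is an acceptable answer to a repeated prompt. Caching stores results and skips the API call on a repeat. Python's functools.lru_cache is convenient for development; production applications typically use Redis or a similar shared store: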

from openai import OpenAI
from functools import lru_cache

client = OpenAI()

@lru_cache(maxsize=100)
def cached_completion(prompt, model="gpt-4o-mini"):
    """Cache completions by prompt. Requires prompt to be hashable (string)."""
    messages = [{"role": "user", "content": prompt}]
    response = client.chat.completions.create(
        model=model,
        messages=messages
    )
    return response.choices[0].message.content

# First call: goes to API
result1 = cached_completion("What is Python?")
print(f"Result 1: {result1[:50]}...")

# Second call: returns from cache (no API call)
result2 = cached_completion("What is Python?")
print(f"Result 2 (cached): {result2[:50]}...")

# Different prompt: goes to API again
result3 = cached_completion("What is Rust?")
print(f"Result 3: {result3[:50]}...")

# Check cache stats
print(f"Cache info: {cached_completion.cache_info()}")

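The @lru_cache decorator keeps up to 100 of the most recently used (prompt, model) combinations and evicts the least recently used entry when full. Cache hits return immediately; misses query the API. The cache lives in process memory, so it is lost on restart and is not shared between worker processes. For production, use Redis: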

import hashlib

import redis
from openai import OpenAI

client = OpenAI()
redis_client = redis.Redis(host="localhost", port=6379, db=0)

def cached_completion_redis(prompt, model="gpt-4o-mini"):
    """Cache using Redis."""
    # Include the model in the key so different models never share an entry
    prompt_hash = hashlib.sha256(prompt.encode()).hexdigest()
    cache_key = f"completion:{model}:{prompt_hash}"

    # Check cache (compare with None so an empty string still counts as a hit)
    cached = redis_client.get(cache_key)
    if cached is not None:
        return cached.decode()

    # Query API
    messages = [{"role": "user", "content": prompt}]
    response = client.chat.completions.create(
        model=model,
        messages=messages
    )

    result = response.choices[0].message.content

    # Store in cache (expire after 1 day)
    redis_client.setex(cache_key, 86400, result)

    return result

Redis caching persists across processes and servers, making it ideal for distributed applications.
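Requests that vary more than the user prompt, such as those with a system message or sampling parameters, need all of those inputs reflected in the cache key; otherwise two different requests could return the same stored answer. A minimal sketch of such a key builder follows. The function name `make_cache_key` and the `llm-cache:` prefix are hypothetical, and the sketch assumes every parameter is JSON-serializable.

```python
import hashlib
import json


def make_cache_key(model, messages, **params):
    """Build a deterministic cache key from everything that shapes the response."""
    payload = {"model": model, "messages": messages, "params": params}
    # sort_keys makes the JSON text independent of dict insertion order
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return f"llm-cache:{digest}"


msgs = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "What is Python?"},
]
print(make_cache_key("gpt-4o-mini", msgs, temperature=0))
print(make_cache_key("gpt-4o-mini", msgs, temperature=1))  # different key
```

The resulting string can be used directly as a Redis key or as a dictionary key in an in-memory cache.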

Key Takeaways

  • Token costs accumulate quickly; estimate per-request cost and project annual spending before deployment.
  • Use tiktoken to estimate token counts and response.usage to track actual consumption.
  • Implement token budgets with hard limits to prevent runaway costs.
  • Use client-side rate limiting to pace requests and avoid hitting server limits.
  • Implement exponential backoff for handling RateLimitError gracefully.
  • Cache identical requests using lru_cache (development) or Redis (production).

Frequently Asked Questions

How often should I check the token budget?

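Check before every API call. The check is a local comparison and costs almost nothing next to a network request, so there is little reason to skip it. For low-volume applications that stay far below the limit, a periodic review of usage (for example, once per day) may be enough.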

Can I exceed my budget gracefully?

Yes. When a request would exceed budget, either queue it for later (when budget resets), downgrade the request (use a smaller model), or return a cached result. Always notify the user if quality is degraded.
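Two of these fallbacks, returning a cached result and queueing for later, can be sketched as a single decision function. The name `handle_over_budget`, the plain dictionary cache, and the `deque` used as a retry queue are all assumptions for this illustration; a real application would plug in its own cache and job queue.

```python
from collections import deque


def handle_over_budget(prompt, cache, retry_queue):
    """Decide what to do with a request that does not fit the budget.

    Returns (status, text): a cached answer if one exists, otherwise the
    prompt is queued for the next budget window and text is None.
    """
    if prompt in cache:
        return "cached", cache[prompt]
    retry_queue.append(prompt)
    return "queued", None


# Placeholder cache contents for the demo
cache = {"What is Python?": "Python is a general-purpose programming language."}
retry_queue = deque()

print(handle_over_budget("What is Python?", cache, retry_queue))
print(handle_over_budget("What is Rust?", cache, retry_queue))
print(list(retry_queue))
```

The returned status gives the caller what it needs to tell the user whether the answer is stored or delayed.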

What is the difference between rate limiting and backoff?

Rate limiting proactively spaces requests to avoid triggering rate limits. Backoff reactively responds when the server rejects a request. Use both: rate limit client-side to avoid errors, and implement backoff for when they still occur.

Is Redis overkill for a small application?

Yes. For one-off scripts or prototypes, lru_cache is sufficient. For production web servers handling multiple concurrent users, Redis provides persistence and shared state across processes.

How long should I cache results?

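For stable factual queries (e.g., "What is Python?"), a long expiry or none at all is fine. For time-sensitive queries (e.g., "What are today's stock prices?"), cache for minutes. For user-specific data, either do not cache or include the user in the cache key so that results are never shared across users (privacy/correctness).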
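Different expiry times per entry can be illustrated with a small in-memory cache. This is a sketch only: `TTLCache` is a hypothetical class written for this example, the cached values are placeholders, and Redis provides the same behaviour through key expiry.

```python
import time


class TTLCache:
    """Tiny in-memory cache whose entries expire after a per-entry TTL."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock   # injectable so expiry can be tested
        self._store = {}      # key -> (expires_at or None, value)

    def set(self, key, value, ttl_seconds=None):
        """Store value; ttl_seconds=None means the entry never expires."""
        expires_at = None if ttl_seconds is None else self._clock() + ttl_seconds
        self._store[key] = (expires_at, value)

    def get(self, key):
        """Return the cached value, or None if missing or expired."""
        entry = self._store.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if expires_at is not None and self._clock() >= expires_at:
            del self._store[key]
            return None
        return value


cache = TTLCache()
cache.set("What is Python?", "A programming language.")               # no expiry
cache.set("Latest build status?", "placeholder", ttl_seconds=300)      # 5 minutes
print(cache.get("What is Python?"))
print(cache.get("Latest build status?"))
```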
