Idempotent API Requests: Safe Retries in Python
An idempotent operation produces the same result no matter how many times you execute it. GET requests are inherently idempotent (fetching data twice returns the same data). POST requests that create resources or trigger side effects are not idempotent by default: POST to charge a credit card twice creates two charges. The solution is idempotency keys: the client sends a unique ID with each request, and the server uses that ID to detect duplicate requests and return the previous result instead of executing again. This article shows how to design idempotent APIs and implement idempotent clients that make safe retries.
I worked on a payment processing system where flaky networks would sometimes make clients retry a transaction, creating duplicate charges. Customers were furious. After implementing idempotency keys, retries became safe: if the network failed after the charge succeeded but before the response arrived, the retry would return the original success message, not create a new charge. The fix took 20 lines of code and saved thousands in disputes.
Idempotency in HTTP
The HTTP specification defines safe and idempotent methods:
- Safe: Has no side effects. GET, HEAD, OPTIONS are safe. GET request 100 times = same result, no changes.
- Idempotent: Same result regardless of repetitions. GET, HEAD, PUT, DELETE are idempotent. POST is not (each POST creates a new resource or triggers a new action).
But in practice, you often have POST requests that should be idempotent (charging a card, creating a payment record). That's where idempotency keys come in.
Idempotency Key Pattern
The pattern is simple:
- Client: Generates a unique ID (UUID) for each request. Sends it in a header:
Idempotency-Key: <uuid>. - Server: On first request, processes normally and stores the result, keyed by the idempotency key.
- Server: On retry with the same key, returns the stored result without re-processing.
# Client: Python requests library
import uuid
import requests
def charge_card_idempotent(amount: float) -> dict:
"""Charge a card safely with idempotency."""
# Generate a unique key for this operation
idempotency_key = str(uuid.uuid4())
response = requests.post(
'https://payment-api.example.com/charges',
json={'amount': amount},
headers={'Idempotency-Key': idempotency_key},
timeout=10
)
# Retry logic: if network fails, retry with the SAME idempotency key
if response.status_code == 500:
# Network error or service error; retry
response = requests.post(
'https://payment-api.example.com/charges',
json={'amount': amount},
headers={'Idempotency-Key': idempotency_key}, # Same key!
timeout=10
)
return response.json()
If the first request succeeded but the response was lost in transit, the retry returns the same result because the server recognizes the idempotency key.
Server-Side Implementation
On the server, store results keyed by idempotency key:
from fastapi import FastAPI, Header, HTTPException
from sqlalchemy import Column, String, Integer, DateTime
from sqlalchemy.orm import Session
import json
from datetime import datetime
app = FastAPI()
# Database model to store idempotency results
class IdempotencyResult(Base):
__tablename__ = 'idempotency_results'
idempotency_key = Column(String(36), primary_key=True)
method = Column(String(10))
path = Column(String(255))
status_code = Column(Integer)
response_body = Column(String(65536)) # JSON
created_at = Column(DateTime, default=datetime.utcnow)
@app.post('/charges')
async def create_charge(
request_data: dict,
idempotency_key: str = Header(None),
db: Session = Depends(get_db)
):
"""Create a charge, idempotently."""
# Validate idempotency key
if not idempotency_key:
raise HTTPException(status_code=400, detail="Idempotency-Key header required")
# Check if we've seen this key before
existing = db.query(IdempotencyResult).filter(
IdempotencyResult.idempotency_key == idempotency_key
).first()
if existing:
# Return previous result
return json.loads(existing.response_body), existing.status_code
try:
# First request: process the charge
charge = charge_card(request_data['amount'])
response_body = {
'id': charge.id,
'amount': charge.amount,
'status': 'success'
}
status_code = 201
# Store result for future retries
result = IdempotencyResult(
idempotency_key=idempotency_key,
method='POST',
path='/charges',
status_code=status_code,
response_body=json.dumps(response_body)
)
db.add(result)
db.commit()
return response_body, status_code
except Exception as e:
# Also store failures, so retry doesn't retry
response_body = {'error': str(e)}
status_code = 500
result = IdempotencyResult(
idempotency_key=idempotency_key,
method='POST',
path='/charges',
status_code=status_code,
response_body=json.dumps(response_body)
)
db.add(result)
db.commit()
raise HTTPException(status_code=status_code, detail=str(e))
Handling Idempotency Conflicts
What if the client sends the same idempotency key with different request bodies? This is a conflict and should be rejected:
@app.post('/charges')
async def create_charge(
request_data: dict,
idempotency_key: str = Header(None),
db: Session = Depends(get_db)
):
"""Create a charge, idempotently."""
if not idempotency_key:
raise HTTPException(status_code=400, detail="Idempotency-Key header required")
existing = db.query(IdempotencyResult).filter(
IdempotencyResult.idempotency_key == idempotency_key
).first()
if existing:
# Verify the request body matches
original_request = json.loads(existing.request_body)
if original_request != request_data:
raise HTTPException(
status_code=422,
detail="Idempotency key conflict: different request body"
)
# Bodies match; return previous result
return json.loads(existing.response_body), existing.status_code
# ... process new request
Client Retry Helper with Idempotency
Here's a reusable client helper that automatically manages idempotency:
import uuid
import requests
from typing import Callable, Any
class IdempotentClient:
def __init__(self, base_url: str, timeout: int = 10):
self.base_url = base_url
self.timeout = timeout
def post_idempotent(
self,
endpoint: str,
data: dict,
max_retries: int = 3,
idempotency_key: str = None
) -> tuple[dict, int]:
"""POST with automatic idempotency."""
# Use provided key or generate new one
if not idempotency_key:
idempotency_key = str(uuid.uuid4())
url = f"{self.base_url}{endpoint}"
headers = {'Idempotency-Key': idempotency_key}
for attempt in range(max_retries):
try:
response = requests.post(
url,
json=data,
headers=headers,
timeout=self.timeout
)
# Success or client error
if response.status_code != 500:
return response.json(), response.status_code
# Server error; retry
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # Exponential backoff
except (requests.Timeout, requests.ConnectionError) as e:
if attempt < max_retries - 1:
time.sleep(2 ** attempt)
else:
raise
raise Exception(f"Failed after {max_retries} attempts")
# Usage
client = IdempotentClient('https://payment-api.example.com')
charge_data = {'amount': 99.99, 'currency': 'USD'}
try:
result, status = client.post_idempotent('/charges', charge_data)
print(f"Charge successful: {result}")
except Exception as e:
print(f"Charge failed: {e}")
Idempotency in Distributed Systems
In a microservices architecture, you must coordinate idempotency across services. Store idempotency keys in a shared cache (Redis) so all services see them:
import redis
class DistributedIdempotencyStore:
def __init__(self, redis_client):
self.redis = redis_client
def get_cached_result(self, idempotency_key: str) -> Any:
"""Get stored result, or None if not found."""
cached = self.redis.get(f'idempotency:{idempotency_key}')
return json.loads(cached) if cached else None
def store_result(self, idempotency_key: str, result: Any, ttl: int = 86400):
"""Store result for 24 hours."""
self.redis.setex(
f'idempotency:{idempotency_key}',
ttl,
json.dumps(result)
)
def process_idempotently(self, idempotency_key: str, process_fn: Callable, *args):
"""Execute process_fn once per idempotency key."""
# Check if we've processed this before
cached = self.get_cached_result(idempotency_key)
if cached:
return cached
# New request; process it
result = process_fn(*args)
# Store for next retry
self.store_result(idempotency_key, result)
return result
# Usage
idempotency = DistributedIdempotencyStore(redis_client)
@app.post('/transfer')
def transfer_money(from_account, to_account, amount, idempotency_key):
def do_transfer():
# Execute money transfer
from_account.balance -= amount
to_account.balance += amount
db.commit()
return {'status': 'success', 'from': from_account.id, 'to': to_account.id}
return idempotency.process_idempotently(idempotency_key, do_transfer)
Key Takeaways
- Idempotency keys allow safe retries of non-idempotent operations like charging credit cards.
- Store results keyed by idempotency key; replay the result on retry instead of re-processing.
- Reject requests with the same idempotency key but different bodies (conflict detection).
- Use UUID v4 for idempotency keys; they're unique and stateless.
- Store results for at least 24 hours so late retries don't fail.
Frequently Asked Questions
How long should I store idempotency results?
24 hours is standard. Most networks recover within seconds; 24 hours covers re-queued jobs and retries from long-running operations. For critical operations, consider longer (7 days).
Should I use UUID v4 or something else for idempotency keys?
UUID v4 is fine—it's random, collision-free, and widely understood. Some systems use timestamp-based UUIDs (v7) for better database ordering. The key point: make it unique per request.
What if two requests arrive with the same idempotency key simultaneously?
Your server must detect this and serialize them. Use a database unique constraint or Redis atomic operation to ensure only one request processes:
# Use Redis to create a lock per idempotency key
lock_key = f'lock:{idempotency_key}'
if not redis.setnx(lock_key, '1'):
# Another request is processing this key; wait or return 409 Conflict
time.sleep(0.5)
cached = get_cached_result(idempotency_key)
if cached:
return cached
Is idempotency just for POST requests?
Primarily POST. GET/DELETE/PUT are already idempotent. However, if your GET request has side effects (logging, triggering a job), treat it as non-idempotent and use idempotency keys.
How do I test idempotency?
Make two requests with the same idempotency key and different request bodies. Verify you get a 422 conflict error. Then make two requests with the same idempotency key and same body, verify both return the same result.
Further Reading
- Stripe: Idempotent Requests — Industry standard, clearly explained.
- Twilio: Idempotent API Design — Another major API's implementation.
- AWS Lambda: Idempotent Workflows — Pattern for serverless systems.
- RFC 9110: HTTP Semantics — Official definition of idempotent methods.