Building Observable Python Applications: Complete Example

This article ties together all the concepts from the series: logging, structured logs, metrics, distributed tracing, and error reporting. You will build a complete, production-ready Flask application that demonstrates observability best practices. The application is a simple order processing service that accepts orders, charges credit cards, and ships items. By the end, you will understand how logs, metrics, and traces flow through a real application and how each pillar contributes to diagnosing failures and understanding performance.

The complete example is deliberately small (roughly 300 lines) so that every line is meaningful. In a real application, you would add authentication, validation, persistence, and integrations. The observability infrastructure (logging, metrics, tracing, and error reporting) remains the same at scale.

Complete Observable Flask Application

Here is a production-ready Flask application with all observability components:

import os
import logging
import json
import time
import uuid
from datetime import datetime
from functools import wraps

from flask import Flask, request, jsonify, g
from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST
import sentry_sdk
from sentry_sdk.integrations.flask import FlaskIntegration
from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.propagate import extract, inject
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor

# ============================================================================
# Setup: Logging
# ============================================================================
class JSONFormatter(logging.Formatter):
    """Format logs as JSON for structured logging."""

    # Optional fields that callers pass via `extra={...}`
    EXTRA_FIELDS = ('user_id', 'order_id', 'amount', 'charge_id',
                    'tracking_id', 'error')

    def format(self, record):
        # `g` is only usable inside a Flask application context. Startup logs
        # are emitted outside of one, so fall back to 'unknown' in that case.
        try:
            correlation_id = getattr(g, 'correlation_id', 'unknown')
        except RuntimeError:
            correlation_id = 'unknown'

        log_obj = {
            'timestamp': datetime.utcnow().isoformat(),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
            'correlation_id': correlation_id
        }
        # Add extra fields when present on the record
        for field in self.EXTRA_FIELDS:
            if hasattr(record, field):
                log_obj[field] = getattr(record, field)
        return json.dumps(log_obj)


def setup_logging():
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)

    # File handler: all logs
    file_handler = logging.FileHandler('app.log')
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(JSONFormatter())
    logger.addHandler(file_handler)

    # Console handler: INFO and above
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(JSONFormatter())
    logger.addHandler(console_handler)

    return logging.getLogger(__name__)


logger = setup_logging()

# ============================================================================
# Setup: Observability (Sentry, Prometheus, OpenTelemetry)
# ============================================================================

# Initialize Sentry for error tracking
sentry_sdk.init(
    dsn=os.environ.get('SENTRY_DSN', ''),
    integrations=[FlaskIntegration()],
    traces_sample_rate=0.1,
    environment=os.environ.get('ENVIRONMENT', 'development'),
    release='1.0.0'
)

# Initialize OpenTelemetry for distributed tracing
resource = Resource.create({SERVICE_NAME: "order-service"})
jaeger_exporter = JaegerExporter(
    agent_host_name=os.environ.get('JAEGER_HOST', 'localhost'),
    agent_port=int(os.environ.get('JAEGER_PORT', 6831))
)
trace_provider = TracerProvider(resource=resource)
trace_provider.add_span_processor(BatchSpanProcessor(jaeger_exporter))
trace.set_tracer_provider(trace_provider)

# Auto-instrument outgoing HTTP calls made with `requests`.
# Flask itself is instrumented below, once the real app object exists.
RequestsInstrumentor().instrument()

tracer = trace.get_tracer(__name__)

# Prometheus metrics
http_requests = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status']
)
http_duration = Histogram(
    'http_request_duration_seconds',
    'HTTP request latency',
    ['endpoint']
)
order_processing = Counter(
    'orders_processed_total',
    'Total orders processed',
    ['status']
)
card_charges = Histogram(
    'card_charge_duration_seconds',
    'Time to charge credit card',
    ['status']
)

# ============================================================================
# Flask Application
# ============================================================================

app = Flask(__name__)

# Instrument the application that actually serves requests. Instrumenting a
# separate, throwaway Flask(__name__) instance would leave this app untraced.
FlaskInstrumentor().instrument_app(app)

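@app.before_request
def setup_request_context():
    """Called before every request. Set up correlation ID and logging context."""
    # Start a timer so the latency histogram can be recorded after the request
    g.start_time = time.perf_counter()

    # Generate or extract correlation ID
    correlation_id = request.headers.get('X-Correlation-ID')
    if not correlation_id:
        correlation_id = str(uuid.uuid4())
    g.correlation_id = correlation_id

    # Extract distributed tracing context
    ctx = extract(request.headers)
    g.trace_context = ctx

    # Set Sentry context
    sentry_sdk.set_user({'id': request.headers.get('X-User-ID', 'anonymous')})
    sentry_sdk.set_tag('endpoint', request.endpoint or 'unknown')
    sentry_sdk.set_context('request', {
        'method': request.method,
        'path': request.path,
        'remote_addr': request.remote_addr
    })


@app.after_request
def add_correlation_header(response):
    """Add correlation ID to response and record request latency."""
    response.headers['X-Correlation-ID'] = g.get('correlation_id', 'unknown')

    # Record latency under the route pattern (e.g. '/orders') so the label
    # stays low-cardinality; unmatched paths are grouped as 'unknown'.
    start_time = g.get('start_time')
    if start_time is not None:
        endpoint = request.url_rule.rule if request.url_rule else 'unknown'
        http_duration.labels(endpoint=endpoint).observe(
            time.perf_counter() - start_time
        )
    return response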

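class CorrelationFilter(logging.Filter):
    """Attach the current request's correlation ID to every log record."""
    def filter(self, record):
        try:
            record.correlation_id = getattr(g, 'correlation_id', 'unknown')
        except RuntimeError:  # outside a Flask application context
            record.correlation_id = 'unknown'
        return True


def get_logger():
    """Get a logger with correlation ID."""
    log = logging.getLogger(__name__)
    # The filter class is defined at module level so this isinstance check can
    # match. Defining it inside the function would create a new class on every
    # call, and a new filter would be added to the logger on every request.
    if not any(isinstance(f, CorrelationFilter) for f in log.filters):
        log.addFilter(CorrelationFilter())
    return log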

@app.route('/orders', methods=['POST'])
def create_order():
    """Create and process an order."""
    log = get_logger()

    with tracer.start_as_current_span("create_order", context=g.trace_context) as span:
        order_data = request.json
        order_id = order_data.get('id', str(uuid.uuid4()))
        user_id = order_data.get('user_id')
        amount = order_data.get('amount', 0)

        span.set_attribute('order_id', order_id)
        if user_id is not None:
            span.set_attribute('user_id', user_id)
        span.set_attribute('amount', amount)
        # Tag the span with the correlation ID so traces can be matched to logs
        span.set_attribute('correlation_id', g.correlation_id)

        log.info(
            "Order creation started",
            extra={'order_id': order_id, 'user_id': user_id, 'amount': amount}
        )

        try:
            # Step 1: Validate order
            with tracer.start_as_current_span("validate_order") as val_span:
                log.debug(
                    "Validating order",
                    extra={'order_id': order_id}
                )
                if amount <= 0:
                    raise ValueError(f"Invalid amount: {amount}")
                val_span.set_attribute('valid', True)

            # Step 2: Charge credit card
            with tracer.start_as_current_span("charge_card") as charge_span:
                start = time.time()
                log.info(
                    "Charging credit card",
                    extra={'order_id': order_id, 'amount': amount}
                )

                # Simulate card charge (9 out of 10 times succeeds)
                import random  # local import: only needed for the simulation
                if random.random() > 0.1:
                    charge_id = f"ch_{uuid.uuid4().hex[:12]}"
                    duration = time.time() - start
                    card_charges.labels(status='success').observe(duration)
                    log.info(
                        "Card charge succeeded",
                        extra={'order_id': order_id, 'charge_id': charge_id}
                    )
                    charge_span.set_attribute('charge_id', charge_id)
                else:
                    # Record failed charges too, so both outcomes are visible
                    card_charges.labels(status='failure').observe(time.time() - start)
                    raise Exception("Card processor timeout (simulated)")

            # Step 3: Save order to database
            with tracer.start_as_current_span("save_order") as save_span:
                log.info(
                    "Saving order to database",
                    extra={'order_id': order_id}
                )
                # Simulate database save
                time.sleep(0.05)
                save_span.set_attribute('saved', True)

            # Step 4: Ship order
            with tracer.start_as_current_span("ship_order") as ship_span:
                log.info(
                    "Scheduling shipment",
                    extra={'order_id': order_id}
                )
                tracking_id = f"tk_{uuid.uuid4().hex[:12]}"
                ship_span.set_attribute('tracking_id', tracking_id)

            # Success
            order_processing.labels(status='success').inc()
            log.info(
                "Order processing completed successfully",
                extra={'order_id': order_id, 'tracking_id': tracking_id}
            )

            response_data = {
                'order_id': order_id,
                'tracking_id': tracking_id,
                'status': 'success',
                'correlation_id': g.correlation_id
            }
            http_requests.labels(
                method=request.method,
                endpoint='/orders',
                status=201
            ).inc()
            return jsonify(response_data), 201

        except ValueError as e:
            log.warning(
                "Order validation failed",
                extra={'order_id': order_id, 'error': str(e)}
            )
            order_processing.labels(status='validation_error').inc()
            sentry_sdk.capture_exception(e)
            http_requests.labels(
                method=request.method,
                endpoint='/orders',
                status=400
            ).inc()
            return jsonify({'error': str(e), 'correlation_id': g.correlation_id}), 400

        except Exception as e:
            log.error(
                "Order processing failed",
                extra={'order_id': order_id, 'error': str(e)}
            )
            order_processing.labels(status='error').inc()
            span.record_exception(e)
            sentry_sdk.capture_exception(e)
            http_requests.labels(
                method=request.method,
                endpoint='/orders',
                status=500
            ).inc()
            return jsonify({
                'error': 'Internal server error',
                'correlation_id': g.correlation_id
            }), 500

@app.route('/health', methods=['GET'])
def health():
    """Health check endpoint."""
    return jsonify({
        'status': 'healthy',
        'service': 'order-service',
        'version': '1.0.0'
    }), 200


@app.route('/metrics', methods=['GET'])
def metrics():
    """Prometheus metrics endpoint."""
    return generate_latest(), 200, {'Content-Type': CONTENT_TYPE_LATEST}


from werkzeug.exceptions import HTTPException  # installed with Flask


@app.errorhandler(Exception)
def handle_error(error):
    """Global error handler."""
    # Pass HTTP errors (404, 405, ...) through unchanged; without this check
    # a request for an unknown URL would be reported as a 500.
    if isinstance(error, HTTPException):
        return error

    log = get_logger()
    log.error("Unhandled exception: %s", error)
    sentry_sdk.capture_exception(error)
    http_requests.labels(
        method=request.method,
        endpoint=request.endpoint or 'unknown',
        status=500
    ).inc()
    return jsonify({
        'error': 'Internal server error',
        'correlation_id': g.get('correlation_id', 'unknown')
    }), 500

# ============================================================================
# Main
# ============================================================================

if __name__ == '__main__':
    logger.info("Order service starting up")
    app.run(debug=False, port=5000)

How to Run This Example

  1. Install dependencies:
pip install flask prometheus-client sentry-sdk opentelemetry-api opentelemetry-sdk opentelemetry-exporter-jaeger opentelemetry-instrumentation-flask opentelemetry-instrumentation-requests
  2. Start the application:
ENVIRONMENT=development python app.py
  3. Create an order:
curl -X POST http://localhost:5000/orders \
  -H "Content-Type: application/json" \
  -H "X-User-ID: user123" \
  -d '{"id": "order1", "user_id": "user123", "amount": 99.99}'
  4. View metrics:
curl http://localhost:5000/metrics
  5. Check logs:
tail -f app.log | jq '.'  # Pretty-print JSON logs
  6. View traces in Jaeger (if running):
# Start Jaeger locally (Docker)
docker run -p 16686:16686 -p 6831:6831/udp jaegertracing/all-in-one

# Visit http://localhost:16686 to see traces
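
The curl call in the steps above can also be made from Python, which makes it easy to choose the correlation ID up front and search for it in app.log afterwards. The following is a minimal sketch using only the standard library; `build_order_request` and `send_order` are hypothetical helper names, and the base URL is assumed to match the port used when starting the application.

```python
import json
import urllib.error
import urllib.request
import uuid


def build_order_request(base_url, order, correlation_id=None):
    """Build a POST /orders request that carries an explicit correlation ID."""
    correlation_id = correlation_id or str(uuid.uuid4())
    request = urllib.request.Request(
        url=f"{base_url}/orders",
        data=json.dumps(order).encode("utf-8"),
        method="POST",
        headers={
            "Content-Type": "application/json",
            "X-User-ID": str(order.get("user_id", "anonymous")),
            "X-Correlation-ID": correlation_id,
        },
    )
    return request, correlation_id


def send_order(request, timeout=5):
    """Send the request and return (status_code, parsed JSON body)."""
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return response.status, json.loads(response.read())
    except urllib.error.HTTPError as err:
        # 4xx/5xx responses from the service still carry a JSON body
        return err.code, json.loads(err.read())


# Usage, with the service running locally:
#   req, cid = build_order_request(
#       "http://localhost:5000",
#       {"id": "order1", "user_id": "user123", "amount": 99.99},
#   )
#   status, body = send_order(req)
#   print(status, cid, body)
```

Because the client picks the ID, the same value can then be looked up in the logs and should come back in the response's X-Correlation-ID header.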

Key Observability Patterns in This Example

  1. Correlation ID: Every log and trace is tagged with the same ID, linking them together.
  2. Structured logging: Logs are JSON with named fields (order_id, user_id, amount).
  3. Metrics: Counters track order status; histograms track latency.
  4. Distributed tracing: Spans capture operation hierarchy and timing.
  5. Error reporting: Exceptions are sent to Sentry with context.
  6. Context propagation: Trace context flows through the request.
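
Patterns 1 and 2 do not depend on Flask. As a minimal sketch of the same idea using only the standard library, a `contextvars.ContextVar` can stand in for Flask's `g`, with a logging filter stamping each record. The names `correlation_id_var`, `CorrelationIdFilter`, `JsonFormatter`, and `make_logger` are illustrative and not part of the application above.

```python
import contextvars
import io
import json
import logging

# Holds the ID of the request currently being handled (assumed name)
correlation_id_var = contextvars.ContextVar("correlation_id", default="unknown")


class CorrelationIdFilter(logging.Filter):
    """Copy the current correlation ID onto every record that passes through."""
    def filter(self, record):
        record.correlation_id = correlation_id_var.get()
        return True


class JsonFormatter(logging.Formatter):
    """Render a record as one JSON object per line."""
    def format(self, record):
        payload = {
            "level": record.levelname,
            "message": record.getMessage(),
            "correlation_id": getattr(record, "correlation_id", "unknown"),
        }
        if hasattr(record, "order_id"):
            payload["order_id"] = record.order_id
        return json.dumps(payload)


def make_logger(stream):
    """Build an isolated logger that writes JSON lines to `stream`."""
    log = logging.getLogger("correlation_demo")
    log.setLevel(logging.INFO)
    log.propagate = False
    log.handlers.clear()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter())
    handler.addFilter(CorrelationIdFilter())
    log.addHandler(handler)
    return log


buffer = io.StringIO()
log = make_logger(buffer)

token = correlation_id_var.set("req-123")       # start of a "request"
log.info("Order creation started", extra={"order_id": "order1"})
correlation_id_var.reset(token)                 # end of the "request"
log.info("Outside any request")

lines = [json.loads(line) for line in buffer.getvalue().splitlines()]
```

The first entry in `lines` carries the request's ID and the second falls back to the default, which is the behavior the Flask version gets from `g`.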

Key Takeaways

  • Observable applications emit logs, metrics, and traces.
  • Correlation IDs link logs and traces to the same request.
  • Structured logging enables search and analysis.
  • Metrics aggregate behavior across thousands of requests.
  • Traces show the causal path through your system.
  • Error reporting surfaces issues in real time.

Frequently Asked Questions

Can I run the example without Jaeger or Sentry?

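Yes. With SENTRY_DSN unset or empty, the Sentry SDK stays disabled and sends nothing. The Jaeger exporter sends spans over UDP to localhost by default, so if no agent is listening the spans are lost without affecting requests. Logging and Prometheus metrics work independently of both backends.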

How do I add authentication and validation?

Add Flask-JWT or similar for authentication. Add request validation using libraries like marshmallow. The observability patterns remain the same.
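
To illustrate where validation would sit, here is a hand-rolled sketch that uses only the standard library as a stand-in for a schema library such as marshmallow. The function name `validate_order` and the specific rules (non-empty `user_id`, positive finite `amount`) are assumptions for illustration, not requirements stated by the example application.

```python
from decimal import Decimal, InvalidOperation


def validate_order(payload):
    """Return (cleaned, errors). `cleaned` is None when any field is invalid."""
    if not isinstance(payload, dict):
        return None, {"_body": "expected a JSON object"}

    cleaned, errors = {}, {}

    user_id = payload.get("user_id")
    if isinstance(user_id, str) and user_id.strip():
        cleaned["user_id"] = user_id.strip()
    else:
        errors["user_id"] = "required non-empty string"

    raw_amount = payload.get("amount")
    try:
        # Reject missing values and booleans before attempting conversion
        if raw_amount is None or isinstance(raw_amount, bool):
            raise InvalidOperation
        amount = Decimal(str(raw_amount))
    except InvalidOperation:
        errors["amount"] = "required number"
    else:
        if amount.is_finite() and amount > 0:
            cleaned["amount"] = amount
        else:
            errors["amount"] = "must be a positive number"

    return (None, errors) if errors else (cleaned, errors)
```

In the route, a non-empty `errors` dict would map to a 400 response and a warning-level log entry with the same `order_id` and correlation ID fields, so the logging, metrics, and tracing calls do not change.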

How do I store orders in a database instead of simulating?

Use SQLAlchemy with OpenTelemetry instrumentation. The observability calls (logging, tracing) stay the same; only the database layer changes.

What should I do when an exception occurs in production?

Log it, trace it, and let Sentry capture it. The user receives the correlation ID so they can report the issue. You investigate using logs, traces, and Sentry's error aggregation.
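
Once a user reports a correlation ID, the first investigation step is usually to pull every log entry for that request. A minimal sketch, assuming the JSON-lines format written to app.log by the example; `find_by_correlation_id` is a hypothetical helper name.

```python
import json


def find_by_correlation_id(lines, correlation_id):
    """Return the parsed log entries for one request, ordered by timestamp."""
    matches = []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip anything that is not a JSON log line
        if isinstance(entry, dict) and entry.get("correlation_id") == correlation_id:
            matches.append(entry)
    # ISO 8601 timestamps sort correctly as plain strings
    return sorted(matches, key=lambda entry: entry.get("timestamp", ""))


sample_log = [
    '{"timestamp": "2024-01-01T10:00:02", "level": "ERROR", '
    '"message": "Order processing failed", "correlation_id": "abc"}',
    '{"timestamp": "2024-01-01T10:00:01", "level": "INFO", '
    '"message": "Order creation started", "correlation_id": "abc"}',
    '{"timestamp": "2024-01-01T10:00:01", "level": "INFO", '
    '"message": "Order creation started", "correlation_id": "xyz"}',
    'not json at all',
]

for entry in find_by_correlation_id(sample_log, "abc"):
    print(entry["timestamp"], entry["level"], entry["message"])
```

Against the real file, the same function accepts an open file object, for example `find_by_correlation_id(open("app.log"), "<id from the user>")`.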

How do I know if my observability is working?

Test end-to-end: make a request, check that the correlation ID appears in logs and Jaeger, and that Prometheus metrics are recorded. Simulate an error and verify Sentry receives it.
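
The metrics part of that check can be automated by reading a sample out of the text returned by /metrics before and after a request and comparing the two values. The sketch below is a deliberately simple parser for that text format, not a replacement for a real Prometheus client; `sample_value` is a hypothetical helper, and the sample text is hand-written to resemble the counters defined in the example.

```python
import re

# metric name, optional {labels}, then the value
_SAMPLE = re.compile(r'^([A-Za-z_:][A-Za-z0-9_:]*)(?:\{(.*)\})?\s+(\S+)')
_LABEL = re.compile(r'([A-Za-z_][A-Za-z0-9_]*)="((?:[^"\\]|\\.)*)"')


def sample_value(text, name, labels=None):
    """Return the first sample of `name` whose labels include `labels`, or None."""
    wanted = labels or {}
    for line in text.splitlines():
        if not line or line.startswith("#"):
            continue  # skip blank lines and HELP/TYPE comments
        match = _SAMPLE.match(line)
        if not match or match.group(1) != name:
            continue
        found = dict(_LABEL.findall(match.group(2) or ""))
        if all(found.get(key) == value for key, value in wanted.items()):
            return float(match.group(3))
    return None


exposition = """\
# HELP orders_processed_total Total orders processed
# TYPE orders_processed_total counter
orders_processed_total{status="success"} 3.0
orders_processed_total{status="error"} 1.0
http_requests_total{endpoint="/orders",method="POST",status="201"} 3.0
"""

successes = sample_value(exposition, "orders_processed_total", {"status": "success"})
created = sample_value(exposition, "http_requests_total", {"status": "201"})
```

In an end-to-end test, the expectation would be that the success counter grows by exactly one after a single successful POST to /orders.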

Further Reading