Implementing Observability in FastAPI Applications

Modern applications require robust observability to understand their behavior, performance, and health. Let's explore how to implement comprehensive observability in FastAPI applications.

The Three Pillars of Observability

  1. Metrics: Quantitative measurements over time
  2. Traces: Request flow through your system
  3. Logs: Detailed event records

Setting Up OpenTelemetry

from fastapi import FastAPI
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

def setup_telemetry():
    # Set up the tracer
    tracer_provider = TracerProvider()
    trace.set_tracer_provider(tracer_provider)
    
    # Configure the OTLP exporter
    otlp_exporter = OTLPSpanExporter(
        endpoint="http://jaeger:4317",
        insecure=True
    )
    
    # Add span processor
    span_processor = BatchSpanProcessor(otlp_exporter)
    tracer_provider.add_span_processor(span_processor)
    
    return tracer_provider

# Initialize FastAPI and wire up OpenTelemetry
app = FastAPI()
setup_telemetry()
FastAPIInstrumentor.instrument_app(app)
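
With the app instrumented, every incoming request already gets a server span. For work you want to see as its own step in the trace, you can open child spans manually; a minimal sketch (the endpoint and the load_order_from_db helper are hypothetical):

from opentelemetry import trace

tracer = trace.get_tracer(__name__)

@app.get("/orders/{order_id}")
async def get_order(order_id: str):
    # Child span of the automatically created request span
    with tracer.start_as_current_span("load_order") as span:
        span.set_attribute("order.id", order_id)
        order = await load_order_from_db(order_id)  # hypothetical helper
    return order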

Prometheus Metrics Integration

import time

from fastapi import Request
from prometheus_client import Counter, Histogram
from prometheus_fastapi_instrumentator import Instrumentator

# Define metrics
REQUEST_COUNT = Counter(
    "http_requests_total",
    "Total HTTP requests",
    ["method", "endpoint", "status"]
)

REQUEST_LATENCY = Histogram(
    "http_request_duration_seconds",
    "HTTP request latency",
    ["method", "endpoint"]
)

# Set up Prometheus instrumentation (exposes metrics at /metrics).
# Note: the default Instrumentator metrics may overlap with the custom
# names above; if registration conflicts arise, rename the custom metrics.
def setup_prometheus():
    Instrumentator().instrument(app).expose(app)

setup_prometheus()

@app.middleware("http")
async def track_requests(request: Request, call_next):
    start_time = time.time()
    response = await call_next(request)
    duration = time.time() - start_time
    
    REQUEST_COUNT.labels(
        method=request.method,
        endpoint=request.url.path,
        status=response.status_code
    ).inc()
    
    REQUEST_LATENCY.labels(
        method=request.method,
        endpoint=request.url.path
    ).observe(duration)
    
    return response

Structured Logging

import time
import uuid

import structlog
from fastapi import Request, Response
from starlette.middleware.base import RequestResponseEndpoint

# Configure structlog
structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer()
    ]
)

logger = structlog.get_logger()
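
A quick usage sketch (the user_id and event names are arbitrary): every call renders one JSON line combining bound context, keyword arguments, the event name, and the ISO timestamp added by the processors above.

# Bind context once and reuse it on every subsequent call
log = logger.bind(user_id="42")
log.info("user_logged_in", method="oauth")
# -> roughly {"user_id": "42", "method": "oauth", "event": "user_logged_in", "timestamp": "..."}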

class LoggingMiddleware:
    async def __call__(
        self,
        request: Request,
        call_next: RequestResponseEndpoint
    ) -> Response:
        start_time = time.time()
        
        # Extract request details
        request_id = request.headers.get("X-Request-ID")
        if not request_id:
            request_id = str(uuid.uuid4())
        
        # Bind context
        log = logger.bind(
            request_id=request_id,
            method=request.method,
            path=request.url.path,
            client_ip=request.client.host
        )
        
        try:
            response = await call_next(request)
            duration = time.time() - start_time
            
            log.info(
                "request_processed",
                status_code=response.status_code,
                duration=duration
            )
            
            return response
        except Exception as e:
            log.error(
                "request_failed",
                error=str(e),
                exc_info=True
            )
            raise
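
Because the class implements the (request, call_next) signature that Starlette's HTTP middleware expects, an instance can be registered the same way as the decorator-based middleware earlier; a minimal sketch:

# Register the logging middleware instance on the app
app.middleware("http")(LoggingMiddleware())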

Custom Business Metrics

from prometheus_client import Counter, Gauge

# Business metrics
ACTIVE_USERS = Gauge(
    "active_users_total",
    "Number of active users"
)

ORDER_VALUE = Counter(
    "order_value_total",
    "Total value of orders",
    ["product_category"]
)

class MetricsService:
    def track_user_activity(self, user_id: str):
        # Increment when a user becomes active; decrement when they go inactive
        ACTIVE_USERS.inc()
        
    def track_order(self, category: str, value: float):
        ORDER_VALUE.labels(
            product_category=category
        ).inc(value)
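
These counters and gauges are updated from whichever code path owns the business event; a minimal sketch using a hypothetical checkout endpoint (the payload fields are assumptions):

metrics = MetricsService()

@app.post("/orders")
async def create_order(order: dict):
    # "category" and "total" are hypothetical fields on the incoming payload
    metrics.track_order(order["category"], order["total"])
    return {"status": "created"}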

Health Checks

from typing import Dict, Any

class HealthCheck:
    # db and redis below are assumed to be your application's async clients
    async def check_database(self) -> Dict[str, Any]:
        try:
            await db.execute("SELECT 1")
            return {
                "database": {
                    "status": "healthy",
                    "latency_ms": await self.measure_db_latency()
                }
            }
        except Exception as e:
            return {
                "database": {
                    "status": "unhealthy",
                    "error": str(e)
                }
            }
    
    async def check_redis(self) -> Dict[str, Any]:
        try:
            await redis.ping()
            return {
                "redis": {
                    "status": "healthy",
                    "connected_clients": await redis.info("clients")
                }
            }
        except Exception as e:
            return {
                "redis": {
                    "status": "unhealthy",
                    "error": str(e)
                }
            }
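
The database check above reports latency_ms via measure_db_latency, which isn't defined in the snippet; a minimal sketch of that method (it belongs on HealthCheck and assumes the same db client plus the time import used earlier):

    async def measure_db_latency(self) -> float:
        # Time a trivial round-trip query and report it in milliseconds
        start = time.time()
        await db.execute("SELECT 1")
        return round((time.time() - start) * 1000, 2)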

@app.get("/health")
async def health_check():
    checker = HealthCheck()
    return {
        "status": "healthy",
        "checks": {
            **(await checker.check_database()),
            **(await checker.check_redis())
        },
        "version": "1.0.0"
    }

Tracing Database Queries

from opentelemetry import trace
from sqlalchemy import event
from sqlalchemy.engine import Engine

tracer = trace.get_tracer(__name__)

@event.listens_for(Engine, "before_cursor_execute")
def before_cursor_execute(
    conn,
    cursor,
    statement,
    parameters,
    context,
    executemany
):
    # Start a span and keep it open until after_cursor_execute fires
    span = tracer.start_span(
        "database_query",
        attributes={
            "db.statement": statement,
            "db.parameters": str(parameters)
        }
    )
    context._otel_span = span

@event.listens_for(Engine, "after_cursor_execute")
def after_cursor_execute(
    conn,
    cursor,
    statement,
    parameters,
    context,
    executemany
):
    # End the span opened for this query
    span = getattr(context, "_otel_span", None)
    if span is not None:
        span.end()
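
Alternatively, if you would rather not manage these spans by hand, the opentelemetry-instrumentation-sqlalchemy package can instrument the engine directly (a sketch, assuming engine is your application's SQLAlchemy engine):

from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor

# engine is assumed to be the application's SQLAlchemy Engine
SQLAlchemyInstrumentor().instrument(engine=engine)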

Grafana Dashboard Configuration

{
  "dashboard": {
    "title": "FastAPI Application Metrics",
    "panels": [
      {
        "title": "Request Rate",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(http_requests_total[5m])",
            "legendFormat": "{{method}} {{endpoint}}"
          }
        ]
      },
      {
        "title": "Response Latency",
        "type": "heatmap",
        "targets": [
          {
            "expr": "rate(http_request_duration_seconds_bucket[5m])",
            "format": "heatmap"
          }
        ]
      },
      {
        "title": "Active Users",
        "type": "stat",
        "targets": [
          {
            "expr": "active_users_total"
          }
        ]
      }
    ]
  }
}

Docker Compose Setup

version: '3.8'

services:
  app:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4317
    depends_on:
      - jaeger
      - prometheus
  
  jaeger:
    image: jaegertracing/all-in-one:latest
    environment:
      - COLLECTOR_OTLP_ENABLED=true
    ports:
      - "16686:16686"
      - "4317:4317"
  
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
  
  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - grafana_data:/var/lib/grafana

volumes:
  grafana_data:
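
The Prometheus service mounts a prometheus.yml that is not shown above; a minimal sketch, assuming Prometheus should scrape the app service's /metrics endpoint (exposed by the Instrumentator) on port 8000:

global:
  scrape_interval: 15s

scrape_configs:
  - job_name: "fastapi-app"
    static_configs:
      - targets: ["app:8000"]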

Best Practices

  1. Consistent Naming: Use consistent naming conventions for metrics, traces, and logs
  2. Cardinality Control: Be careful with high-cardinality labels in metrics
  3. Sampling: Implement appropriate sampling for traces in high-traffic systems (see the sketch after this list)
  4. Context Propagation: Ensure proper context propagation across service boundaries
  5. Alert Configuration: Set up meaningful alerts based on SLOs
  6. Documentation: Document the meaning and purpose of each metric and log field
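
For the sampling point above, a minimal sketch of ratio-based sampling with the OpenTelemetry SDK (the 10% ratio is an arbitrary example; plug the sampler into the TracerProvider created in setup_telemetry):

from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.sampling import ParentBased, TraceIdRatioBased

# Sample roughly 10% of new traces; child spans follow their parent's decision
tracer_provider = TracerProvider(
    sampler=ParentBased(TraceIdRatioBased(0.1))
)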

Conclusion

A well-implemented observability stack is crucial for maintaining and troubleshooting modern applications. Key takeaways:

  • Use OpenTelemetry for distributed tracing
  • Implement Prometheus metrics for monitoring
  • Set up structured logging
  • Configure comprehensive health checks
  • Use Grafana for visualization
  • Follow best practices for maintainability

Remember that observability is not just about collecting data—it's about making that data actionable and useful for understanding your system's behavior.