Implementing Observability in FastAPI Applications
Modern applications require robust observability to understand their behavior, performance, and health. Let's explore how to implement comprehensive observability in FastAPI applications.
The Three Pillars of Observability
- Metrics: Quantitative measurements over time
- Traces: Request flow through your system
- Logs: Detailed event records
Setting Up OpenTelemetry
from fastapi import FastAPI
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

def setup_telemetry() -> TracerProvider:
    # Set up the tracer provider
    tracer_provider = TracerProvider()
    trace.set_tracer_provider(tracer_provider)

    # Configure the OTLP exporter (gRPC, pointing at the Jaeger collector)
    otlp_exporter = OTLPSpanExporter(
        endpoint="http://jaeger:4317",
        insecure=True
    )

    # Add span processor
    span_processor = BatchSpanProcessor(otlp_exporter)
    tracer_provider.add_span_processor(span_processor)
    return tracer_provider

# Initialize FastAPI with OpenTelemetry
app = FastAPI()
setup_telemetry()
FastAPIInstrumentor.instrument_app(app)
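With the app instrumented, you can also create manual spans inside route handlers to capture application-level detail. Here is a minimal sketch, using a hypothetical /items/{item_id} endpoint for illustration:

tracer = trace.get_tracer(__name__)

@app.get("/items/{item_id}")
async def read_item(item_id: int):
    # This span nests under the request span created by FastAPIInstrumentor
    with tracer.start_as_current_span("load_item") as span:
        span.set_attribute("item.id", item_id)
        return {"item_id": item_id}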
Prometheus Metrics Integration
import time

from fastapi import Request
from prometheus_client import Counter, Histogram
from prometheus_fastapi_instrumentator import Instrumentator

# Define metrics
REQUEST_COUNT = Counter(
    "http_requests_total",
    "Total HTTP requests",
    ["method", "endpoint", "status"]
)

REQUEST_LATENCY = Histogram(
    "http_request_duration_seconds",
    "HTTP request latency",
    ["method", "endpoint"]
)

# Set up Prometheus instrumentation and expose the /metrics endpoint
def setup_prometheus():
    Instrumentator().instrument(app).expose(app)

setup_prometheus()

@app.middleware("http")
async def track_requests(request: Request, call_next):
    start_time = time.time()
    response = await call_next(request)
    duration = time.time() - start_time

    REQUEST_COUNT.labels(
        method=request.method,
        endpoint=request.url.path,
        status=response.status_code
    ).inc()
    REQUEST_LATENCY.labels(
        method=request.method,
        endpoint=request.url.path
    ).observe(duration)

    return response
Structured Logging
import time
import uuid

import structlog
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint

# Configure structlog to emit JSON with ISO timestamps
structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer()
    ]
)

logger = structlog.get_logger()

class LoggingMiddleware(BaseHTTPMiddleware):
    async def dispatch(
        self,
        request: Request,
        call_next: RequestResponseEndpoint
    ) -> Response:
        start_time = time.time()

        # Extract the request ID, or generate one if the client didn't send it
        request_id = request.headers.get("X-Request-ID")
        if not request_id:
            request_id = str(uuid.uuid4())

        # Bind request context to the logger
        log = logger.bind(
            request_id=request_id,
            method=request.method,
            path=request.url.path,
            client_ip=request.client.host
        )

        try:
            response = await call_next(request)
            duration = time.time() - start_time
            log.info(
                "request_processed",
                status_code=response.status_code,
                duration=duration
            )
            return response
        except Exception as e:
            log.error(
                "request_failed",
                error=str(e),
                exc_info=True
            )
            raise
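Register the middleware on the application so every request is logged:

app.add_middleware(LoggingMiddleware)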
Custom Business Metrics
from prometheus_client import Counter, Gauge

# Business metrics
ACTIVE_USERS = Gauge(
    "active_users_total",
    "Number of active users"
)

ORDER_VALUE = Counter(
    "order_value_total",
    "Total value of orders",
    ["product_category"]
)

class MetricsService:
    def track_user_activity(self, user_id: str):
        ACTIVE_USERS.inc()

    def track_order(self, category: str, value: float):
        ORDER_VALUE.labels(
            product_category=category
        ).inc(value)
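A sketch of how the service might be called from a route handler; the /orders endpoint and the order payload fields are illustrative, not part of any particular API:

metrics = MetricsService()

@app.post("/orders")
async def create_order(order: dict):
    # Record business metrics alongside the normal request handling
    metrics.track_user_activity(user_id=order.get("user_id", "anonymous"))
    metrics.track_order(
        category=order.get("category", "unknown"),
        value=float(order.get("total", 0))
    )
    return {"status": "created"}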
Health Checks
import time
from typing import Any, Dict

# Assumes async `db` and `redis` clients are available from the application's
# database and cache layers.

class HealthCheck:
    async def measure_db_latency(self) -> float:
        # Round-trip time for a trivial query, in milliseconds
        start = time.time()
        await db.execute("SELECT 1")
        return (time.time() - start) * 1000

    async def check_database(self) -> Dict[str, Any]:
        try:
            await db.execute("SELECT 1")
            return {
                "database": {
                    "status": "healthy",
                    "latency_ms": await self.measure_db_latency()
                }
            }
        except Exception as e:
            return {
                "database": {
                    "status": "unhealthy",
                    "error": str(e)
                }
            }

    async def check_redis(self) -> Dict[str, Any]:
        try:
            await redis.ping()
            return {
                "redis": {
                    "status": "healthy",
                    "connected_clients": await redis.info("clients")
                }
            }
        except Exception as e:
            return {
                "redis": {
                    "status": "unhealthy",
                    "error": str(e)
                }
            }
@app.get("/health")
async def health_check():
checker = HealthCheck()
return {
"status": "healthy",
"checks": {
**(await checker.check_database()),
**(await checker.check_redis())
},
"version": "1.0.0"
}
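If the application runs on Kubernetes, the same endpoint can back liveness and readiness probes. A minimal sketch, assuming the container listens on port 8000 (the probe timings are arbitrary):

livenessProbe:
  httpGet:
    path: /health
    port: 8000
  initialDelaySeconds: 10
  periodSeconds: 15
readinessProbe:
  httpGet:
    path: /health
    port: 8000
  periodSeconds: 5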
Tracing Database Queries
from opentelemetry import trace
from sqlalchemy import event
from sqlalchemy.engine import Engine

tracer = trace.get_tracer(__name__)

@event.listens_for(Engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    # Start a span before the query runs and stash it on the execution
    # context so it can be ended once the query completes.
    span = tracer.start_span(
        "database_query",
        attributes={
            "db.statement": statement,
            "db.parameters": str(parameters)
        }
    )
    context._otel_span = span

@event.listens_for(Engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    # End the span started in before_cursor_execute so it covers the query duration
    span = getattr(context, "_otel_span", None)
    if span is not None:
        span.end()
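If you prefer not to manage event listeners yourself, the opentelemetry-instrumentation-sqlalchemy package provides an instrumentor that creates and ends query spans automatically. A sketch, assuming `engine` is your application's SQLAlchemy engine:

from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor

SQLAlchemyInstrumentor().instrument(engine=engine)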
Grafana Dashboard Configuration
{
  "dashboard": {
    "title": "FastAPI Application Metrics",
    "panels": [
      {
        "title": "Request Rate",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(http_requests_total[5m])",
            "legendFormat": "{{method}} {{endpoint}}"
          }
        ]
      },
      {
        "title": "Response Latency",
        "type": "heatmap",
        "targets": [
          {
            "expr": "rate(http_request_duration_seconds_bucket[5m])",
            "format": "heatmap"
          }
        ]
      },
      {
        "title": "Active Users",
        "type": "stat",
        "targets": [
          {
            "expr": "active_users_total"
          }
        ]
      }
    ]
  }
}
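For panels that show percentile latency rather than a heatmap, a histogram_quantile query over the same buckets works well. A sketch of a p95 expression:

histogram_quantile(
  0.95,
  sum(rate(http_request_duration_seconds_bucket[5m])) by (le, endpoint)
)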
Docker Compose Setup
version: '3.8'

services:
  app:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4317
    depends_on:
      - jaeger
      - prometheus

  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686"
      - "4317:4317"

  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - grafana_data:/var/lib/grafana

volumes:
  grafana_data:
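The compose file mounts a prometheus.yml that tells Prometheus where to scrape. A minimal sketch, assuming the app exposes /metrics on port 8000 (the job name and target are illustrative):

global:
  scrape_interval: 15s

scrape_configs:
  - job_name: "fastapi-app"
    static_configs:
      - targets: ["app:8000"]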
Best Practices
- Consistent Naming: Use consistent naming conventions for metrics, traces, and logs
- Cardinality Control: Be careful with high-cardinality labels in metrics
- Sampling: Implement appropriate sampling for traces in high-traffic systems (see the sketch after this list)
- Context Propagation: Ensure proper context propagation across service boundaries
- Alert Configuration: Set up meaningful alerts based on SLOs
- Documentation: Document the meaning and purpose of each metric and log field
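For the sampling point above, the OpenTelemetry SDK lets you pass a sampler when constructing the tracer provider. A minimal sketch that keeps roughly 10% of new traces while respecting the parent span's decision (the ratio is an example, not a recommendation):

from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.sampling import ParentBased, TraceIdRatioBased

# Sample ~10% of root traces; child spans follow their parent's sampling decision
tracer_provider = TracerProvider(
    sampler=ParentBased(TraceIdRatioBased(0.1))
)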
Conclusion
A well-implemented observability stack is crucial for maintaining and troubleshooting modern applications. Key takeaways:
- Use OpenTelemetry for distributed tracing
- Implement Prometheus metrics for monitoring
- Set up structured logging
- Configure comprehensive health checks
- Use Grafana for visualization
- Follow best practices for maintainability
Remember that observability is not just about collecting data—it's about making that data actionable and useful for understanding your system's behavior.