Source code for apifrom.monitoring.metrics

"""
Metrics collection and management for APIFromAnything.

This module provides classes and utilities for collecting, managing, and
tracking various metrics related to API performance and usage.
"""

import time
import enum
import threading
import statistics
import uuid
from typing import Dict, List, Any, Optional, Union, Callable
from dataclasses import dataclass, field


class MetricType(enum.Enum):
    """Types of metrics that can be collected."""
    COUNTER = "counter"  # Metrics that increment (e.g., request count)
    GAUGE = "gauge"  # Metrics that can go up and down (e.g., active requests)
    HISTOGRAM = "histogram"  # Distribution of values (e.g., response times)
    SUMMARY = "summary"  # Similar to histogram but with quantiles


@dataclass
[docs] class Metric: """Represents a single metric with its metadata and values.""" name: str type: MetricType description: str labels: Dict[str, str] = field(default_factory=dict) value: Union[float, int, List[float]] = field(default_factory=list) # For histogram/summary metrics buckets: List[float] = field(default_factory=list) quantiles: List[float] = field(default_factory=lambda: [0.5, 0.9, 0.95, 0.99]) def __post_init__(self): """Initialize the metric based on its type.""" if self.type in (MetricType.COUNTER, MetricType.GAUGE) and not isinstance(self.value, (int, float)): self.value = 0 elif self.type in (MetricType.HISTOGRAM, MetricType.SUMMARY) and not isinstance(self.value, list): self.value = [] # Set default buckets for histograms if not provided if self.type == MetricType.HISTOGRAM and not self.buckets: # Default buckets for response time in ms: 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000 self.buckets = [5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000] def increment(self, amount: float = 1.0, labels: Optional[Dict[str, str]] = None) -> None: """Increment a counter metric.""" if self.type != MetricType.COUNTER: raise ValueError(f"Cannot increment metric of type {self.type}") if labels: self.labels.update(labels) self.value += amount def set(self, value: float, labels: Optional[Dict[str, str]] = None) -> None: """Set a gauge metric to a specific value.""" if self.type != MetricType.GAUGE: raise ValueError(f"Cannot set value for metric of type {self.type}") if labels: self.labels.update(labels) self.value = value def observe(self, value: float, labels: Optional[Dict[str, str]] = None) -> None: """Add an observation to a histogram or summary metric.""" if self.type not in (MetricType.HISTOGRAM, MetricType.SUMMARY): raise ValueError(f"Cannot observe value for metric of type {self.type}") if labels: self.labels.update(labels) if isinstance(self.value, list): self.value.append(value) else: self.value = [value] def get_histogram_buckets(self) -> Dict[float, int]: """Get histogram bucket counts.""" if self.type != MetricType.HISTOGRAM: raise ValueError(f"Cannot get histogram buckets for metric of type {self.type}") result = {bucket: 0 for bucket in self.buckets} for value in self.value: for bucket in self.buckets: if value <= bucket: result[bucket] += 1 return result def get_summary_quantiles(self) -> Dict[float, float]: """Get summary quantiles.""" if self.type != MetricType.SUMMARY: raise ValueError(f"Cannot get summary quantiles for metric of type {self.type}") if not self.value: return {q: 0 for q in self.quantiles} result = {} sorted_values = sorted(self.value) for q in self.quantiles: idx = int(q * len(sorted_values)) if idx >= len(sorted_values): idx = len(sorted_values) - 1 result[q] = sorted_values[idx] return result def get_stats(self) -> Dict[str, float]: """Get basic statistics for histogram and summary metrics.""" if self.type not in (MetricType.HISTOGRAM, MetricType.SUMMARY): raise ValueError(f"Cannot get statistics for metric of type {self.type}") if not self.value: return { "count": 0, "sum": 0, "min": 0, "max": 0, "mean": 0, "median": 0 } return { "count": len(self.value), "sum": sum(self.value), "min": min(self.value), "max": max(self.value), "mean": statistics.mean(self.value), "median": statistics.median(self.value) }
[docs] class MetricsCollector: """ Collects and manages metrics for the API. This class provides a centralized way to create, update, and retrieve metrics for monitoring API performance and usage. """ _instance = None def __new__(cls): """Implement the singleton pattern.""" if cls._instance is None: cls._instance = super(MetricsCollector, cls).__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self): """Initialize the metrics collector.""" # Only initialize once if getattr(self, '_initialized', False): return self._metrics = {} self._timers = {} self._initialized = True # Create default metrics self.create_counter("api_requests_total", "Total number of API requests") self.create_counter("api_errors_total", "Total number of API errors") self.create_gauge("api_requests_active", "Number of currently active API requests") self.create_histogram("api_request_duration_ms", "API request duration in milliseconds") self.create_counter("api_requests_by_endpoint", "API requests by endpoint") self.create_counter("api_errors_by_type", "API errors by type") def create_metric(self, metric: Metric) -> Metric: """ Register a new metric with the collector. Args: metric: The metric to register Returns: The registered metric """ # Check if metric already exists if metric.name in self._metrics: raise ValueError(f"Metric with name '{metric.name}' already exists") # Register the metric self._metrics[metric.name] = metric return metric def create_counter(self, name: str, description: str, labels: Optional[Dict[str, str]] = None) -> Metric: """Create a new counter metric.""" metric = Metric( name=name, type=MetricType.COUNTER, description=description, labels=labels or {}, value=0 ) return self.create_metric(metric) def create_gauge(self, name: str, description: str, labels: Optional[Dict[str, str]] = None) -> Metric: """Create a new gauge metric.""" metric = Metric( name=name, type=MetricType.GAUGE, description=description, labels=labels or {}, value=0 ) return self.create_metric(metric) def create_histogram(self, name: str, description: str, buckets: Optional[List[float]] = None, labels: Optional[Dict[str, str]] = None) -> Metric: """Create a new histogram metric.""" metric = Metric( name=name, type=MetricType.HISTOGRAM, description=description, labels=labels or {}, buckets=buckets or [] ) return self.create_metric(metric) def create_summary(self, name: str, description: str, quantiles: Optional[List[float]] = None, labels: Optional[Dict[str, str]] = None) -> Metric: """Create a new summary metric.""" metric = Metric( name=name, type=MetricType.SUMMARY, description=description, labels=labels or {}, quantiles=quantiles or [0.5, 0.9, 0.95, 0.99] ) return self.create_metric(metric) def get_metric(self, name: str) -> Optional[Metric]: """Get a metric by name.""" return self._metrics.get(name) def get_all_metrics(self) -> Dict[str, Metric]: """Get all metrics.""" return self._metrics.copy() def increment(self, name: str, amount: float = 1.0, labels: Optional[Dict[str, str]] = None) -> None: """Increment a counter metric.""" metric = self.get_metric(name) if not metric: raise ValueError(f"Metric with name '{name}' does not exist") metric.increment(amount, labels) def set(self, name: str, value: float, labels: Optional[Dict[str, str]] = None) -> None: """Set a gauge metric to a specific value.""" metric = self.get_metric(name) if not metric: raise ValueError(f"Metric with name '{name}' does not exist") metric.set(value, labels) def observe(self, name: str, value: float, labels: Optional[Dict[str, str]] = None) -> None: """Add an observation to a histogram or summary metric.""" metric = self.get_metric(name) if not metric: raise ValueError(f"Metric with name '{name}' does not exist") metric.observe(value, labels) def start_timer(self, name: str) -> str: """ Start a timer for measuring durations. Args: name: The name of the metric to record the duration Returns: A unique timer ID """ timer_id = f"{name}_{uuid.uuid4()}" self._timers[timer_id] = time.time() return timer_id def stop_timer(self, timer_id: str, labels: Optional[Dict[str, str]] = None) -> float: """ Stop a timer and return the duration in milliseconds. Args: timer_id: The timer ID labels: Optional labels to attach to the observation Returns: The duration in milliseconds """ if timer_id not in self._timers: raise ValueError(f"Timer with ID '{timer_id}' does not exist") start_time = self._timers.pop(timer_id) end_time = time.time() duration_ms = (end_time - start_time) * 1000 # Extract the metric name from the timer ID # The timer ID format is "metric_name_uuid" # We need to get everything before the last underscore parts = timer_id.split('_') if len(parts) >= 3: # At least metric_name_uuid # Join all parts except the last one (which is the UUID) name = '_'.join(parts[:-1]) else: name = parts[0] if parts else timer_id # Make sure the metric exists before observing if name in self._metrics: self.observe(name, duration_ms, labels) else: print(f"Warning: Metric with name '{name}' does not exist for timer {timer_id}") return duration_ms def track_request(self, endpoint: str) -> str: """ Track the start of an API request. Args: endpoint: The API endpoint being called Returns: A timer ID for tracking the request duration """ # Increment request counters self.increment("api_requests_total") self.increment("api_requests_by_endpoint", labels={"endpoint": endpoint}) # Increment active requests gauge if "api_requests_active" in self._metrics: self.set("api_requests_active", self.get_metric("api_requests_active").value + 1) # Start a timer for the request duration return self.start_timer("api_request_duration_ms") def track_request_end(self, timer_id: str, endpoint: str, status_code: int) -> float: """ Track the end of an API request. Args: timer_id: The timer ID returned by track_request endpoint: The API endpoint that was called status_code: The HTTP status code of the response Returns: The request duration in milliseconds """ # Stop the timer and record the duration duration_ms = self.stop_timer(timer_id, labels={"endpoint": endpoint, "status_code": str(status_code)}) # Decrement active requests gauge if "api_requests_active" in self._metrics: self.set("api_requests_active", max(0, self.get_metric("api_requests_active").value - 1)) # If it's an error response, increment the error counter if status_code >= 400: self.increment("api_errors_total") error_type = "client_error" if 400 <= status_code < 500 else "server_error" self.increment("api_errors_by_type", labels={"type": error_type, "status_code": str(status_code)}) return duration_ms def track_error(self, error_type: str, endpoint: Optional[str] = None) -> None: """Track an API error.""" self.increment("api_errors_total") labels = {"type": error_type} if endpoint: labels["endpoint"] = endpoint self.increment("api_errors_by_type", labels=labels) def reset(self) -> None: """Reset all metrics.""" self._metrics = {} self._timers = {} # Re-create default metrics self.create_counter("api_requests_total", "Total number of API requests") self.create_counter("api_errors_total", "Total number of API errors") self.create_gauge("api_requests_active", "Number of currently active API requests") self.create_histogram("api_request_duration_ms", "API request duration in milliseconds") self.create_counter("api_requests_by_endpoint", "API requests by endpoint") self.create_counter("api_errors_by_type", "API errors by type")