"""
Profiling utilities for APIFromAnything.
This module provides tools for profiling API endpoints, measuring performance,
and generating reports to help identify and fix performance bottlenecks.
"""
import time
import functools
import statistics
import cProfile
import pstats
import io
from typing import Dict, List, Optional, Callable, Any, Union
import json
import os
from datetime import datetime
import threading
import asyncio
from contextvars import ContextVar
from apifrom.core.request import Request
from apifrom.core.response import Response
from apifrom.middleware.base import BaseMiddleware
# Context variable to track profiler state.
# While a profiled call is running, the wrappers created by
# APIProfiler.profile_endpoint store a 4-tuple here:
# (APIProfiler instance, endpoint name, active cProfile profiler, start timestamp).
# The default is None, so the value is None whenever no profiled call is active.
_profiler_ctx = ContextVar('profiler_ctx', default=None)
class ProfileReport:
    """
    Represents a performance profile report for an API endpoint.

    This class provides methods for analyzing and visualizing profile data,
    as well as exporting it to various formats.

    The report reads from the ``profile_data`` mapping it is given. The keys
    it looks at are ``request_count``, ``response_times`` (milliseconds),
    ``function_stats`` (function name -> milliseconds), ``memory_usage`` and
    ``cpu_usage``; a missing key is treated as "no data".
    """

    # Thresholds used by get_recommendations().
    SLOW_AVG_MS = 200            # average response time considered slow
    SLOW_P95_MS = 500            # 95th percentile considered concerning
    HIGH_MEMORY_MB = 100         # peak memory considered high
    EXPENSIVE_FUNCTION_MS = 100  # cumulative function time considered expensive

    def __init__(self, endpoint_name: str, profile_data: Dict[str, Any]):
        """
        Initialize a profile report.

        Args:
            endpoint_name: The name of the endpoint being profiled
            profile_data: The raw profile data (kept by reference, not copied)
        """
        self.endpoint_name = endpoint_name
        self.profile_data = profile_data
        self.created_at = datetime.now()

    def _response_times(self) -> List[float]:
        """Return the recorded response times, or an empty list if there are none."""
        return self.profile_data.get("response_times") or []

    def _top_functions(self, limit: int) -> List[Any]:
        """Return up to ``limit`` (function, time_ms) pairs, slowest first."""
        stats = self.profile_data.get("function_stats") or {}
        return sorted(stats.items(), key=lambda item: item[1], reverse=True)[:limit]

    @property
    def avg_response_time(self) -> float:
        """
        Get the average response time in milliseconds.

        Returns:
            The average response time, or 0.0 when nothing was recorded
        """
        times = self._response_times()
        return statistics.mean(times) if times else 0.0

    @property
    def max_response_time(self) -> float:
        """
        Get the maximum response time in milliseconds.

        Returns:
            The maximum response time, or 0.0 when nothing was recorded
        """
        times = self._response_times()
        return max(times) if times else 0.0

    @property
    def min_response_time(self) -> float:
        """
        Get the minimum response time in milliseconds.

        Returns:
            The minimum response time, or 0.0 when nothing was recorded
        """
        times = self._response_times()
        return min(times) if times else 0.0

    @property
    def p95_response_time(self) -> float:
        """
        Get the 95th percentile response time in milliseconds.

        Uses the nearest-rank definition: the smallest recorded value such
        that at least 95% of the samples are less than or equal to it.

        Returns:
            The 95th percentile response time, or 0.0 when nothing was recorded
        """
        times = self._response_times()
        if not times:
            return 0.0
        ordered = sorted(times)
        # ceil(0.95 * n) in integer arithmetic, so float rounding cannot push
        # the rank off by one (e.g. 20 samples -> rank 19, not the maximum).
        rank = (95 * len(ordered) + 99) // 100
        return ordered[rank - 1]

    @property
    def request_count(self) -> int:
        """
        Get the number of requests processed.

        Returns:
            The number of requests
        """
        return self.profile_data.get("request_count", 0)

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert the report to a dictionary.

        Returns:
            A dictionary representation of the report
        """
        data = self.profile_data
        return {
            "endpoint": self.endpoint_name,
            "created_at": self.created_at.isoformat(),
            "avg_response_time_ms": self.avg_response_time,
            "max_response_time_ms": self.max_response_time,
            "min_response_time_ms": self.min_response_time,
            "p95_response_time_ms": self.p95_response_time,
            "request_count": self.request_count,
            "function_stats": data.get("function_stats", {}),
            "memory_usage": data.get("memory_usage", {}),
            "cpu_usage": data.get("cpu_usage", {}),
        }

    def to_json(self, pretty: bool = True) -> str:
        """
        Convert the report to a JSON string.

        Args:
            pretty: Whether to format the JSON with indentation

        Returns:
            A JSON string representation of the report
        """
        return json.dumps(self.to_dict(), indent=2 if pretty else None)

    def save(self, file_path: str) -> None:
        """
        Save the report to a file as JSON.

        Args:
            file_path: The path to save the report to
        """
        # Explicit encoding so the output does not depend on the locale.
        with open(file_path, "w", encoding="utf-8") as f:
            f.write(self.to_json())

    def print_summary(self) -> None:
        """
        Print a summary of the report to the console.
        """
        print(f"=== Profile Report: {self.endpoint_name} ===")
        print(f"Created At: {self.created_at.isoformat()}")
        print(f"Request Count: {self.request_count}")
        print(f"Average Response Time: {self.avg_response_time:.2f} ms")
        print(f"Maximum Response Time: {self.max_response_time:.2f} ms")
        print(f"Minimum Response Time: {self.min_response_time:.2f} ms")
        print(f"95th Percentile Response Time: {self.p95_response_time:.2f} ms")

        top_functions = self._top_functions(5)
        if top_functions:
            print("\nTop 5 Functions by Cumulative Time:")
            for position, (func, time_ms) in enumerate(top_functions, start=1):
                print(f"{position}. {func}: {time_ms:.2f} ms")

    def get_recommendations(self) -> List[str]:
        """
        Get performance optimization recommendations based on the profile data.

        Returns:
            A list of recommendations; when no threshold is exceeded the list
            holds a single "performing well" message
        """
        recommendations = []

        # Check for slow response time
        avg_ms = self.avg_response_time
        if avg_ms > self.SLOW_AVG_MS:
            recommendations.append(
                f"Average response time ({avg_ms:.2f} ms) is high. "
                f"Consider optimizing the endpoint implementation."
            )

        # Check for high p95
        p95_ms = self.p95_response_time
        if p95_ms > self.SLOW_P95_MS:
            recommendations.append(
                f"95th percentile response time ({p95_ms:.2f} ms) is high. "
                f"There may be outliers or inconsistent performance."
            )

        # Check for high memory usage ("memory_usage" may be absent or None)
        peak_mb = (self.profile_data.get("memory_usage") or {}).get("peak_mb", 0)
        if peak_mb > self.HIGH_MEMORY_MB:
            recommendations.append(
                f"Peak memory usage ({peak_mb:.2f} MB) is high. "
                f"Consider optimizing memory usage in the endpoint."
            )

        # Check for CPU-intensive functions (only the three slowest are considered)
        for func, time_ms in self._top_functions(3):
            if time_ms > self.EXPENSIVE_FUNCTION_MS:
                recommendations.append(
                    f"Function '{func}' is taking {time_ms:.2f} ms to execute. "
                    f"Consider optimizing this function or using caching."
                )

        # If everything looks good
        if not recommendations:
            recommendations.append(
                f"The endpoint '{self.endpoint_name}' is performing well. "
                f"No immediate optimizations needed."
            )
        return recommendations
class APIProfiler:
    """
    Profiles API endpoints to measure performance and identify bottlenecks.

    This class provides tools for profiling API endpoints, measuring response
    times and per-function cumulative times, and generating profile reports.

    Data is collected in two ways:

    * ``profile_endpoint`` decorates a sync or async callable and records the
      response time plus cProfile function statistics for every call.
    * ``start_profile`` / ``end_profile`` let an external caller (such as a
      middleware that measures the duration itself) record a request.

    Collected data lives in ``self.profiles``, keyed by endpoint name. Each
    entry is a dict with the keys ``request_count``, ``response_times`` (ms),
    ``function_stats`` (function -> ms), ``memory_usage`` and ``cpu_usage``.
    """

    # How many functions (ranked by cumulative time) are kept per profiled call.
    _TOP_FUNCTION_COUNT = 10

    # Characters replaced by "_" when an endpoint name is used in a filename.
    # ":" is included because it is not valid in filenames on every platform.
    _FILENAME_TRANSLATION = str.maketrans({"/": "_", "\\": "_", ".": "_", ":": "_"})

    def __init__(self, output_dir: Optional[str] = None, enabled: bool = True):
        """
        Initialize an API profiler.

        Args:
            output_dir: The directory to save profile reports to (defaults to current directory)
            enabled: Whether profiling is enabled
        """
        self.output_dir = output_dir or os.getcwd()
        self.enabled = enabled
        self.profiles = {}
        self._lock = threading.Lock()
        # Source of unique ids handed out by start_profile().
        self._profile_counter = 0

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _new_profile_data() -> Dict[str, Any]:
        """Return an empty per-endpoint profile record."""
        return {
            "request_count": 0,
            "response_times": [],
            "function_stats": {},
            "memory_usage": {},
            "cpu_usage": {},
        }

    def _ensure_profile(self, endpoint_name: str) -> Dict[str, Any]:
        """Return the record for ``endpoint_name``, creating it if needed.

        The caller must already hold ``self._lock``.
        """
        if endpoint_name not in self.profiles:
            self.profiles[endpoint_name] = self._new_profile_data()
        return self.profiles[endpoint_name]

    @staticmethod
    def _extract_function_stats(profiler, limit: int = 10) -> Dict[str, float]:
        """Return the ``limit`` slowest functions as {name: cumulative ms}.

        The numbers are read from the ``pstats.Stats.stats`` mapping rather
        than from the text printed by ``print_stats``: the printed layout
        gains an extra header line when the listing is truncated, which makes
        fixed line offsets pick up the column-title row instead of data.
        """
        try:
            raw_stats = pstats.Stats(profiler).stats
        except TypeError:
            # pstats raises TypeError when the profile holds no data at all.
            return {}
        # Each value is (primitive calls, total calls, total time,
        # cumulative time, callers); index 3 is the cumulative time in seconds.
        ranked = sorted(raw_stats.items(), key=lambda item: item[1][3], reverse=True)
        return {
            pstats.func_std_string(func): float(timings[3]) * 1000  # seconds -> ms
            for func, timings in ranked[:limit]
        }

    def _record(self, endpoint_name: str, response_time: float,
                function_stats: Dict[str, float]) -> None:
        """Add one finished request to the record of ``endpoint_name``."""
        with self._lock:
            # Re-create the entry if clear() ran while the request was in flight.
            data = self._ensure_profile(endpoint_name)
            data["request_count"] += 1
            data["response_times"].append(response_time)
            merged = data["function_stats"]
            for func_name, time_ms in function_stats.items():
                if func_name in merged:
                    # Blend with the stored value: each new sample carries
                    # half the weight (this is not an arithmetic mean).
                    merged[func_name] = (merged[func_name] + time_ms) / 2
                else:
                    merged[func_name] = time_ms

    def _start_call(self, endpoint_name: str):
        """Begin profiling one decorated call; returns state for _finish_call."""
        with self._lock:
            self._ensure_profile(endpoint_name)
        profiler = cProfile.Profile()
        try:
            profiler.enable()
        except ValueError:
            # enable() can refuse to start when another profiler is already
            # active in this interpreter; fall back to timing only instead of
            # failing the endpoint call.
            profiler = None
        start_time = time.perf_counter()
        # Publish the state in the context variable and keep the token so the
        # previous value can be restored (matters for nested profiled calls).
        token = _profiler_ctx.set((self, endpoint_name, profiler, start_time))
        return profiler, start_time, token

    def _finish_call(self, endpoint_name: str, profiler, start_time: float, token) -> None:
        """Stop profiling one decorated call and record its measurements."""
        response_time = (time.perf_counter() - start_time) * 1000  # Convert to ms
        try:
            function_stats = {}
            if profiler is not None:
                profiler.disable()
                function_stats = self._extract_function_stats(
                    profiler, self._TOP_FUNCTION_COUNT
                )
            self._record(endpoint_name, response_time, function_stats)
        finally:
            _profiler_ctx.reset(token)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def profile_endpoint(self, endpoint_name: str) -> Callable:
        """
        Decorator for profiling an API endpoint.

        Coroutine functions get an async wrapper, everything else a sync
        wrapper. When the profiler is disabled the wrapped callable is invoked
        directly and nothing is recorded. Measurements are recorded whether
        the call returns or raises.

        Args:
            endpoint_name: The name of the endpoint being profiled

        Returns:
            A decorated function
        """
        def decorator(func):
            if asyncio.iscoroutinefunction(func):
                @functools.wraps(func)
                async def async_wrapper(*args, **kwargs):
                    if not self.enabled:
                        return await func(*args, **kwargs)
                    profiler, start_time, token = self._start_call(endpoint_name)
                    try:
                        return await func(*args, **kwargs)
                    finally:
                        self._finish_call(endpoint_name, profiler, start_time, token)

                return async_wrapper

            @functools.wraps(func)
            def sync_wrapper(*args, **kwargs):
                if not self.enabled:
                    return func(*args, **kwargs)
                profiler, start_time, token = self._start_call(endpoint_name)
                try:
                    return func(*args, **kwargs)
                finally:
                    self._finish_call(endpoint_name, profiler, start_time, token)

            return sync_wrapper

        return decorator

    def start_profile(self, endpoint_name: str) -> str:
        """
        Begin an externally timed profile for an endpoint.

        No state is kept between this call and ``end_profile``, so a request
        that never reaches ``end_profile`` leaves nothing behind.

        Args:
            endpoint_name: The name of the endpoint being profiled

        Returns:
            A unique identifier to pass to ``end_profile``
        """
        with self._lock:
            self._profile_counter += 1
            sequence = self._profile_counter
        return f"{endpoint_name}#{sequence}"

    def end_profile(
        self,
        profile_id: str,
        status_code: int,
        duration_ms: float,
        endpoint: str,
        response_size: int = 0,
        is_error: bool = False,
        error: Optional[str] = None,
    ) -> None:
        """
        Record the outcome of an externally timed request.

        Only the duration is aggregated (into ``request_count`` and
        ``response_times``). The remaining arguments are accepted so callers
        can report them, but they are not stored at present.

        Args:
            profile_id: The identifier returned by ``start_profile``
            status_code: The HTTP status code of the response
            duration_ms: The request duration in milliseconds
            endpoint: The name of the endpoint the request was served by
            response_size: The size of the response body
            is_error: Whether the request ended in an error
            error: A description of the error, if any
        """
        if not self.enabled:
            return
        self._record(endpoint, duration_ms, {})

    def get_report(self, endpoint_name: str) -> Optional["ProfileReport"]:
        """
        Get a profile report for an endpoint.

        Args:
            endpoint_name: The name of the endpoint

        Returns:
            A ProfileReport instance or None if no profile data exists
        """
        if endpoint_name in self.profiles:
            return ProfileReport(endpoint_name, self.profiles[endpoint_name])
        return None

    def get_all_reports(self) -> List["ProfileReport"]:
        """
        Get profile reports for all endpoints.

        Returns:
            A list of ProfileReport instances
        """
        # Snapshot under the lock so a concurrent request cannot resize the
        # dict while it is being iterated.
        with self._lock:
            entries = list(self.profiles.items())
        return [ProfileReport(name, data) for name, data in entries]

    def save_reports(self, prefix: Optional[str] = None) -> List[str]:
        """
        Save all profile reports to files.

        One JSON file per endpoint is written to ``self.output_dir`` (created
        if missing), named ``<prefix>_<endpoint>.json`` with path separators,
        dots and colons in the endpoint name replaced by underscores.

        Args:
            prefix: A prefix to add to the filenames (defaults to a timestamp)

        Returns:
            A list of file paths where reports were saved
        """
        os.makedirs(self.output_dir, exist_ok=True)
        prefix = prefix or f"profile_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        with self._lock:
            entries = list(self.profiles.items())

        file_paths = []
        for name, data in entries:
            # Create a safe filename from the endpoint name
            safe_name = name.translate(self._FILENAME_TRANSLATION)
            file_path = os.path.join(self.output_dir, f"{prefix}_{safe_name}.json")
            ProfileReport(name, data).save(file_path)
            file_paths.append(file_path)
        return file_paths

    def save_profile_reports(self, prefix: Optional[str] = None) -> List[str]:
        """
        Save all profile reports to files (alias of ``save_reports``).

        Args:
            prefix: A prefix to add to the filenames

        Returns:
            A list of file paths where reports were saved
        """
        return self.save_reports(prefix)

    def clear(self) -> None:
        """
        Clear all profile data.
        """
        with self._lock:
            self.profiles = {}

    def enable(self) -> None:
        """
        Enable profiling.
        """
        self.enabled = True

    def disable(self) -> None:
        """
        Disable profiling.
        """
        self.enabled = False
class ProfileMiddleware(BaseMiddleware):
    """
    Middleware for profiling API requests and responses.

    This middleware times every request that passes through ``dispatch`` and
    reports it to a profiler under the endpoint name ``"<METHOD>:<path>"``.
    It can be used to identify performance bottlenecks and optimize API
    performance.

    The profiler object must provide ``start_profile(endpoint)``,
    ``end_profile(...)`` and ``save_profile_reports()``, which ``dispatch``
    calls, plus ``get_all_reports``, ``get_report`` and ``clear``, which the
    accessor methods delegate to.
    """

    def __init__(
        self,
        output_dir: Optional[str] = None,
        save_interval: int = 300,
        enabled: bool = True,
        profiler: Optional[APIProfiler] = None,
    ):
        """
        Initialize the profile middleware.

        Args:
            output_dir: The directory to save profile reports to; periodic
                saving only happens when this is set
            save_interval: The interval to save profile reports in seconds
            enabled: Whether profiling is enabled
            profiler: The profiler to use (creates a new one if None)
        """
        super().__init__()
        self.output_dir = output_dir
        self.save_interval = save_interval
        self.enabled = enabled
        self.profiler = profiler or APIProfiler(output_dir=output_dir)
        self.last_save_time = time.time()

    async def process_request(self, request):
        """
        Process a request (required by BaseMiddleware).

        Args:
            request: The request to process

        Returns:
            The processed request
        """
        # Initialize request context for profiling
        if self.enabled:
            request.state.profile_start_time = time.time()
        return request

    async def process_response(self, response):
        """
        Process a response (required by BaseMiddleware).

        Args:
            response: The response to process

        Returns:
            The processed response
        """
        # Nothing to do here, profiling is handled in dispatch
        return response

    @staticmethod
    def _response_size(response) -> int:
        """Return the length of ``response.body``, or 0 if it has no sized body."""
        body = getattr(response, "body", None)
        if body is None:
            return 0
        try:
            return len(body)
        except TypeError:
            # The body exists but has no length; report it as unknown.
            return 0

    def _save_reports_if_due(self) -> None:
        """Save the reports when an output dir is set and the interval has passed."""
        if self.output_dir and (time.time() - self.last_save_time) > self.save_interval:
            self.profiler.save_profile_reports()
            self.last_save_time = time.time()

    async def dispatch(
        self,
        request: Request,
        call_next: Callable
    ) -> Response:
        """
        Dispatch a request, profiling the execution time.

        A handler that raises is recorded as a 500 error and the exception is
        re-raised. Only ``call_next`` is guarded: a failure in the profiling
        bookkeeping for a successful response is not recorded as a request
        error.

        Args:
            request: The request to process
            call_next: The next middleware or route handler

        Returns:
            The response
        """
        # Skip profiling if disabled
        if not self.enabled:
            return await call_next(request)

        # Get the endpoint and path
        endpoint = f"{request.method}:{request.url.path}"

        # Start profiling
        profile_id = self.profiler.start_profile(endpoint)
        start_time = time.perf_counter()
        try:
            # Call the next middleware or route handler
            response = await call_next(request)
        except Exception as e:
            # Record the error
            self.profiler.end_profile(
                profile_id=profile_id,
                status_code=500,
                duration_ms=(time.perf_counter() - start_time) * 1000,
                endpoint=endpoint,
                response_size=0,
                is_error=True,
                error=str(e),
            )
            # Re-raise the exception
            raise

        # Record the successful response
        self.profiler.end_profile(
            profile_id=profile_id,
            status_code=response.status_code,
            duration_ms=(time.perf_counter() - start_time) * 1000,
            endpoint=endpoint,
            response_size=self._response_size(response),
            is_error=response.status_code >= 400,
        )

        # Save profile reports if the save interval has passed
        self._save_reports_if_due()
        return response

    def get_all_reports(self) -> List[ProfileReport]:
        """
        Get all profile reports.

        Returns:
            A list of profile reports
        """
        return self.profiler.get_all_reports()

    def get_report(self, endpoint: str) -> Optional[ProfileReport]:
        """
        Get a profile report for an endpoint.

        Args:
            endpoint: The endpoint to get the report for

        Returns:
            The profile report, or None if not found
        """
        return self.profiler.get_report(endpoint)

    def clear(self) -> None:
        """Clear all profile data."""
        self.profiler.clear()

    def enable(self) -> None:
        """Enable profiling."""
        self.enabled = True

    def disable(self) -> None:
        """Disable profiling."""
        self.enabled = False