Performance Optimization in APIFromAnythingο
This guide covers the performance optimization features available in APIFromAnything and best practices for optimizing your APIs.
Introductionο
APIFromAnything includes comprehensive performance optimization features to help you build fast and efficient APIs. These features include:
Caching
Request coalescing
Batch processing
Connection pooling
Profiling
Auto-tuning
Async Support in Performance Optimizationο
As of version 1.0.0, all performance optimization features in APIFromAnything fully support async/await. This allows you to use async functions with all performance optimization features:
from apifrom import API, api
from apifrom.performance import Web
import asyncio
app = API()
@api(route="/products", method="GET")
@Web.optimize(
cache_ttl=30, # Cache for 30 seconds
profile=True, # Enable profiling
connection_pool=True, # Enable connection pooling
request_coalescing=True, # Enable request coalescing
batch_processing=True, # Enable batch processing
batch_size=100 # Set batch size
)
async def get_products():
# Simulate an async database query
await asyncio.sleep(0.1)
# Your code here (automatically optimized)
return {"products": [...]}
Web Decorator for Easy Optimizationο
The @Web.optimize decorator allows you to optimize any API endpoint with a single line of code:
from apifrom import API, api
from apifrom.performance import Web
app = API()
@api(route="/products", method="GET")
@Web.optimize(
cache_ttl=30, # Cache for 30 seconds
profile=True, # Enable profiling
connection_pool=True, # Enable connection pooling
request_coalescing=True, # Enable request coalescing
batch_processing=True, # Enable batch processing
batch_size=100 # Set batch size
)
def get_products():
# Your code here (automatically optimized)
return {"products": [...]}
Cachingο
Caching improves performance by storing the results of expensive operations and reusing them:
Basic Cachingο
from apifrom import API, api
from apifrom.performance import cache
app = API()
@api(route="/expensive-operation", method="GET")
@cache(ttl=60) # Cache for 60 seconds
def expensive_operation():
# Expensive operation
return {"result": "expensive computation"}
Advanced Cachingο
from apifrom import API, api
from apifrom.performance import cache
from apifrom.core.request import Request
app = API()
@api(route="/user/{user_id}", method="GET")
@cache(
ttl=300, # Cache for 5 minutes
key_builder=lambda request: f"user:{request.path_params['user_id']}", # Custom cache key
storage="redis", # Use Redis for storage
redis_url="redis://localhost:6379/0", # Redis URL
invalidate_on_update=True, # Invalidate cache on update
stale_while_revalidate=True, # Serve stale data while revalidating
stale_ttl=3600, # Stale data TTL in seconds
vary_by_headers=["Authorization"], # Vary cache by headers
vary_by_query_params=["version"], # Vary cache by query parameters
)
def get_user(request: Request, user_id: int):
# Expensive database query
return {"id": user_id, "name": "John Doe"}
Cache Invalidationο
from apifrom import API, api
from apifrom.performance import cache, invalidate_cache
app = API()
@api(route="/user/{user_id}", method="GET")
@cache(ttl=300, key_prefix="user")
def get_user(user_id: int):
# Expensive database query
return {"id": user_id, "name": "John Doe"}
@api(route="/user/{user_id}", method="PUT")
def update_user(user_id: int, name: str):
# Update user in database
# Invalidate cache
invalidate_cache(f"user:user:{user_id}")
return {"id": user_id, "name": name}
Cache Middlewareο
from apifrom import API
from apifrom.middleware import CacheMiddleware
app = API()
# Add cache middleware
app.add_middleware(
CacheMiddleware(
ttl=60, # Cache TTL in seconds
max_size=1000, # Maximum number of cached responses
key_builder=None, # Custom function to build cache keys
storage="memory", # Storage backend (memory, redis)
redis_url=None, # Redis URL (if using redis storage)
include_query_params=True, # Include query parameters in cache key
include_headers=False, # Include headers in cache key
cache_control=True, # Add Cache-Control headers to responses
)
)
Request Coalescingο
Request coalescing combines duplicate concurrent requests into a single backend operation:
from apifrom import API, api
from apifrom.performance import coalesce_requests
app = API()
@api(route="/popular-data", method="GET")
@coalesce_requests(
ttl=30, # Cache TTL in seconds
max_wait_time=0.05, # Maximum wait time in seconds
key_builder=lambda request: request.url.path, # Custom key builder
)
async def get_popular_data():
"""
This endpoint might receive many identical requests under load.
With request coalescing, only one database query will be executed
even if 100 users request this data at the same time.
"""
# Expensive database query or API call
return {"data": "expensive computation result"}
Batch Processingο
Batch processing improves performance by grouping multiple similar operations:
Batch Processing Decoratorο
from apifrom import API, api
from apifrom.performance import batch_process
app = API()
@api(route="/users", method="POST")
@batch_process(
max_batch_size=100, # Maximum batch size
max_wait_time=0.1, # Maximum wait time in seconds
key_builder=lambda request: request.url.path, # Custom key builder
)
async def create_users(user_data_batch):
"""
This endpoint will collect individual user creation requests and
process them in batches of up to 100 users or after 0.1 seconds.
"""
# Bulk insert all users in the batch
return db.bulk_insert_users(user_data_batch)
Ad-hoc Batch Operationsο
from apifrom import API, api
from apifrom.performance import BatchProcessor
from typing import List, Dict
app = API()
@api(route="/batch-operations", method="POST")
async def batch_operations(operations: List[Dict]):
"""Process multiple operations in a single request."""
# Define a function to process each operation
async def process_operation(op):
op_type = op["type"]
if op_type == "create":
return await db.create_item(op["data"])
elif op_type == "update":
return await db.update_item(op["id"], op["data"])
elif op_type == "delete":
return await db.delete_item(op["id"])
# Process all operations in efficient batches
results = await BatchProcessor.map(
process_operation,
operations,
batch_size=20, # Process in batches of 20
worker_count=4, # Use 4 workers
timeout=30, # Timeout after 30 seconds
)
return {"results": results}
Connection Poolingο
Connection pooling efficiently manages database and external service connections:
from apifrom import API, api
from apifrom.performance import ConnectionPool
import aiohttp
app = API()
# Create a connection pool
http_pool = ConnectionPool(
factory=lambda: aiohttp.ClientSession(),
min_size=5, # Minimum pool size
max_size=20, # Maximum pool size
max_idle_time=60, # Maximum idle time in seconds
cleanup_interval=300, # Cleanup interval in seconds
)
@api(route="/external-api", method="GET")
async def call_external_api(url: str):
# Get a connection from the pool
async with http_pool.acquire() as session:
# Use the connection
async with session.get(url) as response:
return await response.json()
Profilingο
Profiling helps you identify performance bottlenecks:
Profiling Decoratorο
from apifrom import API, api
from apifrom.performance import profile
app = API()
@api(route="/users", method="GET")
@profile(
enabled=True, # Enable profiling
log_level="INFO", # Log level
include_memory=True, # Include memory usage
include_sql=True, # Include SQL queries
include_http=True, # Include HTTP requests
threshold=100, # Log only if execution time exceeds threshold (ms)
)
def get_users():
# Your code here
return {"users": [...]}
Profiling Middlewareο
from apifrom import API
from apifrom.middleware import ProfileMiddleware
app = API()
# Add profiling middleware
app.add_middleware(
ProfileMiddleware(
enabled=True, # Enable profiling
log_level="INFO", # Log level
include_memory=True, # Include memory usage
include_sql=True, # Include SQL queries
include_http=True, # Include HTTP requests
threshold=100, # Log only if execution time exceeds threshold (ms)
)
)
Accessing Profiling Dataο
from apifrom.performance import Profiler
# Get profiling data
profiling_data = Profiler.get_data()
# Print profiling data
for endpoint, data in profiling_data.items():
print(f"Endpoint: {endpoint}")
print(f"Average response time: {data['avg_response_time']} ms")
print(f"Min response time: {data['min_response_time']} ms")
print(f"Max response time: {data['max_response_time']} ms")
print(f"Request count: {data['request_count']}")
print(f"Error count: {data['error_count']}")
print(f"Error rate: {data['error_rate']}%")
print(f"Memory usage: {data['memory_usage']} MB")
print(f"SQL queries: {data['sql_queries']}")
print(f"HTTP requests: {data['http_requests']}")
Auto-Tuningο
Auto-tuning automatically adjusts performance parameters based on real-time metrics:
from apifrom import API, api
from apifrom.performance import auto_tune
app = API()
@api(route="/users", method="GET")
@auto_tune(
enabled=True, # Enable auto-tuning
cache_ttl_range=(10, 3600), # Cache TTL range in seconds
batch_size_range=(10, 1000), # Batch size range
connection_pool_size_range=(5, 100), # Connection pool size range
target_response_time=100, # Target response time in ms
adjustment_interval=300, # Adjustment interval in seconds
learning_rate=0.1, # Learning rate
)
def get_users():
# Your code here
return {"users": [...]}
Asynchronous Programmingο
Asynchronous programming improves performance by allowing your API to handle multiple requests concurrently:
from apifrom import API, api
import asyncio
app = API()
@api(route="/users", method="GET")
async def get_users():
# Simulate an asynchronous database query
await asyncio.sleep(0.1)
return {"users": [...]}
@api(route="/products", method="GET")
async def get_products():
# Simulate an asynchronous database query
await asyncio.sleep(0.1)
return {"products": [...]}
@api(route="/dashboard", method="GET")
async def get_dashboard():
# Fetch users and products concurrently
users_task = asyncio.create_task(get_users())
products_task = asyncio.create_task(get_products())
# Wait for both tasks to complete
users, products = await asyncio.gather(users_task, products_task)
return {
"users": users["users"],
"products": products["products"]
}
Performance Best Practicesο
1. Use Cachingο
Cache expensive operations:
from apifrom import API, api
from apifrom.performance import cache
app = API()
@api(route="/expensive-operation", method="GET")
@cache(ttl=60) # Cache for 60 seconds
def expensive_operation():
# Expensive operation
return {"result": "expensive computation"}
2. Use Asynchronous Programmingο
Use asynchronous programming for I/O-bound operations:
from apifrom import API, api
import asyncio
app = API()
@api(route="/users", method="GET")
async def get_users():
# Asynchronous database query
await asyncio.sleep(0.1)
return {"users": [...]}
3. Use Connection Poolingο
Use connection pooling for database and external service connections:
from apifrom import API, api
from apifrom.performance import ConnectionPool
import aiohttp
app = API()
# Create a connection pool
http_pool = ConnectionPool(
factory=lambda: aiohttp.ClientSession(),
min_size=5,
max_size=20
)
@api(route="/external-api", method="GET")
async def call_external_api(url: str):
# Get a connection from the pool
async with http_pool.acquire() as session:
# Use the connection
async with session.get(url) as response:
return await response.json()
4. Use Batch Processingο
Use batch processing for multiple similar operations:
from apifrom import API, api
from apifrom.performance import batch_process
app = API()
@api(route="/users", method="POST")
@batch_process(max_batch_size=100, max_wait_time=0.1)
async def create_users(user_data_batch):
# Bulk insert all users in the batch
return db.bulk_insert_users(user_data_batch)
5. Use Request Coalescingο
Use request coalescing for popular endpoints:
from apifrom import API, api
from apifrom.performance import coalesce_requests
app = API()
@api(route="/popular-data", method="GET")
@coalesce_requests(ttl=30, max_wait_time=0.05)
async def get_popular_data():
# Expensive database query or API call
return {"data": "expensive computation result"}
6. Use Profilingο
Use profiling to identify performance bottlenecks:
from apifrom import API, api
from apifrom.performance import profile
app = API()
@api(route="/users", method="GET")
@profile(enabled=True)
def get_users():
# Your code here
return {"users": [...]}
7. Use Paginationο
Use pagination for large result sets:
from apifrom import API, api
from typing import List, Dict, Optional
app = API()
@api(route="/users", method="GET")
def get_users(page: int = 1, limit: int = 10) -> Dict:
"""Get a paginated list of users."""
# Calculate offset
offset = (page - 1) * limit
# Get users from database with pagination
users = db.get_users(offset=offset, limit=limit)
total = db.count_users()
return {
"users": users,
"page": page,
"limit": limit,
"total": total,
"pages": (total + limit - 1) // limit
}
8. Use Compressionο
Use compression to reduce response size:
from apifrom import API
from apifrom.middleware import CompressionMiddleware
app = API()
# Add compression middleware
app.add_middleware(
CompressionMiddleware(
minimum_size=1000, # Minimum response size to compress
level=9, # Compression level (1-9)
algorithms=["gzip", "deflate", "br"], # Supported compression algorithms
)
)
9. Use Database Optimizationο
Optimize database queries:
from apifrom import API, api
from apifrom.database import Database
app = API()
# Create a database connection
db = Database("sqlite:///app.db")
@api(route="/users", method="GET")
def get_users():
# Use optimized query with selected fields
users = db.query(
"SELECT id, name, email FROM users LIMIT 10",
use_cache=True, # Use query cache
cache_ttl=60, # Cache TTL in seconds
)
return {"users": users}
10. Use the Web Decoratorο
Use the Web decorator for easy optimization:
from apifrom import API, api
from apifrom.performance import Web
app = API()
@api(route="/products", method="GET")
@Web.optimize(
cache_ttl=30,
profile=True,
connection_pool=True,
request_coalescing=True,
batch_processing=True,
batch_size=100
)
def get_products():
# Your code here (automatically optimized)
return {"products": [...]}
Performance Monitoringο
APIFromAnything includes tools for monitoring performance:
Metrics Collectionο
from apifrom import API, api
from apifrom.monitoring import MetricsCollector, MetricsMiddleware, PrometheusExporter
# Create an API instance
app = API()
# Create a metrics collector
metrics = MetricsCollector()
# Register the metrics middleware
app.add_middleware(MetricsMiddleware(collector=metrics))
# Create a Prometheus exporter
prometheus_exporter = PrometheusExporter(collector=metrics)
# Define a metrics endpoint
@api(route="/metrics", method="GET")
def metrics():
"""Return Prometheus metrics."""
return prometheus_exporter.export()
Available Metricsο
APIFromAnything collects various metrics:
Request Counts: Total requests processed
Response Times: Distribution of response times
Status Codes: Count of responses by status code
Endpoint Usage: Most frequently called endpoints
Error Rates: Percentage of requests that result in errors
Custom Metrics: Define your own metrics to track
Conclusionο
APIFromAnything provides comprehensive performance optimization features to help you build fast and efficient APIs. By using these features and following best practices, you can ensure that your APIs perform well under load and provide a good user experience.