"""
Cache optimization utilities for APIFromAnything.
This module provides tools for optimizing cache performance in high-load scenarios,
including cache strategy optimization, monitoring, and auto-tuning.
"""
import asyncio
import hashlib
import inspect
import json
import logging
import os
import threading
import time
from datetime import datetime
from typing import Dict, List, Optional, Callable, Any, Union, Tuple

from apifrom.core.request import Request
from apifrom.core.response import Response
from apifrom.middleware.base import BaseMiddleware
from apifrom.middleware.cache_advanced import CacheBackend
# Module-level logger used by CacheOptimizer and OptimizedCacheMiddleware
# to report cache errors and auto-tuning decisions.
logger = logging.getLogger("apifrom.performance.cache_optimizer")
[docs]
class CacheAnalytics:
    """
    Collects and analyzes cache performance metrics.

    This class collects cache performance metrics such as hit rates,
    latency, per-key access counts and value sizes, and provides tools for
    summarizing this data (dict / JSON / file / console output).

    All state is guarded by a re-entrant lock, so composite reports such as
    ``to_dict`` can read the individual metric properties while already
    holding the lock.
    """

    def __init__(self):
        """
        Initialize cache analytics.
        """
        # RLock rather than Lock: to_dict() holds the lock while reading the
        # hit_rate / avg_* properties and get_large_keys()/get_hot_keys(),
        # each of which acquires the lock again. A plain Lock deadlocks there.
        self._lock = threading.RLock()
        self.reset()

    def reset(self):
        """
        Reset all analytics data and restart the measurement window.
        """
        with self._lock:
            self.start_time = time.time()
            self.requests = 0
            self.cache_hits = 0
            self.cache_misses = 0
            self.cache_errors = 0
            self.cache_size = 0
            self.key_sizes = {}
            self.value_sizes = {}
            self.ttls = {}
            self.hit_latencies = []   # milliseconds
            self.miss_latencies = []  # milliseconds
            # Number of lookups (hits + misses) per key; feeds get_hot_keys().
            self.key_access_counts = {}

    def record_request(self):
        """
        Record a cache request.
        """
        with self._lock:
            self.requests += 1

    def record_hit(self, key: str, value_size: int, latency: float):
        """
        Record a cache hit.

        Args:
            key: The cache key
            value_size: The size of the cached value in bytes
            latency: The cache lookup latency in seconds
        """
        with self._lock:
            self.cache_hits += 1
            self.key_sizes[key] = len(key.encode('utf-8'))
            self.value_sizes[key] = value_size
            self.hit_latencies.append(latency * 1000)  # Convert to ms
            self.key_access_counts[key] = self.key_access_counts.get(key, 0) + 1

    def record_miss(self, key: str, latency: float):
        """
        Record a cache miss.

        Args:
            key: The cache key
            latency: The cache lookup latency in seconds
        """
        with self._lock:
            self.cache_misses += 1
            self.key_sizes[key] = len(key.encode('utf-8'))
            self.miss_latencies.append(latency * 1000)  # Convert to ms
            self.key_access_counts[key] = self.key_access_counts.get(key, 0) + 1

    def record_error(self, key: str):
        """
        Record a cache error.

        Args:
            key: The cache key (currently only counted, not stored per key)
        """
        with self._lock:
            self.cache_errors += 1

    def record_set(self, key: str, value_size: int, ttl: int):
        """
        Record a cache set operation.

        Overwriting a key that was already recorded by a previous set replaces
        its contribution to ``cache_size`` instead of adding to it.

        Args:
            key: The cache key
            value_size: The size of the cached value in bytes
            ttl: The TTL in seconds
        """
        with self._lock:
            # Only record_set populates ``ttls`` (and record_delete clears it),
            # so presence there means this key's size is already counted.
            if key in self.ttls:
                self.cache_size -= self.value_sizes.get(key, 0)
            self.key_sizes[key] = len(key.encode('utf-8'))
            self.value_sizes[key] = value_size
            self.ttls[key] = ttl
            self.cache_size += value_size

    def record_delete(self, key: str, value_size: int):
        """
        Record a cache delete operation.

        Args:
            key: The cache key
            value_size: The size of the cached value in bytes
        """
        with self._lock:
            self.key_sizes.pop(key, None)
            self.value_sizes.pop(key, None)
            self.ttls.pop(key, None)
            self.cache_size -= value_size

    @property
    def hit_rate(self) -> float:
        """
        Get the cache hit rate.

        Returns:
            The cache hit rate (0.0 to 1.0); 0.0 when no requests were recorded
        """
        with self._lock:
            if self.requests == 0:
                return 0.0
            return self.cache_hits / self.requests

    @property
    def avg_hit_latency(self) -> float:
        """
        Get the average cache hit latency in milliseconds.

        Returns:
            The average cache hit latency (0.0 when no hits were recorded)
        """
        with self._lock:
            if not self.hit_latencies:
                return 0.0
            return sum(self.hit_latencies) / len(self.hit_latencies)

    @property
    def avg_miss_latency(self) -> float:
        """
        Get the average cache miss latency in milliseconds.

        Returns:
            The average cache miss latency (0.0 when no misses were recorded)
        """
        with self._lock:
            if not self.miss_latencies:
                return 0.0
            return sum(self.miss_latencies) / len(self.miss_latencies)

    @property
    def avg_key_size(self) -> float:
        """
        Get the average key size in bytes.

        Returns:
            The average key size (0.0 when no keys are tracked)
        """
        with self._lock:
            if not self.key_sizes:
                return 0.0
            return sum(self.key_sizes.values()) / len(self.key_sizes)

    @property
    def avg_value_size(self) -> float:
        """
        Get the average value size in bytes.

        Returns:
            The average value size (0.0 when no values are tracked)
        """
        with self._lock:
            if not self.value_sizes:
                return 0.0
            return sum(self.value_sizes.values()) / len(self.value_sizes)

    @property
    def avg_ttl(self) -> float:
        """
        Get the average TTL in seconds.

        Returns:
            The average TTL (0.0 when no sets are tracked)
        """
        with self._lock:
            if not self.ttls:
                return 0.0
            return sum(self.ttls.values()) / len(self.ttls)

    def get_hot_keys(self, limit: int = 10) -> List[Tuple[str, int]]:
        """
        Get the most frequently accessed cache keys.

        An access is any recorded lookup of the key, hit or miss.

        Args:
            limit: The maximum number of keys to return

        Returns:
            A list of (key, access_count) tuples, most accessed first
        """
        with self._lock:
            return sorted(self.key_access_counts.items(), key=lambda x: x[1], reverse=True)[:limit]

    def get_large_keys(self, limit: int = 10) -> List[Tuple[str, int]]:
        """
        Get the largest cache keys by value size.

        Args:
            limit: The maximum number of keys to return

        Returns:
            A list of (key, size) tuples, largest first
        """
        with self._lock:
            return sorted(self.value_sizes.items(), key=lambda x: x[1], reverse=True)[:limit]

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert the analytics data to a dictionary.

        Returns:
            A dictionary representation of the analytics data
        """
        # Holding the (re-entrant) lock for the whole snapshot keeps the
        # counters and derived averages mutually consistent.
        with self._lock:
            return {
                "timestamp": datetime.now().isoformat(),
                "duration_seconds": time.time() - self.start_time,
                "requests": self.requests,
                "cache_hits": self.cache_hits,
                "cache_misses": self.cache_misses,
                "cache_errors": self.cache_errors,
                "hit_rate": self.hit_rate,
                "avg_hit_latency_ms": self.avg_hit_latency,
                "avg_miss_latency_ms": self.avg_miss_latency,
                "cache_size_bytes": self.cache_size,
                "avg_key_size_bytes": self.avg_key_size,
                "avg_value_size_bytes": self.avg_value_size,
                "avg_ttl_seconds": self.avg_ttl,
                "hot_keys": self.get_hot_keys(),
                "large_keys": self.get_large_keys(),
            }

    def to_json(self, pretty: bool = True) -> str:
        """
        Convert the analytics data to a JSON string.

        Args:
            pretty: Whether to format the JSON with indentation

        Returns:
            A JSON string representation of the analytics data
        """
        indent = 2 if pretty else None
        return json.dumps(self.to_dict(), indent=indent)

    def save(self, file_path: str) -> None:
        """
        Save the analytics data to a file as pretty-printed UTF-8 JSON.

        Args:
            file_path: The path to save the data to
        """
        with open(file_path, 'w', encoding='utf-8') as f:
            f.write(self.to_json())

    def print_summary(self) -> None:
        """
        Print a summary of the analytics data to the console.
        """
        print("=== Cache Analytics Summary ===")
        print(f"Duration: {time.time() - self.start_time:.2f} seconds")
        print(f"Total Requests: {self.requests}")
        print(f"Cache Hits: {self.cache_hits}")
        print(f"Cache Misses: {self.cache_misses}")
        print(f"Cache Errors: {self.cache_errors}")
        print(f"Hit Rate: {self.hit_rate:.2%}")
        print(f"Average Hit Latency: {self.avg_hit_latency:.2f} ms")
        print(f"Average Miss Latency: {self.avg_miss_latency:.2f} ms")
        print(f"Cache Size: {self.cache_size / 1024:.2f} KB")
        print(f"Average Key Size: {self.avg_key_size:.2f} bytes")
        print(f"Average Value Size: {self.avg_value_size:.2f} bytes")
        print(f"Average TTL: {self.avg_ttl:.2f} seconds")
        print("\nTop 5 Largest Keys:")
        for i, (key, size) in enumerate(self.get_large_keys(5)):
            print(f"{i+1}. {key}: {size / 1024:.2f} KB")
[docs]
class OptimizedCacheStrategy:
    """
    Cache strategy whose TTLs adapt to observed per-key access patterns.

    The strategy remembers, per key, how often it was hit, when it was last
    touched (hit or set) and which TTL was last applied. Keys hit more than
    ten times get their TTL doubled (capped at ``max_ttl``); keys idle for
    more than an hour get it halved (floored at ``min_ttl``). Keys can
    optionally be replaced by their MD5 hex digest.
    """

    def __init__(self,
                 min_ttl: int = 60,
                 max_ttl: int = 86400,
                 hit_rate_threshold: float = 0.8,
                 compress_values: bool = True,
                 compress_keys: bool = False,
                 prefetch_keys: bool = False,
                 auto_tune: bool = True):
        """
        Create a strategy with the given tuning knobs.

        Args:
            min_ttl: Lower bound (seconds) applied when shrinking a TTL
            max_ttl: Upper bound (seconds) applied when growing a TTL
            hit_rate_threshold: Hit-rate threshold intended for TTL adjustment
            compress_values: Whether cached values should be compressed
            compress_keys: Whether cache keys are replaced by an MD5 digest
            prefetch_keys: Whether related keys should be prefetched
            auto_tune: Whether get_optimized_ttl adapts TTLs at all
        """
        # Tuning knobs.
        self.min_ttl, self.max_ttl = min_ttl, max_ttl
        self.hit_rate_threshold = hit_rate_threshold
        self.compress_values, self.compress_keys = compress_values, compress_keys
        self.prefetch_keys = prefetch_keys
        self.auto_tune = auto_tune
        # Per-key bookkeeping: last applied TTL, hit count, last touch time.
        self.key_ttls = {}
        self.key_hit_counts = {}
        self.key_access_times = {}
        self.analytics = CacheAnalytics()

    def get_optimized_ttl(self, key: str, default_ttl: int) -> int:
        """
        Choose a TTL for ``key`` from its recorded access history.

        Args:
            key: The cache key
            default_ttl: TTL (seconds) to use when there is no history or
                auto-tuning is disabled

        Returns:
            The TTL in seconds to apply for this key
        """
        # Nothing to adapt: tuning disabled or the key has never been hit.
        if not self.auto_tune or key not in self.key_hit_counts:
            return default_ttl

        previous_ttl = self.key_ttls.get(key, default_ttl)

        # Hot key: keep it around longer.
        if self.key_hit_counts.get(key, 0) > 10:
            return min(previous_ttl * 2, self.max_ttl)

        # Stale key (idle for over an hour): let it expire sooner.
        idle_seconds = time.time() - self.key_access_times.get(key, 0)
        if idle_seconds > 3600:
            return max(previous_ttl // 2, self.min_ttl)

        return previous_ttl

    def optimize_key(self, key: str) -> str:
        """
        Return the key actually used for storage.

        Args:
            key: The original cache key

        Returns:
            The MD5 hex digest of the UTF-8 key when key compression is on,
            otherwise the key unchanged
        """
        if self.compress_keys:
            return hashlib.md5(key.encode('utf-8')).hexdigest()
        return key

    def optimize_value(self, value: Any) -> Any:
        """
        Return the value actually used for storage.

        Value compression is not implemented yet, so the value is passed
        through untouched.

        Args:
            value: The original value

        Returns:
            The same object that was passed in
        """
        return value

    def record_hit(self, key: str):
        """
        Count a hit for ``key`` and mark it as just accessed.

        Args:
            key: The cache key
        """
        counts = self.key_hit_counts
        counts[key] = counts.get(key, 0) + 1
        self.key_access_times[key] = time.time()

    def record_miss(self, key: str):
        """
        Observe a miss for ``key``; misses do not affect this strategy.

        Args:
            key: The cache key
        """
        return None

    def record_set(self, key: str, ttl: int):
        """
        Remember the TTL applied to ``key`` and mark it as just accessed.

        Args:
            key: The cache key
            ttl: The TTL in seconds
        """
        self.key_access_times[key] = time.time()
        self.key_ttls[key] = ttl
[docs]
class CacheOptimizer:
    """
    Optimizes caching for high-load scenarios.

    Every get/set/delete is routed through an OptimizedCacheStrategy (key,
    value and TTL optimization) and recorded in a CacheAnalytics instance.
    ``auto_tune`` adjusts the strategy from those metrics at most once per
    ``auto_tune_interval`` seconds.
    """

    def __init__(self,
                 cache_backend: CacheBackend,
                 strategy: Optional[OptimizedCacheStrategy] = None,
                 analytics: Optional[CacheAnalytics] = None,
                 auto_tune_interval: Optional[int] = None,
                 output_dir: Optional[str] = None):
        """
        Initialize a cache optimizer.

        Args:
            cache_backend: The cache backend to optimize; its ``get``, ``set``
                and ``delete`` methods are awaited
            strategy: The optimization strategy to use (a default
                OptimizedCacheStrategy when omitted)
            analytics: The cache analytics instance to use (a fresh
                CacheAnalytics when omitted)
            auto_tune_interval: Minimum seconds between two effective
                ``auto_tune`` runs; ``None`` disables auto-tuning
            output_dir: The directory to save analytics data to during
                auto-tuning (``None`` disables saving)
        """
        self.cache_backend = cache_backend
        self.strategy = strategy or OptimizedCacheStrategy()
        self.analytics = analytics or CacheAnalytics()
        self.auto_tune_interval = auto_tune_interval
        self.output_dir = output_dir
        self._last_auto_tune = time.time()
        self._lock = threading.Lock()

    @staticmethod
    def _estimate_size(value: Any) -> int:
        """
        Estimate the size of a cached value in bytes, never raising.

        JSON-serializable values are measured by their UTF-8 JSON encoding.
        Values JSON cannot encode (bytes, response objects, circular
        structures) fall back to their raw length or ``repr`` length, so that
        measuring a value cannot turn a successful cache operation into a
        reported failure.
        """
        try:
            return len(json.dumps(value).encode('utf-8'))
        except (TypeError, ValueError):
            pass
        if isinstance(value, (bytes, bytearray)):
            return len(value)
        return len(repr(value).encode('utf-8'))

    async def get(self, key: str) -> Optional[Any]:
        """
        Get a value from the cache with optimization.

        Args:
            key: The cache key (as seen by callers, before optimization)

        Returns:
            The cached value, or None if not found or the backend failed
        """
        self.analytics.record_request()
        optimized_key = self.strategy.optimize_key(key)
        start_time = time.time()
        try:
            cached_value = await self.cache_backend.get(optimized_key)
        except Exception as e:
            # Backend failure: report it and behave like a miss for the caller.
            logger.warning(f"Cache error: {e}")
            self.analytics.record_error(key)
            return None
        latency = time.time() - start_time

        if cached_value is None:
            self.analytics.record_miss(key, latency)
            self.strategy.record_miss(key)
            return None

        self.analytics.record_hit(key, self._estimate_size(cached_value), latency)
        self.strategy.record_hit(key)
        return cached_value

    async def set(self, key: str, value: Any, ttl: int = 60) -> bool:
        """
        Set a value in the cache with optimization.

        Args:
            key: The cache key (as seen by callers, before optimization)
            value: The value to cache
            ttl: The requested TTL in seconds; the strategy may adjust it

        Returns:
            The backend's result for the set, or False if the backend raised
        """
        optimized_key = self.strategy.optimize_key(key)
        optimized_value = self.strategy.optimize_value(value)
        optimized_ttl = self.strategy.get_optimized_ttl(key, ttl)
        try:
            success = await self.cache_backend.set(optimized_key, optimized_value, optimized_ttl)
        except Exception as e:
            logger.warning(f"Cache error: {e}")
            self.analytics.record_error(key)
            return False

        if success:
            self.analytics.record_set(key, self._estimate_size(optimized_value), optimized_ttl)
            self.strategy.record_set(key, optimized_ttl)
        return success

    async def delete(self, key: str) -> bool:
        """
        Delete a value from the cache.

        The current value is fetched first so its size can be removed from
        the analytics' cache-size total.

        Args:
            key: The cache key (as seen by callers, before optimization)

        Returns:
            The backend's result for the delete, or False if the backend raised
        """
        optimized_key = self.strategy.optimize_key(key)
        try:
            cached_value = await self.cache_backend.get(optimized_key)
            success = await self.cache_backend.delete(optimized_key)
        except Exception as e:
            logger.warning(f"Cache error: {e}")
            self.analytics.record_error(key)
            return False

        value_size = 0 if cached_value is None else self._estimate_size(cached_value)
        if success and value_size > 0:
            self.analytics.record_delete(key, value_size)
        return success

    def auto_tune(self) -> None:
        """
        Auto-tune caching parameters based on analytics.

        Does nothing unless ``auto_tune_interval`` is set and at least that
        many seconds have passed since the last run. Adjusts the strategy's
        ``max_ttl`` from the hit rate and may enable value/key compression,
        then optionally saves the analytics to ``output_dir``.
        """
        if (self.auto_tune_interval is None or
                time.time() - self._last_auto_tune < self.auto_tune_interval):
            return

        logger.info("Auto-tuning cache parameters...")

        requests = self.analytics.requests
        hit_rate = self.analytics.hit_rate
        avg_hit_latency = self.analytics.avg_hit_latency
        avg_value_size = self.analytics.avg_value_size

        with self._lock:
            if requests == 0:
                # No traffic since the last reset: a 0.0 hit rate carries no
                # signal, so leave the TTL bounds alone.
                pass
            elif hit_rate < 0.5:
                # Low hit rate, decrease TTL
                self.strategy.max_ttl = max(self.strategy.max_ttl // 2, self.strategy.min_ttl)
                logger.info(f"Decreased max TTL to {self.strategy.max_ttl} seconds")
            elif hit_rate > 0.9:
                # High hit rate, increase TTL
                self.strategy.max_ttl = min(self.strategy.max_ttl * 2, 86400 * 7)  # Max 1 week
                logger.info(f"Increased max TTL to {self.strategy.max_ttl} seconds")

            # Enable value compression for large values (> 10 KB on average)
            if avg_value_size > 10 * 1024 and not self.strategy.compress_values:
                self.strategy.compress_values = True
                logger.info("Enabled value compression for large values")

            # Enable key compression for slow cache lookups (> 20 ms on average)
            if avg_hit_latency > 20 and not self.strategy.compress_keys:
                # NOTE: with the default OptimizedCacheStrategy this switches
                # optimize_key() to MD5 digests, so entries stored under the
                # raw keys are no longer found and will miss until they expire.
                self.strategy.compress_keys = True
                logger.info("Enabled key compression for slow cache lookups")

        if self.output_dir:
            os.makedirs(self.output_dir, exist_ok=True)
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            file_path = os.path.join(self.output_dir, f"cache_analytics_{timestamp}.json")
            self.analytics.save(file_path)
            logger.info(f"Saved cache analytics to {file_path}")

        self._last_auto_tune = time.time()

    def get_analytics(self) -> CacheAnalytics:
        """
        Get the cache analytics.

        Returns:
            The cache analytics instance
        """
        return self.analytics

    def reset_analytics(self) -> None:
        """
        Reset the cache analytics.
        """
        self.analytics.reset()
class OptimizedCacheMiddleware(BaseMiddleware):
    """
    Middleware for adding optimized caching to an API.

    Responses to cacheable requests are stored and served from a cache. When
    the advanced cache middleware is importable its ``get_cached_response`` /
    ``cache_response`` are used; otherwise the backend is accessed directly.
    Hit/miss/error metrics are recorded in a CacheAnalytics instance, and the
    default TTL is periodically auto-tuned from those metrics.
    """

    def __init__(
        self,
        cache_backend=None,
        ttl: int = 60,
        auto_tune: bool = True,
        auto_tune_interval: int = 300,
        output_dir: Optional[str] = None,
        compression: bool = True,
        **options
    ):
        """
        Initialize the optimized cache middleware.

        Args:
            cache_backend: The cache backend to use (created when omitted)
            ttl: Default time-to-live in seconds
            auto_tune: Whether to auto-tune cache parameters
            auto_tune_interval: The interval to auto-tune cache parameters in seconds
            output_dir: The directory to save analytics to
            compression: Whether to compress cached responses
            **options: Additional options forwarded to the base and cache middleware
        """
        super().__init__(**options)

        # Import necessary middleware here to avoid circular imports
        try:
            from apifrom.middleware.cache_advanced import (
                MemoryCacheBackend,
                AdvancedCacheMiddleware,
                HybridEvictionPolicy
            )
            have_advanced_cache = True
        except ImportError:
            from apifrom.middleware.cache import CacheMiddleware
            have_advanced_cache = False

        # Create the cache backend if not provided
        if cache_backend is None:
            if have_advanced_cache:
                self.cache_backend = MemoryCacheBackend(
                    eviction_policy=HybridEvictionPolicy()
                )
            else:
                # Fall back to a plain dict
                self.cache_backend = {}
        else:
            self.cache_backend = cache_backend

        # Create the cache middleware
        if have_advanced_cache:
            self.cache_middleware = AdvancedCacheMiddleware(
                cache_backend=self.cache_backend,
                ttl=ttl,
                compress_responses=compression,
                auto_vary=True,
                **options
            )
        else:
            self.cache_middleware = CacheMiddleware(
                ttl=ttl,
                **options
            )

        self.ttl = ttl
        self.auto_tune = auto_tune
        self.auto_tune_interval = auto_tune_interval
        self.output_dir = output_dir
        self.compression = compression

        self.optimizer = CacheOptimizer(
            cache_backend=self.cache_backend,
            output_dir=output_dir
        )

        self.analytics = CacheAnalytics()
        # CacheAnalytics.to_dict() re-acquires its lock through the metric
        # properties; a re-entrant lock guarantees get_stats() and
        # save_analytics() cannot self-deadlock on this instance.
        self.analytics._lock = threading.RLock()

        self.last_tune_time = time.time()

        if output_dir:
            os.makedirs(output_dir, exist_ok=True)

    async def process_request(self, request):
        """
        Process a request (required by BaseMiddleware).

        Args:
            request: The request to process

        Returns:
            The request unchanged
        """
        return request

    async def process_response(self, response):
        """
        Process a response (required by BaseMiddleware).

        Args:
            response: The response to process

        Returns:
            The response unchanged
        """
        return response

    async def dispatch(
        self,
        request,
        call_next
    ):
        """
        Serve the request from cache, or call the next handler and cache its response.

        Args:
            request: The request to process
            call_next: The next middleware in the chain

        Returns:
            The cached response on a hit, otherwise the downstream response
        """
        cache_key = self._generate_cache_key(request)

        # Every lookup is a request, so hit_rate (hits / requests) is meaningful.
        self.analytics.record_request()
        start_time = time.time()
        cached_response = await self._get_from_cache(cache_key)
        # CacheAnalytics takes latencies in seconds and converts them to ms.
        latency = time.time() - start_time

        if cached_response is not None:
            self.analytics.record_hit(cache_key, self._response_size(cached_response), latency)
            return cached_response

        self.analytics.record_miss(cache_key, latency)

        response = await call_next(request)

        if self._is_cacheable(request, response):
            await self._cache_response(cache_key, response)

        if self.auto_tune and (time.time() - self.last_tune_time) > self.auto_tune_interval:
            await self._auto_tune()
            self.last_tune_time = time.time()

        return response

    @staticmethod
    def _response_size(response) -> int:
        """Return the byte length of ``response.body`` when it is bytes, else 0."""
        body = getattr(response, "body", None)
        return len(body) if isinstance(body, (bytes, bytearray)) else 0

    @staticmethod
    async def _resolve(result):
        """Await ``result`` when a backend call returned an awaitable; otherwise return it."""
        if inspect.isawaitable(result):
            return await result
        return result

    def _generate_cache_key(self, request) -> str:
        """
        Generate a cache key for a request.

        The key is the MD5 hex digest of the method, path, query string and
        the Accept / Accept-Encoding / Accept-Language headers when present.

        Args:
            request: The request to generate a key for

        Returns:
            The cache key
        """
        key_components = f"{request.method}:{request.url.path}:{request.query_params}"

        # Include headers that affect the response
        for header in ["Accept", "Accept-Encoding", "Accept-Language"]:
            if header in request.headers:
                key_components += f":{header}={request.headers[header]}"

        return hashlib.md5(key_components.encode()).hexdigest()

    async def _get_from_cache(self, cache_key: str):
        """
        Get a response from the cache.

        Args:
            cache_key: The cache key

        Returns:
            The cached response, or None if not found or on error
        """
        try:
            if hasattr(self.cache_middleware, "get_cached_response"):
                return await self.cache_middleware.get_cached_response(cache_key)

            # Fall back to direct backend access (sync dict or async backend).
            if hasattr(self.cache_backend, "get"):
                cached_data = await self._resolve(self.cache_backend.get(cache_key))
                # ``is not None`` so that cached falsy JSON ([] / {} / 0) still hits.
                if cached_data is not None:
                    from starlette.responses import JSONResponse
                    return JSONResponse(content=cached_data, headers={"X-Cache": "HIT"})
            return None
        except Exception as e:
            logger.error(f"Error getting response from cache: {e}")
            self.analytics.record_error(cache_key)
            return None

    async def _cache_response(self, cache_key: str, response):
        """
        Cache a response using the current default TTL.

        Args:
            cache_key: The cache key
            response: The response to cache
        """
        try:
            if hasattr(self.cache_middleware, "cache_response"):
                await self.cache_middleware.cache_response(cache_key, response, self.ttl)
                return

            # Fall back to direct backend access: only JSON bodies are stored.
            if hasattr(self.cache_backend, "set") and hasattr(response, "body"):
                try:
                    data = json.loads(response.body)
                except (TypeError, ValueError):
                    logger.error("Error parsing response body as JSON")
                    return
                await self._resolve(self.cache_backend.set(cache_key, data, self.ttl))
        except Exception as e:
            logger.error(f"Error caching response: {e}")
            self.analytics.record_error(cache_key)

    def _is_cacheable(self, request, response) -> bool:
        """
        Check if a response is cacheable.

        Only 2xx/3xx responses to GET/HEAD without a ``no-store`` or
        ``no-cache`` Cache-Control directive are cacheable.

        Args:
            request: The request
            response: The response

        Returns:
            True if the response is cacheable, False otherwise
        """
        if response.status_code < 200 or response.status_code >= 400:
            return False

        if request.method not in ["GET", "HEAD"]:
            return False

        # Cache-Control directives are case-insensitive.
        cache_control = response.headers.get("Cache-Control", "").lower()
        if "no-store" in cache_control or "no-cache" in cache_control:
            return False

        return True

    async def _auto_tune(self):
        """
        Auto-tune the default TTL based on the collected analytics.

        Requires at least 100 recorded requests. A hit rate below 30% lowers
        the TTL by 20% (floor 10 s); a hit rate above 70% with hits costing
        under 10% of a miss raises it by 20% (cap 3600 s). Analytics are
        saved afterwards when an output directory is configured. Errors are
        logged, never raised.
        """
        try:
            analytics = self.analytics
            if analytics.requests < 100:
                return

            hit_rate = analytics.hit_rate
            avg_hit_latency = analytics.avg_hit_latency
            avg_miss_latency = analytics.avg_miss_latency

            new_ttl = self.ttl
            if hit_rate < 0.3:
                # Low hit rate, decrease TTL to refresh data more often
                new_ttl = max(10, int(self.ttl * 0.8))
            elif hit_rate > 0.7 and avg_hit_latency < avg_miss_latency * 0.1:
                # High hit rate and significant performance benefit, increase TTL
                new_ttl = min(3600, int(self.ttl * 1.2))

            if new_ttl != self.ttl:
                self.ttl = new_ttl
                logger.info(f"Auto-tuned cache TTL to {new_ttl} seconds")

            if self.output_dir:
                self.save_analytics()
        except Exception as e:
            logger.error(f"Error during cache auto-tuning: {e}")

    def get_analytics(self) -> CacheAnalytics:
        """
        Get cache analytics, refreshed with backend statistics when available.

        Returns:
            The cache analytics
        """
        try:
            if hasattr(self.cache_backend, "get_stats"):
                backend_stats = self.cache_backend.get_stats()
                if "total_size_bytes" in backend_stats:
                    # ``cache_size`` is the attribute to_dict() reports.
                    self.analytics.cache_size = backend_stats["total_size_bytes"]
                # Legacy attribute name, kept for compatibility.
                self.analytics.cache_size_bytes = backend_stats.get("total_size_bytes", 0)
                self.analytics.item_count = backend_stats.get("item_count", 0)
                if "large_keys" in backend_stats:
                    self.analytics.large_keys = backend_stats["large_keys"]
        except Exception as e:
            logger.error(f"Error getting cache stats: {e}")

        return self.analytics

    def get_stats(self) -> Dict[str, Any]:
        """
        Get cache statistics.

        Returns:
            A dictionary of cache statistics
        """
        return self.get_analytics().to_dict()

    def save_analytics(self) -> Optional[str]:
        """
        Save cache analytics to a timestamped JSON file in ``output_dir``.

        Returns:
            The path to the saved file, or None if no output directory is set
            or saving failed
        """
        if not self.output_dir:
            return None

        try:
            # ``datetime`` here is the class imported from the datetime module.
            timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
            os.makedirs(self.output_dir, exist_ok=True)
            filename = os.path.join(self.output_dir, f"cache_analytics_{timestamp}.json")
            with open(filename, "w", encoding="utf-8") as f:
                json.dump(self.get_stats(), f, indent=2)
            return filename
        except Exception as e:
            logger.error(f"Error saving cache analytics: {e}")
            return None

    def clear_cache(self):
        """Clear the cache backend (when it supports ``clear``) and reset analytics."""
        try:
            if hasattr(self.cache_backend, "clear"):
                # NOTE(review): if the backend's clear() is a coroutine
                # function, this synchronous call does not run it — confirm
                # the backend API.
                self.cache_backend.clear()
            self.analytics.reset()
            logger.info("Cache cleared")
        except Exception as e:
            logger.error(f"Error clearing cache: {e}")