apifrom.performance.batch_processing
====================================

Batch processing utilities for APIFromAnything.

This module provides tools for grouping multiple operations together and
processing them in bulk for better efficiency, reducing overhead and improving
throughput.

.. py:currentmodule:: apifrom.performance.batch_processing

Overview
--------

**Classes**

* :py:class:`BatchCollector`
* :py:class:`BatchExecutor`
* :py:class:`BatchProcessor`
* :py:class:`Middleware`
* :py:class:`Request`
* :py:class:`Response`

**Functions**

* :py:func:`batch_process`

Classes
-------

.. py:class:: BatchCollector(max_batch_size = 100, max_wait_time = 0.1, process_func = None, auto_process = True)

   :bases: Generic[T]

   Collects items for batch processing.

   This class collects items until a batch size or timeout is reached, then
   processes them in a batch.

   Initialize a batch collector.

   :param max_batch_size: The maximum number of items in a batch
   :param max_wait_time: The maximum time to wait for a batch to fill in seconds
   :param process_func: A function to process batches
   :param auto_process: Whether to automatically process batches when filled

   .. attribute:: _batch
      :annotation: List[Tuple[T, asyncio.Future, Optional[Callable]]]

   .. attribute:: _batch_lock

   .. attribute:: _stats

   .. attribute:: _stats_lock

   .. attribute:: _timer_task
      :annotation: Optional[asyncio.Task]

   .. attribute:: auto_process

   .. attribute:: max_batch_size

   .. attribute:: max_wait_time

   .. attribute:: process_func

   .. method:: _process_batch()
      :async:

      Process the current batch.

   .. method:: _start_timer()

      Start a timer to process the batch after max_wait_time seconds.

   .. method:: add_item(item, callback = None)
      :async:

      Add an item to the batch and return a future that will resolve when the
      item is processed.

      :param item: The item to add to the batch
      :param callback: A callback function to call with the batch results
      :returns: A future that will resolve when the item is processed

   .. method:: get_stats()

      Get batch processing statistics.

      :returns: A dictionary of statistics

   .. method:: process_current_batch()
      :async:

      Process the current batch immediately.

   .. method:: reset_stats()

      Reset batch processing statistics.

   .. method:: wait_for_empty()
      :async:

      Wait until the batch collector is empty.

.. py:class:: BatchExecutor

   Executes batched operations.

   This class provides methods for executing batched operations with optimal
   batch sizes and parallelism.

   .. method:: execute_batch(batch_func, items, batch_size = 100, parallel = False, max_workers = 5)
      :staticmethod:
      :async:

      Execute a batch function on a list of items.

      :param batch_func: A function that processes a batch of items
      :param items: The items to process
      :param batch_size: The maximum number of items to process in a single batch
      :param parallel: Whether to process batches in parallel
      :param max_workers: The maximum number of parallel workers
      :returns: A list of results

   .. method:: map(func, items, batch_size = 100, worker_count = 5)
      :staticmethod:
      :async:

      Map a function over a list of items with batching and parallelism.

      :param func: A function to apply to each item
      :param items: The items to process
      :param batch_size: The maximum number of items to process in a single batch
      :param worker_count: The maximum number of parallel workers
      :returns: A list of results

   .. method:: reduce(func, items, initial, batch_size = 100)
      :staticmethod:
      :async:

      Reduce a list of items with a function.

      :param func: A reduction function
      :param items: The items to reduce
      :param initial: The initial value
      :param batch_size: The maximum number of items to process in a single batch
      :returns: The reduced result

.. py:class:: BatchProcessor(batch_size = 100, max_wait_time = 0.1, process_func = None, auto_process = True, auto_setup = True)

   Processes batches of similar operations.

   This class provides a higher-level interface for batch processing,
   including batch collection, execution, and statistics.

   Initialize a batch processor.

   :param batch_size: The maximum number of items in a batch
   :param max_wait_time: The maximum time to wait for a batch to fill in seconds
   :param process_func: A function to process batches
   :param auto_process: Whether to automatically process batches when filled
   :param auto_setup: Whether to automatically set up the processor when initialized

   .. attribute:: _collectors
      :annotation: Dict[str, BatchCollector]

   .. attribute:: _collectors_lock

   .. attribute:: auto_process

   .. attribute:: max_batch_size

   .. attribute:: max_wait_time

   .. attribute:: process_func

   .. method:: _get_or_create_collector(key)
      :async:

      Get or create a batch collector.

      :param key: The collector key
      :returns: A batch collector

   .. method:: force_process(collector_key = 'default')
      :async:

      Force processing of the current batch.

      :param collector_key: The key for the batch collector

   .. method:: get_all_stats()

      Get batch processing statistics for all collectors.

      :returns: A dictionary of statistics by collector key

   .. method:: get_stats(collector_key = 'default')

      Get batch processing statistics.

      :param collector_key: The key for the batch collector
      :returns: A dictionary of statistics

   .. method:: map(func, items, batch_size = 100, worker_count = 5)
      :staticmethod:
      :async:

      Map a function over a list of items with batching and parallelism.

      :param func: A function to apply to each item
      :param items: The items to process
      :param batch_size: The maximum number of items to process in a single batch
      :param worker_count: The maximum number of parallel workers
      :returns: A list of results

   .. method:: process(item, collector_key = 'default', callback = None)
      :async:

      Process an item through batch processing.

      :param item: The item to process
      :param collector_key: The key for the batch collector
      :param callback: A callback function to call with the batch results
      :returns: The result of processing the item

   .. method:: process_batch(items, collector_key = 'default')
      :async:

      Process a batch of items.

      :param items: The items to process
      :param collector_key: The key for the batch collector
      :returns: The results of processing the items

   .. method:: reset_all_stats()

      Reset batch processing statistics for all collectors.

   .. method:: reset_stats(collector_key = 'default')

      Reset batch processing statistics.

      :param collector_key: The key for the batch collector

   .. method:: wait_for_all_empty()
      :async:

      Wait until all batch collectors are empty.

.. py:class:: Middleware(app, dispatch = None)

   :bases: starlette.middleware.base.BaseHTTPMiddleware, BaseMiddleware

   Middleware class for APIFromAnything.

   This class implements the BaseMiddleware interface and extends Starlette's
   BaseHTTPMiddleware to provide a middleware component that can be used with
   Starlette.

   Initialize a new BaseMiddleware instance.

   :param \*\*options: Options for the middleware.

.. py:class:: Request(request = None, path_params = None, method = None, path = None, query_params = None, headers = None, body = None, client_ip = None)

   Request class for APIFromAnything.

   This class wraps a Starlette request and provides methods for accessing
   request data in a convenient way.

   .. attribute:: _request

      The underlying Starlette request.

   .. attribute:: path_params

      Path parameters extracted from the URL.

   .. attribute:: query_params

      Query parameters extracted from the URL.

   .. attribute:: headers

      HTTP headers.

   .. attribute:: method

      HTTP method.

   .. attribute:: path

      Request path.

   .. attribute:: _body

      Cached request body.

   Initialize a new Request instance.

   :param request: The underlying Starlette request.
   :param path_params: Path parameters extracted from the URL.
   :param method: The HTTP method.
   :param path: The request path.
   :param query_params: Query parameters.
   :param headers: HTTP headers.
   :param body: Request body.
   :param client_ip: Client IP address.

.. py:class:: Response(content = None, status_code = 200, headers = None, content_type = 'application/json')

   Response class for APIFromAnything.

   This class represents an HTTP response and provides methods for setting
   response data, status code, and headers.

   .. attribute:: content

      The response content.

   .. attribute:: status_code

      The HTTP status code.

   .. attribute:: headers

      HTTP headers.

   .. attribute:: content_type

      The content type of the response.

   Initialize a new Response instance.

   :param content: The response content.
   :param status_code: The HTTP status code.
   :param headers: HTTP headers.
   :param content_type: The content type of the response.

Functions
---------

.. py:function:: batch_process(batch_size = 100, max_wait_time = 0.1, process_func = None, auto_process = True)

   Decorator for batch processing.

   This decorator groups calls to a function into batches and processes them
   together.

   :param batch_size: The maximum number of items in a batch
   :param max_wait_time: The maximum time to wait for a batch to fill in seconds
   :param process_func: A function to process batches
   :param auto_process: Whether to automatically process batches when filled
   :returns: A decorator function
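
Examples
--------

The "collect until a batch size or timeout is reached" behaviour described for ``BatchCollector`` can be illustrated with a small, self-contained asyncio sketch. This is not the library's implementation and does not import ``apifrom``. The class ``MiniBatchCollector``, its private helper ``_flush_later``, and the ``demo`` coroutine are hypothetical names introduced only for illustration; the parameter names ``max_batch_size``, ``max_wait_time`` and ``process_func`` mirror the documented ones, and the sketch assumes ``process_func`` is an async callable that returns one result per item, in order.

```python
import asyncio
from typing import Awaitable, Callable, Generic, List, Optional, Tuple, TypeVar

T = TypeVar("T")
R = TypeVar("R")


class MiniBatchCollector(Generic[T, R]):
    """Hypothetical stand-in for illustration; NOT the apifrom implementation."""

    def __init__(
        self,
        process_func: Callable[[List[T]], Awaitable[List[R]]],
        max_batch_size: int = 100,
        max_wait_time: float = 0.1,
    ) -> None:
        self.process_func = process_func
        self.max_batch_size = max_batch_size
        self.max_wait_time = max_wait_time
        self._batch: List[Tuple[T, asyncio.Future]] = []
        self._lock = asyncio.Lock()
        self._timer_task: Optional[asyncio.Task] = None

    async def add_item(self, item: T) -> R:
        """Queue an item and wait for its result from the batch it lands in."""
        future = asyncio.get_running_loop().create_future()
        async with self._lock:
            self._batch.append((item, future))
            full = len(self._batch) >= self.max_batch_size
            # The first item of a new batch starts the timeout clock.
            if not full and self._timer_task is None:
                self._timer_task = asyncio.create_task(self._flush_later())
        if full:
            await self.process_current_batch()
        return await future

    async def _flush_later(self) -> None:
        """Flush a partially filled batch once max_wait_time has elapsed."""
        await asyncio.sleep(self.max_wait_time)
        await self.process_current_batch()

    async def process_current_batch(self) -> None:
        """Take the pending batch, process it, and resolve each item's future."""
        async with self._lock:
            batch, self._batch = self._batch, []
            timer, self._timer_task = self._timer_task, None
        # Cancel a pending timer, unless this call is running inside it.
        if timer is not None and timer is not asyncio.current_task():
            timer.cancel()
        if not batch:
            return
        try:
            # Assumption: one result per item, in the same order.
            results = await self.process_func([item for item, _ in batch])
        except Exception as exc:
            for _, fut in batch:
                if not fut.done():
                    fut.set_exception(exc)
            return
        for (_, fut), result in zip(batch, results):
            if not fut.done():
                fut.set_result(result)


async def demo() -> Tuple[List[int], List[int]]:
    """Send 7 items through a collector whose batches hold at most 3."""
    batch_sizes: List[int] = []

    async def double_all(items: List[int]) -> List[int]:
        batch_sizes.append(len(items))
        return [i * 2 for i in items]

    collector = MiniBatchCollector(double_all, max_batch_size=3, max_wait_time=0.05)
    results = await asyncio.gather(*(collector.add_item(i) for i in range(7)))
    return list(results), batch_sizes


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this sketch the first six items are flushed by the size limit in two batches of three, and the seventh is flushed by the timer after ``max_wait_time``. The real ``BatchCollector`` additionally documents per-item callbacks, statistics, and an ``auto_process`` switch, none of which are modelled here.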