apifrom.performance.batch_processing

Batch processing utilities for APIFromAnything.

This module provides tools for grouping multiple operations together and processing them in bulk for better efficiency, reducing overhead and improving throughput.

Overview

Classes

Functions

Classes

BatchCollector(max_batch_size = 100, max_wait_time = 0.1, process_func = None, auto_process = True):bases: Generic[T]

Collects items for batch processing.

This class collects items until a batch size or timeout is reached, then processes them in a batch.

Initialize a batch collector.

param max_batch_size:

The maximum number of items in a batch

param max_wait_time:

The maximum time to wait for a batch to fill in seconds

param process_func:

A function to process batches

param auto_process:

Whether to automatically process batches when filled

apifrom.performance.batch_processing._process_batch()
:async:

Process the current batch.

apifrom.performance.batch_processing._start_timer()

Start a timer to process the batch after max_wait_time seconds.

apifrom.performance.batch_processing.add_item(item, callback=None)
:async:

Add an item to the batch and return a future that will resolve when the item is processed.

param item:

The item to add to the batch

param callback:

A callback function to call with the batch results

returns:

A future that will resolve when the item is processed

apifrom.performance.batch_processing.get_stats()

Get batch processing statistics.

returns:

A dictionary of statistics

apifrom.performance.batch_processing.process_current_batch()
:async:

Process the current batch immediately.

apifrom.performance.batch_processing.reset_stats()

Reset batch processing statistics.

apifrom.performance.batch_processing.wait_for_empty()
:async:

Wait until the batch collector is empty.

class apifrom.performance.batch_processing.BatchExecutor[source]

Executes batched operations.

This class provides methods for executing batched operations with optimal batch sizes and parallelism.

apifrom.performance.batch_processing.execute_batch(batch_func, items, batch_size=100, parallel=False, max_workers=5)
:staticmethod:
:async:

Execute a batch function on a list of items.

param batch_func:

A function that processes a batch of items

param items:

The items to process

param batch_size:

The maximum number of items to process in a single batch

param parallel:

Whether to process batches in parallel

param max_workers:

The maximum number of parallel workers

returns:

A list of results

apifrom.performance.batch_processing.map(func, items, batch_size=100, worker_count=5)
:staticmethod:
:async:

Map a function over a list of items with batching and parallelism.

param func:

A function to apply to each item

param items:

The items to process

param batch_size:

The maximum number of items to process in a single batch

param worker_count:

The maximum number of parallel workers

returns:

A list of results

apifrom.performance.batch_processing.reduce(func, items, initial, batch_size=100)
:staticmethod:
:async:

Reduce a list of items with a function.

param func:

A reduction function

param items:

The items to reduce

param initial:

The initial value

param batch_size:

The maximum number of items to process in a single batch

returns:

The reduced result

class apifrom.performance.batch_processing.BatchProcessor(batch_size=100, max_wait_time=0.1, process_func=None, auto_process=True, auto_setup=True)[source]
Parameters:
  • batch_size (int)

  • max_wait_time (float)

  • process_func (Optional[BatchFunction])

  • auto_process (bool)

  • auto_setup (bool)

Processes batches of similar operations.

This class provides a higher-level interface for batch processing, including batch collection, execution, and statistics.

Initialize a batch processor.

param batch_size:

The maximum number of items in a batch

param max_wait_time:

The maximum time to wait for a batch to fill in seconds

param process_func:

A function to process batches

param auto_process:

Whether to automatically process batches when filled

param auto_setup:

Whether to automatically set up the processor when initialized

apifrom.performance.batch_processing._get_or_create_collector(key)
:async:

Get or create a batch collector.

param key:

The collector key

returns:

A batch collector

apifrom.performance.batch_processing.force_process(collector_key='default')
:async:

Force processing of the current batch.

param collector_key:

The key for the batch collector

apifrom.performance.batch_processing.get_all_stats()

Get batch processing statistics for all collectors.

returns:

A dictionary of statistics by collector key

apifrom.performance.batch_processing.get_stats(collector_key='default')

Get batch processing statistics.

param collector_key:

The key for the batch collector

returns:

A dictionary of statistics

apifrom.performance.batch_processing.map(func, items, batch_size=100, worker_count=5)
:staticmethod:
:async:

Map a function over a list of items with batching and parallelism.

param func:

A function to apply to each item

param items:

The items to process

param batch_size:

The maximum number of items to process in a single batch

param worker_count:

The maximum number of parallel workers

returns:

A list of results

apifrom.performance.batch_processing.process(item, collector_key='default', callback=None)
:async:

Process an item through batch processing.

param item:

The item to process

param collector_key:

The key for the batch collector

param callback:

A callback function to call with the batch results

returns:

The result of processing the item

apifrom.performance.batch_processing.process_batch(items, collector_key='default')
:async:

Process a batch of items.

param items:

The items to process

param collector_key:

The key for the batch collector

returns:

The results of processing the items

apifrom.performance.batch_processing.reset_all_stats()

Reset batch processing statistics for all collectors.

apifrom.performance.batch_processing.reset_stats(collector_key='default')

Reset batch processing statistics.

param collector_key:

The key for the batch collector

apifrom.performance.batch_processing.wait_for_all_empty()
:async:

Wait until all batch collectors are empty.

Middleware(app, dispatch = None):bases: starlette.middleware.base.BaseHTTPMiddleware, BaseMiddleware

Middleware class for APIFromAnything.

This class implements the BaseMiddleware interface and extends Starlette’s BaseHTTPMiddleware to provide a middleware component that can be used with Starlette.

Initialize a new BaseMiddleware instance.

param **options:

Options for the middleware.

class apifrom.performance.batch_processing.Request(request=None, path_params=None, method=None, path=None, query_params=None, headers=None, body=None, client_ip=None)[source]
Parameters:
  • request (Optional[starlette.requests.Request])

  • path_params (Optional[dict[Any, Any]])

  • method (Optional[str])

  • path (Optional[str])

  • query_params (Optional[dict[Any, Any]])

  • headers (Optional[dict[Any, Any]])

  • body (Optional[Union[str, bytes]])

  • client_ip (Optional[str])

Request class for APIFromAnything.

This class wraps a Starlette request and provides methods for accessing request data in a convenient way.

apifrom.performance.batch_processing._request

The underlying Starlette request.

apifrom.performance.batch_processing.path_params

Path parameters extracted from the URL.

apifrom.performance.batch_processing.query_params

Query parameters extracted from the URL.

apifrom.performance.batch_processing.headers

HTTP headers.

apifrom.performance.batch_processing.method

HTTP method.

apifrom.performance.batch_processing.path

Request path.

apifrom.performance.batch_processing._body

Cached request body.

Initialize a new Request instance.

param request:

The underlying Starlette request.

param path_params:

Path parameters extracted from the URL.

param method:

The HTTP method.

param path:

The request path.

param query_params:

Query parameters.

param headers:

HTTP headers.

param body:

Request body.

param client_ip:

Client IP address.

class apifrom.performance.batch_processing.Response(content=None, status_code=200, headers=None, content_type='application/json')[source]
Parameters:
  • content (Any)

  • status_code (int)

  • headers (Optional[Dict[str, str]])

  • content_type (str)

Response class for APIFromAnything.

This class represents an HTTP response and provides methods for setting response data, status code, and headers.

apifrom.performance.batch_processing.content

The response content.

apifrom.performance.batch_processing.status_code

The HTTP status code.

apifrom.performance.batch_processing.headers

HTTP headers.

apifrom.performance.batch_processing.content_type

The content type of the response.

Initialize a new Response instance.

param content:

The response content.

param status_code:

The HTTP status code.

param headers:

HTTP headers.

param content_type:

The content type of the response.

Functions

apifrom.performance.batch_processing.batch_process(batch_size=100, max_wait_time=0.1, process_func=None, auto_process=True)[source]
Parameters:
  • batch_size (int)

  • max_wait_time (float)

  • process_func (Optional[BatchFunction])

  • auto_process (bool)

Decorator for batch processing.

This decorator groups calls to a function into batches and processes them together.

param batch_size:

The maximum number of items in a batch

param max_wait_time:

The maximum time to wait for a batch to fill in seconds

param process_func:

A function to process batches

param auto_process:

Whether to automatically process batches when filled

returns:

A decorator function