"""
OpenAPI documentation generator for APIFromAnything.
This module provides functionality to generate OpenAPI/Swagger documentation
for APIs created with the APIFromAnything library. It supports comprehensive
customization options and follows the OpenAPI 3.0 specification.
"""
import inspect
import json
import re
from typing import Any, Dict, List, Optional, Type, Union, get_type_hints, Callable
from apifrom.core.router import Router
from apifrom.utils.type_utils import (
get_args,
get_origin,
is_optional_type,
extract_optional_type,
is_list_type,
is_dict_type,
is_union_type,
get_union_types,
)
[docs]
class OpenAPIConfig:
"""
Configuration for OpenAPI documentation generation.
This class provides configuration options for customizing the
OpenAPI documentation generation process.
"""
def __init__(
self,
title: str,
description: str,
version: str,
terms_of_service: Optional[str] = None,
contact: Optional[Dict[str, str]] = None,
license_info: Optional[Dict[str, str]] = None,
servers: Optional[List[Dict[str, str]]] = None,
external_docs: Optional[Dict[str, str]] = None,
tags: Optional[List[Dict[str, Any]]] = None,
security: Optional[List[Dict[str, List[str]]]] = None,
components: Optional[Dict[str, Any]] = None,
):
"""
Initialize the OpenAPI configuration.
Args:
title: The title of the API
description: The description of the API
version: The version of the API
terms_of_service: URL to the terms of service
contact: Contact information (name, url, email)
license_info: License information (name, url)
servers: Server information (url, description, variables)
external_docs: External documentation (description, url)
tags: API tags with descriptions
security: Global security requirements
components: Pre-defined components (schemas, responses, parameters, etc.)
"""
self.title = title
self.description = description
self.version = version
self.terms_of_service = terms_of_service
self.contact = contact
self.license_info = license_info
self.servers = servers or [{"url": "/"}]
self.external_docs = external_docs
self.tags = tags or []
self.security = security
self.components = components or {}
[docs]
class OpenAPIGenerator:
"""
Generator for OpenAPI documentation.
This class generates OpenAPI/Swagger documentation for APIs created with
the APIFromAnything library, with extensive customization options.
"""
def __init__(
self,
title: str,
description: str,
version: str,
router: Router,
config: Optional[OpenAPIConfig] = None,
):
"""
Initialize the OpenAPI generator.
Args:
title: The title of the API
description: The description of the API
version: The version of the API
router: The router instance containing the API routes
config: Additional configuration options
"""
self.router = router
self.config = config or OpenAPIConfig(title, description, version)
self.custom_schemas: Dict[str, Dict[str, Any]] = {}
self.custom_responses: Dict[str, Dict[str, Any]] = {}
self.custom_parameters: Dict[str, Dict[str, Any]] = {}
self.custom_examples: Dict[str, Dict[str, Any]] = {}
self.custom_request_bodies: Dict[str, Dict[str, Any]] = {}
self.custom_headers: Dict[str, Dict[str, Any]] = {}
self.custom_security_schemes: Dict[str, Dict[str, Any]] = {}
self.custom_links: Dict[str, Dict[str, Any]] = {}
self.custom_callbacks: Dict[str, Dict[str, Any]] = {}
def generate(self) -> Dict[str, Any]:
"""
Generate the OpenAPI documentation.
Returns:
The OpenAPI documentation as a dictionary
"""
openapi_doc = {
"openapi": "3.0.0",
"info": self._generate_info(),
"servers": self.config.servers,
"paths": self._generate_paths(),
"components": self._generate_components(),
}
# Add tags if defined
if self.config.tags:
openapi_doc["tags"] = self.config.tags
# Add external docs if defined
if self.config.external_docs:
openapi_doc["externalDocs"] = self.config.external_docs
# Add global security requirements if defined
if self.config.security:
openapi_doc["security"] = self.config.security
return openapi_doc
def _generate_info(self) -> Dict[str, Any]:
"""
Generate the info section of the OpenAPI documentation.
Returns:
The info section as a dictionary
"""
info = {
"title": self.config.title,
"description": self.config.description,
"version": self.config.version,
}
# Add terms of service if defined
if self.config.terms_of_service:
info["termsOfService"] = self.config.terms_of_service
# Add contact information if defined
if self.config.contact:
info["contact"] = self.config.contact
# Add license information if defined
if self.config.license_info:
info["license"] = self.config.license_info
return info
def _generate_paths(self) -> Dict[str, Any]:
"""
Generate the paths section of the OpenAPI documentation.
Returns:
The paths section as a dictionary
"""
paths = {}
for route_path, route_info in self.router.routes.items():
path_item = {}
for method, handler_info in route_info.items():
handler = handler_info["handler"]
operation = self._generate_operation(handler, method, route_path)
# Add security requirements if applicable
security_requirements = self._get_security_requirements(handler)
if security_requirements:
operation["security"] = security_requirements
path_item[method.lower()] = operation
# Convert path parameters from {param} to {param} format if needed
openapi_path = route_path
paths[openapi_path] = path_item
return paths
def _generate_operation(self, handler, method: str, path: str) -> Dict[str, Any]:
"""
Generate an operation object for the OpenAPI documentation.
Args:
handler: The handler function
method: The HTTP method
path: The route path
Returns:
The operation object as a dictionary
"""
# Get function signature and docstring
signature = inspect.signature(handler)
docstring = inspect.getdoc(handler) or ""
# Parse docstring to get description and parameter/return descriptions
description, param_docs, return_doc, metadata = self._parse_docstring(docstring)
# Get type hints
type_hints = get_type_hints(handler)
# Extract path parameters from the path
path_params = re.findall(r'{([^}]+)}', path)
# Generate parameters
parameters = []
request_body = None
# Add path parameters first
for param_name in path_params:
param_type = type_hints.get(param_name, str)
param_description = param_docs.get(param_name, "")
parameter = {
"name": param_name,
"in": "path",
"description": param_description,
"required": True,
"schema": self._get_schema_for_type(param_type),
}
parameters.append(parameter)
# Add other parameters
for param_name, param in signature.parameters.items():
# Skip 'self' parameter for class methods
if param_name == "self":
continue
# Skip 'request' parameter as it's handled internally
if param_name == "request":
continue
# Skip path parameters as they're already added
if param_name in path_params:
continue
# Get parameter type
param_type = type_hints.get(param_name, Any)
# Get parameter description from docstring
param_description = param_docs.get(param_name, "")
# For POST, PUT, PATCH methods, use request body for parameters
if method.upper() in ["POST", "PUT", "PATCH"] and param_name not in ["path", "query"]:
if not request_body:
request_body = {
"description": "Request body",
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {},
"required": [],
}
}
}
}
# Add property to request body
schema = request_body["content"]["application/json"]["schema"]
schema["properties"][param_name] = self._get_schema_for_type(param_type)
# If parameter has no default value, it's required
if param.default == inspect.Parameter.empty:
schema["required"].append(param_name)
else:
# Add as query parameter
parameter = {
"name": param_name,
"in": "query",
"description": param_description,
"schema": self._get_schema_for_type(param_type),
}
# If parameter has no default value, it's required
if param.default == inspect.Parameter.empty and not is_optional_type(param_type):
parameter["required"] = True
# Add examples if available
if f"{param_name}_example" in metadata:
parameter["example"] = metadata[f"{param_name}_example"]
parameters.append(parameter)
# Generate responses
responses = {
"200": {
"description": return_doc or "Successful response",
"content": {
"application/json": {
"schema": self._get_schema_for_type(type_hints.get("return", Any)),
}
}
},
"400": {
"description": "Bad request",
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"error": {"type": "string"},
}
}
}
}
},
"500": {
"description": "Internal server error",
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"error": {"type": "string"},
}
}
}
}
}
}
# Add custom responses from metadata
if "responses" in metadata:
for status_code, response_info in metadata["responses"].items():
responses[str(status_code)] = response_info
# Build operation object
operation = {
"summary": metadata.get("summary", self._get_summary(handler.__name__, description)),
"description": description,
"parameters": parameters,
"responses": responses,
}
# Add operation ID if available
if "operationId" in metadata:
operation["operationId"] = metadata["operationId"]
else:
operation["operationId"] = handler.__name__
# Add tags if available
if "tags" in metadata:
operation["tags"] = metadata["tags"]
# Add deprecated flag if available
if "deprecated" in metadata:
operation["deprecated"] = metadata["deprecated"]
# Add external docs if available
if "externalDocs" in metadata:
operation["externalDocs"] = metadata["externalDocs"]
# Add request body if present
if request_body:
# Add examples if available
if "requestBodyExample" in metadata:
request_body["content"]["application/json"]["example"] = metadata["requestBodyExample"]
operation["requestBody"] = request_body
# Add callbacks if available
if "callbacks" in metadata:
operation["callbacks"] = metadata["callbacks"]
return operation
def _get_summary(self, function_name: str, description: str) -> str:
"""
Generate a summary for an operation.
Args:
function_name: The name of the function
description: The description from the docstring
Returns:
A summary string
"""
# If description is available, use the first line
if description:
first_line = description.split("\n")[0].strip()
if first_line:
return first_line
# Otherwise, generate a summary from the function name
words = re.findall(r'[A-Z]?[a-z]+|[A-Z]+(?=[A-Z]|$)', function_name)
return " ".join(word.capitalize() for word in words)
def _parse_docstring(self, docstring: str) -> tuple:
"""
Parse a docstring to extract description and parameter/return descriptions.
Args:
docstring: The docstring to parse
Returns:
A tuple containing (description, parameter_docs, return_doc, metadata)
"""
if not docstring:
return "", {}, "", {}
# Extract OpenAPI metadata from docstring
metadata = {}
docstring_lines = docstring.split("\n")
cleaned_lines = []
i = 0
while i < len(docstring_lines):
line = docstring_lines[i].strip()
# Check for OpenAPI metadata
if line.startswith("@openapi"):
# Extract the metadata key and value
metadata_match = re.match(r'@openapi\s+(\w+):\s*(.*)', line)
if metadata_match:
key, value = metadata_match.groups()
# Handle multi-line values
if value.endswith("\\"):
value = value[:-1]
j = i + 1
while j < len(docstring_lines) and (docstring_lines[j].strip().startswith("@") or not docstring_lines[j].strip()):
value += " " + docstring_lines[j].strip()
j += 1
i = j - 1
# Try to parse as JSON if possible
try:
metadata[key] = json.loads(value)
except json.JSONDecodeError:
metadata[key] = value
else:
cleaned_lines.append(line)
i += 1
# Reconstruct docstring without metadata
cleaned_docstring = "\n".join(cleaned_lines)
# Split docstring into sections
sections = re.split(r'\n\s*(?:Args|Returns):\s*\n', cleaned_docstring)
# Extract description
description = sections[0].strip()
# Extract parameter descriptions
param_docs = {}
if len(sections) > 1 and "Args:" in cleaned_docstring:
args_section = sections[1].strip()
param_matches = re.findall(r'(\w+):\s*(.*?)(?=\n\s*\w+:|$)', args_section, re.DOTALL)
for param_name, param_desc in param_matches:
param_docs[param_name] = param_desc.strip()
# Extract return description
return_doc = ""
if "Returns:" in cleaned_docstring:
returns_section = cleaned_docstring.split("Returns:")[1].strip()
return_doc = returns_section
return description, param_docs, return_doc, metadata
def _get_schema_for_type(self, type_hint: Type) -> Dict[str, Any]:
"""
Generate a JSON Schema for a Python type.
Args:
type_hint: The Python type
Returns:
A JSON Schema as a dictionary
"""
# Handle None/NoneType
if type_hint is type(None):
return {"type": "null"}
# Handle Any
if type_hint is Any:
return {}
# Handle Optional types
if is_optional_type(type_hint):
inner_type = extract_optional_type(type_hint)
schema = self._get_schema_for_type(inner_type)
schema["nullable"] = True
return schema
# Handle Union types
if is_union_type(type_hint):
union_types = get_union_types(type_hint)
# If one of the union types is None, treat it as an optional type
if type(None) in union_types:
non_none_types = [t for t in union_types if t is not type(None)]
if len(non_none_types) == 1:
schema = self._get_schema_for_type(non_none_types[0])
schema["nullable"] = True
return schema
# Otherwise, use oneOf for the union
return {
"oneOf": [self._get_schema_for_type(t) for t in union_types if t is not type(None)]
}
# Handle List types
if is_list_type(type_hint):
item_type = get_args(type_hint)[0] if get_args(type_hint) else Any
return {
"type": "array",
"items": self._get_schema_for_type(item_type)
}
# Handle Dict types
if is_dict_type(type_hint):
args = get_args(type_hint)
if args:
key_type, value_type = args
# Only support string keys in JSON
if key_type is not str:
key_type = str
return {
"type": "object",
"additionalProperties": self._get_schema_for_type(value_type)
}
return {"type": "object"}
# Handle primitive types
if type_hint is str:
return {"type": "string"}
elif type_hint is int:
return {"type": "integer"}
elif type_hint is float:
return {"type": "number"}
elif type_hint is bool:
return {"type": "boolean"}
# Handle custom classes (as objects)
if hasattr(type_hint, "__annotations__"):
# Check if we have a registered schema for this type
type_name = getattr(type_hint, "__name__", str(type_hint))
if type_name in self.custom_schemas:
return {"$ref": f"#/components/schemas/{type_name}"}
properties = {}
required = []
for attr_name, attr_type in type_hint.__annotations__.items():
properties[attr_name] = self._get_schema_for_type(attr_type)
# Assume all attributes are required for now
required.append(attr_name)
# Register the schema for future reference
self.custom_schemas[type_name] = {
"type": "object",
"properties": properties,
"required": required
}
return {"$ref": f"#/components/schemas/{type_name}"}
# Default to object for unknown types
return {"type": "object"}
def _generate_components(self) -> Dict[str, Any]:
"""
Generate the components section of the OpenAPI documentation.
Returns:
The components section as a dictionary
"""
components = {}
# Add schemas
if self.custom_schemas:
components["schemas"] = self.custom_schemas
# Add responses
if self.custom_responses:
components["responses"] = self.custom_responses
# Add parameters
if self.custom_parameters:
components["parameters"] = self.custom_parameters
# Add examples
if self.custom_examples:
components["examples"] = self.custom_examples
# Add request bodies
if self.custom_request_bodies:
components["requestBodies"] = self.custom_request_bodies
# Add headers
if self.custom_headers:
components["headers"] = self.custom_headers
# Add security schemes
security_schemes = self._generate_security_schemes()
if security_schemes:
components["securitySchemes"] = security_schemes
# Add links
if self.custom_links:
components["links"] = self.custom_links
# Add callbacks
if self.custom_callbacks:
components["callbacks"] = self.custom_callbacks
# Merge with pre-defined components
if self.config.components:
for component_type, component_values in self.config.components.items():
if component_type not in components:
components[component_type] = {}
components[component_type].update(component_values)
return components
def _generate_security_schemes(self) -> Dict[str, Any]:
"""
Generate the security schemes section of the OpenAPI documentation.
Returns:
The security schemes section as a dictionary
"""
security_schemes = {
"bearerAuth": {
"type": "http",
"scheme": "bearer",
"bearerFormat": "JWT"
},
"apiKeyAuth": {
"type": "apiKey",
"in": "header",
"name": "X-API-Key"
},
"basicAuth": {
"type": "http",
"scheme": "basic"
},
"oauth2Auth": {
"type": "oauth2",
"flows": {
"implicit": {
"authorizationUrl": "https://example.com/oauth/authorize",
"scopes": {
"read": "Read access",
"write": "Write access"
}
}
}
}
}
# Merge with custom security schemes
security_schemes.update(self.custom_security_schemes)
return security_schemes
def _get_security_requirements(self, handler) -> List[Dict[str, List[str]]]:
"""
Get security requirements for a handler.
Args:
handler: The handler function
Returns:
A list of security requirement objects
"""
# Check if handler has security decorators
security_requirements = []
# Unwrap the handler to get the original function
original_handler = handler
while hasattr(original_handler, "__wrapped__"):
original_handler = original_handler.__wrapped__
# Check for JWT authentication
if hasattr(original_handler, "_jwt_required"):
security_requirements.append({"bearerAuth": []})
# Check for API key authentication
if hasattr(original_handler, "_api_key_required"):
scopes = getattr(original_handler, "_api_key_scopes", [])
security_requirements.append({"apiKeyAuth": scopes})
# Check for Basic authentication
if hasattr(original_handler, "_basic_auth_required"):
security_requirements.append({"basicAuth": []})
# Check for OAuth2 authentication
if hasattr(original_handler, "_oauth2_required"):
scopes = getattr(original_handler, "_oauth2_scopes", [])
security_requirements.append({"oauth2Auth": scopes})
return security_requirements
def register_schema(self, name: str, schema: Dict[str, Any]) -> None:
"""
Register a custom schema.
Args:
name: The name of the schema
schema: The schema definition
"""
self.custom_schemas[name] = schema
def register_response(self, name: str, response: Dict[str, Any]) -> None:
"""
Register a custom response.
Args:
name: The name of the response
response: The response definition
"""
self.custom_responses[name] = response
def register_parameter(self, name: str, parameter: Dict[str, Any]) -> None:
"""
Register a custom parameter.
Args:
name: The name of the parameter
parameter: The parameter definition
"""
self.custom_parameters[name] = parameter
def register_example(self, name: str, example: Dict[str, Any]) -> None:
"""
Register a custom example.
Args:
name: The name of the example
example: The example definition
"""
self.custom_examples[name] = example
def register_request_body(self, name: str, request_body: Dict[str, Any]) -> None:
"""
Register a custom request body.
Args:
name: The name of the request body
request_body: The request body definition
"""
self.custom_request_bodies[name] = request_body
def register_header(self, name: str, header: Dict[str, Any]) -> None:
"""
Register a custom header.
Args:
name: The name of the header
header: The header definition
"""
self.custom_headers[name] = header
def register_security_scheme(self, name: str, security_scheme: Dict[str, Any]) -> None:
"""
Register a custom security scheme.
Args:
name: The name of the security scheme
security_scheme: The security scheme definition
"""
self.custom_security_schemes[name] = security_scheme
def register_link(self, name: str, link: Dict[str, Any]) -> None:
"""
Register a custom link.
Args:
name: The name of the link
link: The link definition
"""
self.custom_links[name] = link
def register_callback(self, name: str, callback: Dict[str, Any]) -> None:
"""
Register a custom callback.
Args:
name: The name of the callback
callback: The callback definition
"""
self.custom_callbacks[name] = callback
def to_json(self, indent: int = 2) -> str:
"""
Convert the OpenAPI documentation to JSON.
Args:
indent: The indentation level for the JSON output
Returns:
The OpenAPI documentation as a JSON string
"""
return json.dumps(self.generate(), indent=indent)
def to_yaml(self) -> str:
"""
Convert the OpenAPI documentation to YAML.
Returns:
The OpenAPI documentation as a YAML string
"""
try:
import yaml
return yaml.dump(self.generate(), sort_keys=False)
except ImportError:
raise ImportError("PyYAML is required for YAML output. Install it with 'pip install pyyaml'.")