Skip to main content

API Reference

Complete reference for all classes, methods, and configuration options.

Core Classes

AnalyticalEngine

Main interface for the Ajna Analytical Engine.
class AnalyticalEngine:
    def __init__(
        self,
        engine_config_path: Optional[Path] = None,
        datasource_config_path: Optional[Path] = None,
        config_manager: Optional[ConfigManager] = None,
    )
Parameters:
  • engine_config_path (Optional[Path]): Path to custom engine configuration YAML
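  • datasource_config_path (Optional[Path]): Path to the datasource configuration YAML. Mandatory in practice: although the parameter is typed Optional, a ConfigurationError is raised when no datasource configuration is provided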
  • config_manager (Optional[ConfigManager]): Pre-configured ConfigManager instance
Raises:
  • ConfigurationError: If no datasource configuration is provided (a datasource configuration is mandatory)

Methods

execute_query()
def execute_query(self, request: QueryRequest) -> QueryResult
Execute an analytical query with caching and performance monitoring.
Parameters:
  • request (QueryRequest): Query specification
Returns:
  • QueryResult: Query results with data and metadata
Example:
query = QueryRequest(
    sources=["users"],
    select=["name", "email"],
    limit=100
)
result = engine.execute_query(query)
convert_sql_to_query()
def convert_sql_to_query(self, request: SQLToQueryRequest) -> Dict[str, Any]
Convert a SQL query to the native QueryRequest format.
Parameters:
  • request (SQLToQueryRequest): SQL conversion request
Returns:
  • Dict[str, Any]: Native query specification as a JSON-compatible dictionary
validate_query()
def validate_query(
    self, 
    request: QueryRequest, 
    raise_on_error: bool = True
) -> Dict[str, Any]
Validate a query without executing it.
Parameters:
  • request (QueryRequest): Query to validate
  • raise_on_error (bool): Whether to raise an exception on validation errors (default: True)
Returns:
  • Dict[str, Any]: Validation results and suggestions
clear_cache()
def clear_cache(self) -> None
Clear the query result cache.
analyze()
def analyze(self, request: QueryRequest) -> QueryResult
Deprecated: Use execute_query() instead. Provided for backward compatibility.

Data Models

QueryRequest

Type-safe query specification with comprehensive validation.
class QueryRequest(BaseModel):
    # Required
    sources: List[str]
    
    # Optional
    table_aliases: Optional[Dict[str, str]] = None
    select: Optional[List[str]] = None
    joins: Optional[List[Union[Dict[str, str], Join]]] = None
    filters: Optional[Dict[str, List[Union[Dict[str, Any], Filter]]]] = None
    group_by: Optional[List[str]] = None
    aggregations: Optional[List[Dict[str, Any]]] = None
    window_functions: Optional[List[Union[Dict[str, Any], WindowFunction]]] = None
    case_expressions: Optional[List[Union[Dict[str, Any], CaseExpression]]] = None
    null_functions: Optional[List[Union[Dict[str, Any], NullFunction]]] = None
    string_functions: Optional[List[Union[Dict[str, Any], StringFunction]]] = None
    math_functions: Optional[List[Union[Dict[str, Any], MathFunction]]] = None
    datetime_functions: Optional[List[Union[Dict[str, Any], DateTimeFunction]]] = None
    type_conversion_functions: Optional[List[Union[Dict[str, Any], TypeConversionFunction]]] = None
    subqueries: Optional[List[Union[Dict[str, Any], Subquery]]] = None
    order_by: Optional[List[Union[str, Dict[str, str]]]] = None
    limit: Optional[int] = None
    offset: Optional[int] = None
    name: Optional[str] = None
    description: Optional[str] = None
    options: Optional[Dict[str, Any]] = None
    cte: Optional[Dict[str, Any]] = None
    union: Optional[List[Dict[str, Any]]] = None
    intersect: Optional[List[Dict[str, Any]]] = None
    except_: Optional[List[Dict[str, Any]]] = None

Filter Operations

class Filter(BaseModel):
    column: str
    op: str  # =, !=, >, <, >=, <=, like, not like, in, not in, is null, is not null, between
    value: Any  # Can be literal value or Subquery object

Join Operations

class Join(BaseModel):
    left: str
    right: str
    type: str = "inner"  # inner, left, right, full, cross

Window Functions

class WindowFunction(BaseModel):
    function: str  # row_number, rank, dense_rank, lag, lead, first_value, last_value, ntile, sum, avg, count, min, max
    column: Optional[str] = None
    alias: str
    partition_by: List[str] = []
    order_by: List[Dict[str, str]] = []
    frame: Optional[Dict[str, Any]] = None

Aggregations

Supported functions: sum, avg, count, min, max, count_distinct, stddev, variance, median, percentile, mode
Each aggregation is specified as a dictionary, for example:
{
    "function": "sum",
    "column": "amount", 
    "alias": "total_amount"
}

String Functions

class StringFunction(BaseModel):
    function: str  # concat, substring, upper, lower, trim, length, replace, split
    arguments: List[Union[str, Any]]
    alias: str

Mathematical Functions

class MathFunction(BaseModel):
    function: str  # abs, round, ceil, floor, power, sqrt, mod, sin, cos, tan, greatest, least
    arguments: List[Union[str, int, float]]
    alias: str

Date/Time Functions

class DateTimeFunction(BaseModel):
    function: str  # extract, year, month, day, date_add, date_sub, datediff, now, current_date, current_timestamp
    arguments: List[Union[str, int]]
    alias: str
    extract_part: Optional[str] = None  # year, month, day, hour, minute, second

NULL Handling Functions

class NullFunction(BaseModel):
    function: str  # isnull, coalesce, nullif, ifnull
    arguments: List[Union[str, Any]]
    alias: str

Case Expressions

class CaseExpression(BaseModel):
    type: str = "case"
    when_conditions: List[Dict[str, Any]] = []
    else_value: Optional[Dict[str, Any]] = None
    alias: Optional[str] = None

Subqueries

class Subquery(BaseModel):
    subquery_type: str  # scalar, in, not_in, exists, not_exists, derived_table, any, all
    sources: List[str]
    select: Optional[List[str]] = None
    joins: Optional[List[Union[Dict[str, str], Join]]] = None
    filters: Optional[Dict[str, List[Union[Dict[str, Any], Filter]]]] = None
    # ... other query fields

SQLToQueryRequest

SQL-to-native query conversion request.
class SQLToQueryRequest(BaseModel):
    sql: str
    cache_sql: bool = False
    sql_name: Optional[str] = None

QueryResult

Comprehensive query results with metadata.
class QueryResult(BaseModel):
    data: pl.DataFrame  # Query results as Polars DataFrame
    metadata: QueryMetadata  # Execution metadata
    
class QueryMetadata(BaseModel):
    query_id: str
    execution_time_seconds: float
    row_count: int
    column_count: int
    memory_usage_mb: float
    cache_hit: bool
    query_type: str
    data_sources_accessed: List[str]
    rows_processed: int
    performance_metrics: PerformanceMetrics

class PerformanceMetrics(BaseModel):
    cpu_usage_percent: float
    memory_peak_mb: float
    io_operations: int
    network_calls: int
    cache_hits: int
    cache_misses: int

Configuration Management

ConfigManager

Manages YAML-based configuration loading.
class ConfigManager:
    def load_engine_config(self) -> Dict[str, Any]
    def load_datasource_config(self) -> Dict[str, Any]
    def get_engine_setting(self, key: str) -> Any
    def get_datasource_setting(self, key: str) -> Any

Connection Management

ConnectionManager

Handles database connections with credential resolution and caching.
class ConnectionManager:
    def __init__(
        self,
        connection_provider: Optional[ConnectionProvider] = None,
        cache_ttl: int = 300,
        enable_cache: bool = True
    )
    
    async def get_connection_string(
        self, 
        source_config: Dict[str, Any],
        source_name: str = "unknown"
    ) -> str
    
    async def test_connection(
        self, 
        source_config: Dict[str, Any],
        source_name: str = "unknown"
    ) -> bool

Connection Schemas

Database-specific connection schemas:
class PostgresConnectionSchema(BaseModel):
    connection_type: ConnectionType = ConnectionType.POSTGRES
    host: str
    port: int = 5432
    database: str
    username: Optional[str] = None
    password: Optional[str] = None
    sslmode: Optional[str] = None

class AthenaConnectionSchema(BaseModel):
    connection_type: ConnectionType = ConnectionType.ATHENA
    region: str
    role_arn: Optional[str] = None
    access_key_id: Optional[str] = None
    secret_access_key: Optional[str] = None
    workgroup: str = "primary"
    s3_staging_dir: str
    database: str = "default"

class SnowflakeConnectionSchema(BaseModel):
    connection_type: ConnectionType = ConnectionType.SNOWFLAKE
    account: str
    user: str
    warehouse: Optional[str] = None
    database: Optional[str] = None
    schema: Optional[str] = None
    password: Optional[str] = None
    private_key: Optional[str] = None

Enumerations

FilterOperator

class FilterOperator(str, Enum):
    EQUALS = "="
    NOT_EQUALS = "!="
    GREATER_THAN = ">"
    LESS_THAN = "<"
    GREATER_THAN_EQUALS = ">="
    LESS_THAN_EQUALS = "<="
    LIKE = "like"
    NOT_LIKE = "not like"
    IN = "in"
    NOT_IN = "not in"
    IS_NULL = "is null"
    IS_NOT_NULL = "is not null"
    BETWEEN = "between"

JoinType

class JoinType(str, Enum):
    INNER = "inner"
    LEFT = "left"
    RIGHT = "right"
    FULL = "full"
    CROSS = "cross"

AggregationType

class AggregationType(str, Enum):
    SUM = "sum"
    AVG = "avg"
    COUNT = "count"
    MIN = "min"
    MAX = "max"
    COUNT_DISTINCT = "count_distinct"
    STDDEV = "stddev"
    VARIANCE = "variance"
    MEDIAN = "median"
    PERCENTILE = "percentile"
    MODE = "mode"

ConnectionType

class ConnectionType(Enum):
    POSTGRES = "postgres"
    MYSQL = "mysql"
    ATHENA = "athena"
    BIGQUERY = "bigquery"
    SNOWFLAKE = "snowflake"
    SYNAPSE = "synapse"
    REDSHIFT = "redshift"
    SQLITE = "sqlite"

AuthenticationType

class AuthenticationType(Enum):
    USERNAME_PASSWORD = "username_password"
    CONNECTION_STRING = "connection_string"
    IAM_ROLE = "iam_role"
    ACCESS_KEY = "access_key"
    SERVICE_ACCOUNT = "service_account"
    MANAGED_IDENTITY = "managed_identity"
    KEY_PAIR = "key_pair"
    OAUTH = "oauth"
    JWT = "jwt"
    API_KEY = "api_key"

Exceptions

Core Exceptions

from ajna_analytical_engine import (
    AjnaEngineError,           # Base exception
    QueryValidationError,      # Invalid query specification
    DataLoadingError,          # Data source loading errors
    ConfigurationError,        # Configuration errors
    QueryExecutionError,       # Query execution failures
    CacheError,               # Caching errors
    SQLTranslationError,      # SQL conversion errors
    PerformanceError,         # Performance-related errors
    AuthenticationError       # Authentication failures
)

Utility Functions

Health Checking

from ajna_analytical_engine._analytics._health import HealthChecker

# NOTE: `config` is not defined in this snippet; pass your own engine configuration object.
health = HealthChecker(engine_config=config)
status = health.check_system_health()

Performance Monitoring

from ajna_analytical_engine._analytics._performance import PerformanceMonitor

monitor = PerformanceMonitor()
metrics = monitor.collect_metrics()

Complete Example

from pathlib import Path
from ajna_analytical_engine import (
    AnalyticalEngine, 
    QueryRequest, 
    SQLToQueryRequest,
    QueryValidationError
)

# Initialize engine with datasource config (mandatory)
engine = AnalyticalEngine(
    datasource_config_path=Path("datasources.yaml")
)

# Complex query with multiple features
query = QueryRequest(
    sources=["customers", "orders", "products"],
    joins=[
        {"left": "customers.id", "right": "orders.customer_id", "type": "inner"},
        {"left": "orders.product_id", "right": "products.id", "type": "inner"}
    ],
    aggregations=[
        {"function": "sum", "column": "orders.amount", "alias": "total_spent"},
        {"function": "count", "column": "orders.id", "alias": "order_count"}
    ],
    window_functions=[{
        "function": "rank",
        "alias": "customer_rank",
        "partition_by": ["products.category"],
        "order_by": [{"column": "total_spent", "direction": "desc"}]
    }],
    filters={
        "customers": [{"column": "active", "op": "=", "value": True}],
        "orders": [{"column": "order_date", "op": ">=", "value": "2023-01-01"}]
    },
    group_by=["customers.id", "customers.name", "products.category"],
    order_by=[{"column": "total_spent", "direction": "desc"}],
    limit=100
)

try:
    # Validate query
    validation = engine.validate_query(query)
    print("Query valid:", validation["valid"])
    
    # Execute query
    result = engine.execute_query(query)
    
    # Access results
    print(f"Found {result.metadata.row_count} customers")
    print(f"Execution time: {result.metadata.execution_time_seconds:.2f}s")
    print(f"Memory used: {result.metadata.memory_usage_mb:.1f}MB")
    print(f"Cache hit: {result.metadata.cache_hit}")
    
    # Convert DataFrame to other formats
    pandas_df = result.data.to_pandas()
    json_data = result.data.to_dicts()
    
except QueryValidationError as e:
    print(f"Query validation failed: {e}")
except Exception as e:
    print(f"Query execution failed: {e}")
This API reference covers all classes, methods, and functionality actually implemented in the Ajna Analytical Engine codebase.