Complete Guide

Overview

Ajna Analytical Engine is a high-performance Python library for executing complex analytical queries across multiple databases. It is built on Polars for speed and provides a comprehensive set of query capabilities.

Installation & Setup

Installation

pip install ajna-analytical-engine

Required Configuration

The engine requires a datasource configuration file (YAML) that defines your database connections and table schemas:
from pathlib import Path
from ajna_analytical_engine import AnalyticalEngine

# Datasource config is mandatory
engine = AnalyticalEngine(
    datasource_config_path=Path("my_datasources.yaml")
)
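
A minimal end-to-end sketch using pieces introduced later in this guide (QueryRequest and execute_query); it assumes a users source is defined in my_datasources.yaml:
from pathlib import Path
from ajna_analytical_engine import AnalyticalEngine, QueryRequest

# "users" must be a source defined in my_datasources.yaml
engine = AnalyticalEngine(datasource_config_path=Path("my_datasources.yaml"))
query = QueryRequest(sources=["users"], select=["id", "name"], limit=10)
result = engine.execute_query(query)
print(result.metadata.rows_processed)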

Database Support

Production-Ready Databases

  • PostgreSQL - Full support with connection pooling
  • MySQL - Full support with connection pooling

Enterprise Databases (via Connection API)

  • AWS Athena - IAM roles, access keys, workgroups
  • Google BigQuery - Service accounts, projects, datasets
  • Snowflake - Username/password, key-pair authentication
  • Azure Synapse - Managed identity, service principals
  • Amazon Redshift - IAM authentication
  • SQLite - File-based databases

Authentication Methods

Database Authentication

# Username/Password
sources:
  my_postgres:
    type: postgres
    host: localhost
    port: 5432
    database: mydb
    username: user
    password: pass

# Connection String
sources:
  my_db:
    type: postgres
    uri: postgresql://user:pass@host:5432/db

Cloud Authentication

# AWS IAM Role
sources:
  athena_data:
    type: athena
    region: us-east-1
    role_arn: arn:aws:iam::123:role/AthenaRole
    s3_staging_dir: s3://bucket/staging/

# GCP Service Account
sources:
  bigquery_data:
    type: bigquery
    project_id: my-project
    service_account_key: /path/to/key.json

# Snowflake Key-Pair
sources:
  snowflake_data:
    type: snowflake
    account: my-account
    user: my-user
    private_key: /path/to/private_key.pem

Enterprise Connection Management

When a source specifies only a connection_id, the connection details are resolved at runtime through the Connection API (see Custom Connection Managers under Advanced Features):
# Connection ID resolution via API
sources:
  prod_cluster:
    connection_id: "analytics-cluster-prod"

Query Types

1. Basic SELECT Queries

from ajna_analytical_engine import QueryRequest

query = QueryRequest(
    sources=["users"],
    select=["id", "name", "email"],
    filters={"users": [{"column": "active", "op": "=", "value": True}]},
    order_by=[{"column": "name", "direction": "asc"}],
    limit=100
)
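
Multiple conditions can be listed per source; this sketch assumes entries in a source's filter list are combined with AND, which the guide doesn't state explicitly:
# Active users created this year (filter entries presumably ANDed)
query = QueryRequest(
    sources=["users"],
    select=["id", "name"],
    filters={"users": [
        {"column": "active", "op": "=", "value": True},
        {"column": "created_at", "op": ">=", "value": "2024-01-01"}
    ]}
)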

2. JOIN Operations

Supported join types: inner, left, right, full, cross
query = QueryRequest(
    sources=["users", "orders"],
    joins=[{
        "left": "users.id",
        "right": "orders.customer_id", 
        "type": "inner"
    }],
    select=["users.name", "orders.total"]
)
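
Chaining more than two sources is a natural extension of the same join format; this sketch assumes each join dict is applied in order, and the products source is hypothetical:
# Hypothetical three-way join: users -> orders -> products
query = QueryRequest(
    sources=["users", "orders", "products"],
    joins=[
        {"left": "users.id", "right": "orders.customer_id", "type": "inner"},
        {"left": "orders.product_id", "right": "products.id", "type": "left"}
    ],
    select=["users.name", "products.title", "orders.total"]
)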

3. Aggregations

Supported functions: sum, avg, count, min, max, count_distinct, stddev, variance, median, percentile, mode
query = QueryRequest(
    sources=["sales"],
    aggregations=[
        {"function": "sum", "column": "amount", "alias": "total_sales"},
        {"function": "count", "column": "id", "alias": "order_count"},
        {"function": "avg", "column": "amount", "alias": "avg_order"}
    ],
    group_by=["region", "product_category"]
)
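
The less common functions in the list above take the same shape; for example, count_distinct (a sketch; customer_id is assumed to exist on sales):
query = QueryRequest(
    sources=["sales"],
    aggregations=[
        {"function": "count_distinct", "column": "customer_id", "alias": "unique_customers"}
    ],
    group_by=["region"]
)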

4. Window Functions

Full window function support: row_number, rank, dense_rank, lag, lead, first_value, last_value, ntile
query = QueryRequest(
    sources=["sales"],
    window_functions=[{
        "function": "row_number",
        "alias": "rank",
        "partition_by": ["region"], 
        "order_by": [{"column": "amount", "direction": "desc"}]
    }],
    select=["*"]
)
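
The other ranking functions use the same partition/order shape; a dense_rank sketch:
query = QueryRequest(
    sources=["sales"],
    window_functions=[{
        "function": "dense_rank",
        "alias": "amount_rank",
        "partition_by": ["region"],
        "order_by": [{"column": "amount", "direction": "desc"}]
    }],
    select=["*"]
)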

5. Subqueries

Supported types: scalar, in, not_in, exists, not_exists, derived_table, any, all
query = QueryRequest(
    sources=["customers"],
    filters={"customers": [{
        "column": "id",
        "op": "in",
        "value": {
            "type": "subquery",
            "subquery_type": "in",
            "sources": ["high_value_orders"],
            "select": ["customer_id"],
            "filters": {"high_value_orders": [{"column": "amount", "op": ">", "value": 1000}]}
        }
    }]}
)
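
Scalar subqueries presumably nest the same way; this sketch (an assumed shape, not confirmed by the guide) compares a column against a single aggregated value:
# Orders above the average order amount (assumed scalar-subquery shape)
query = QueryRequest(
    sources=["orders"],
    filters={"orders": [{
        "column": "amount",
        "op": ">",
        "value": {
            "type": "subquery",
            "subquery_type": "scalar",
            "sources": ["orders"],
            "aggregations": [{"function": "avg", "column": "amount", "alias": "avg_amount"}]
        }
    }]}
)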

6. Case Expressions

query = QueryRequest(
    sources=["users"],
    case_expressions=[{
        "when_conditions": [
            {"condition": {"column": "age", "op": "<", "value": 18}, "value": "Minor"},
            {"condition": {"column": "age", "op": ">=", "value": 65}, "value": "Senior"}
        ],
        "else_value": "Adult",
        "alias": "age_group"
    }]
)

7. String Functions

Supported: concat, substring, upper, lower, trim, length, replace, split
query = QueryRequest(
    sources=["users"],
    string_functions=[{
        "function": "concat",
        "arguments": ["first_name", " ", "last_name"],
        "alias": "full_name"
    }]
)

8. Mathematical Functions

Supported: abs, round, ceil, floor, power, sqrt, mod, sin, cos, tan, greatest, least
query = QueryRequest(
    sources=["measurements"],
    math_functions=[{
        "function": "round",
        "arguments": ["temperature", 2],
        "alias": "temp_rounded"
    }]
)

9. Date/Time Functions

Supported: extract, year, month, day, date_add, date_sub, datediff, now, current_date
query = QueryRequest(
    sources=["events"],
    datetime_functions=[{
        "function": "extract",
        "arguments": ["created_at"],
        "extract_part": "month",
        "alias": "event_month"
    }]
)

10. NULL Handling Functions

Supported: isnull, coalesce, nullif, ifnull
query = QueryRequest(
    sources=["users"],
    null_functions=[{
        "function": "coalesce",
        "arguments": ["middle_name", "''"],
        "alias": "middle_name_clean"
    }]
)

11. Common Table Expressions (CTEs)

query = QueryRequest(
    cte={
        "high_value_customers": {
            "sources": ["customers", "orders"],
            "joins": [{"left": "customers.id", "right": "orders.customer_id", "type": "inner"}],
            "aggregations": [{"function": "sum", "column": "orders.amount", "alias": "total_spent"}],
            "group_by": ["customers.id"],
            "filters": {"orders": [{"column": "amount", "op": ">", "value": 1000}]}
        }
    },
    sources=["high_value_customers"],
    select=["*"]
)
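
Columns produced inside a CTE (here total_spent) should then be usable in the outer query; a sketch that filters on the aggregated alias (an assumption, not shown explicitly in the guide):
query = QueryRequest(
    cte={
        "high_value_customers": {
            "sources": ["customers", "orders"],
            "joins": [{"left": "customers.id", "right": "orders.customer_id", "type": "inner"}],
            "aggregations": [{"function": "sum", "column": "orders.amount", "alias": "total_spent"}],
            "group_by": ["customers.id"]
        }
    },
    sources=["high_value_customers"],
    select=["*"],
    # Filter the outer query on the CTE's aggregated column
    filters={"high_value_customers": [{"column": "total_spent", "op": ">", "value": 5000}]}
)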

SQL Conversion

Convert existing SQL queries to native format:
from ajna_analytical_engine import SQLToQueryRequest

sql_request = SQLToQueryRequest(
    sql="""
    SELECT 
        c.name,
        COUNT(o.id) as order_count,
        SUM(o.amount) as total_spent
    FROM customers c
    LEFT JOIN orders o ON c.id = o.customer_id
    WHERE c.active = true
    GROUP BY c.name
    HAVING SUM(o.amount) > 1000
    ORDER BY total_spent DESC
    LIMIT 100
    """,
    sql_name="top_customers"
)

native_query = engine.convert_sql_to_query(sql_request)
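
The converted request can then be executed like any hand-built QueryRequest:
result = engine.execute_query(native_query)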

Configuration

Datasource Configuration

sources:
  users:
    type: postgres
    host: localhost
    port: 5432
    database: myapp
    username: app_user
    password: secure_pass
    
  sales_data:
    type: mysql
    host: mysql-server
    port: 3306 
    database: analytics
    username: readonly_user
    password: readonly_pass

tables:
  users:
    columns:
      id: {type: integer, primary_key: true}
      name: {type: string, nullable: false}
      email: {type: string, nullable: true}
      active: {type: boolean, nullable: false}
      created_at: {type: timestamp, nullable: false}
    relationships:
      - table: orders
        foreign_key: customer_id
        type: one-to-many
        
  sales_data:
    columns:
      id: {type: integer, primary_key: true}
      customer_id: {type: integer, nullable: false}
      amount: {type: float, nullable: false}
      sale_date: {type: date, nullable: false}
      region: {type: string, nullable: true}

Engine Configuration (Optional)

# Custom engine config - my_engine.yaml
query:
  max_limit: 50000
  default_limit: 1000
  query_timeout_seconds: 300

performance:
  max_memory_mb: 8192
  enable_parallel_execution: true
  max_parallel_workers: 8

cache:
  enabled: true
  size: 2000
  ttl_seconds: 1800

logging:
  level: "INFO"
  enable_query_logging: true
  enable_performance_metrics: true

# Use custom engine config
engine = AnalyticalEngine(
    engine_config_path=Path("my_engine.yaml"),
    datasource_config_path=Path("my_datasources.yaml")
)

Performance & Caching

Query Result Caching

Results are automatically cached based on query signature:
# First execution - queries database
result1 = engine.execute_query(query)

# Second execution - returns cached result  
result2 = engine.execute_query(query)  # Fast!

# Clear cache when needed
engine.clear_cache()

Performance Monitoring

result = engine.execute_query(query)

# Access performance metrics
print(f"Execution time: {result.metadata.execution_time_seconds}s")
print(f"Memory usage: {result.metadata.memory_usage_mb}MB")
print(f"Rows processed: {result.metadata.rows_processed}")
print(f"Cache hit: {result.metadata.cache_hit}")

Error Handling

The engine provides detailed error information:
from ajna_analytical_engine import (
    QueryValidationError, 
    DataLoadingError,
    ConfigurationError
)

try:
    result = engine.execute_query(query)
except QueryValidationError as e:
    print(f"Invalid query: {e}")
except DataLoadingError as e:
    print(f"Data loading failed: {e}")
except ConfigurationError as e:
    print(f"Configuration error: {e}")

Best Practices

  1. Always provide datasource configuration - it’s mandatory
  2. Use query limits to prevent accidentally large result sets
  3. Enable caching for frequently executed queries
  4. Monitor performance with built-in metrics
  5. Use connection pooling for production databases
  6. Validate queries during development using engine.validate_query()
  7. Use environment variables for credentials in production (see the sketch below)
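
One way to keep credentials out of a committed YAML file, assuming nothing beyond the documented datasource_config_path parameter, is to render the config from environment variables at startup. The variable names and template below are illustrative, not part of the library:
import os
from pathlib import Path
from ajna_analytical_engine import AnalyticalEngine

# Illustrative only: inject credentials from the environment at startup
config_template = """
sources:
  users:
    type: postgres
    host: localhost
    port: 5432
    database: myapp
    username: {user}
    password: {password}
"""

config_path = Path("my_datasources.yaml")
config_path.write_text(config_template.format(
    user=os.environ["DB_USER"],          # hypothetical variable name
    password=os.environ["DB_PASSWORD"],  # hypothetical variable name
))

engine = AnalyticalEngine(datasource_config_path=config_path)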

Advanced Features

Health Monitoring

from ajna_analytical_engine._analytics._health import HealthChecker

health = HealthChecker(engine_config=engine._engine_config)
status = health.check_system_health()
print(f"System healthy: {status['healthy']}")

Custom Connection Managers

from ajna_analytical_engine.config import ConnectionManager

# Create custom connection manager
conn_manager = ConnectionManager.create_with_rest_provider(
    api_endpoint="https://api.mycompany.com/connections",
    api_token="your-api-token"
)

engine = AnalyticalEngine(
    datasource_config_path=Path("config.yaml"),
    config_manager=conn_manager  # pass the manager created above
)

This guide covers all the features actually implemented in the Ajna Analytical Engine. For specific API details, see the API Reference.