Skip to main content

Configuration Guide

Get the Ajna Analytical Engine running with complete, copy-paste ready configuration files. Every example below is a complete working configuration.

Quick Start - Copy These Files

Scenario 1: Local Development (PostgreSQL)

File: dev_datasources.yaml
# Complete datasource configuration for local PostgreSQL development
sources:
  users:
    type: postgres
    host: localhost
    port: 5432
    database: myapp_dev
    username: dev_user
    password: dev_password
    
  orders:
    type: postgres
    host: localhost
    port: 5432
    database: myapp_dev
    username: dev_user
    password: dev_password

tables:
  users:
    columns:
      id: {type: integer, primary_key: true, nullable: false}
      name: {type: string, nullable: false}
      email: {type: string, nullable: true}
      age: {type: integer, nullable: true}
      created_at: {type: timestamp, nullable: false}
      is_active: {type: boolean, nullable: false}
    relationships:
      - table: orders
        foreign_key: customer_id
        type: one-to-many
        
  orders:
    columns:
      id: {type: integer, primary_key: true, nullable: false}
      customer_id: {type: integer, nullable: false}
      product_name: {type: string, nullable: false}
      amount: {type: float, nullable: false}
      order_date: {type: date, nullable: false}
      status: {type: string, nullable: false}
    relationships:
      - table: users
        foreign_key: customer_id
        type: many-to-one
File: dev_engine.yaml (optional)
# Development engine configuration - optimized for fast feedback
query:
  default_limit: 100
  max_limit: 10000
  query_timeout_seconds: 60
  enable_window_functions: true
  enable_complex_aggregations: true
  enable_subqueries: true
  enable_cte: true

performance:
  max_memory_mb: 2048
  max_parallel_workers: 2
  enable_parallel_execution: true

cache:
  enabled: true
  size: 100
  ttl_seconds: 300

logging:
  level: "DEBUG"
  enable_query_logging: true
  enable_performance_metrics: true
  log_to_console: true
  log_to_file: false
Usage:
from pathlib import Path
from ajna_analytical_engine import AnalyticalEngine, QueryRequest

# Initialize engine
engine = AnalyticalEngine(
    engine_config_path=Path("dev_engine.yaml"),  # Optional
    datasource_config_path=Path("dev_datasources.yaml")  # Required
)

# Method 1: Native QueryRequest
result = engine.execute_query(QueryRequest(
    sources=["users"],
    select=["name", "email"],
    limit=5
))
print(f"Found {result.metadata.row_count} users")

# Method 2: SQL Query with Conversion
from ajna_analytical_engine import SQLToQueryRequest

sql_request = SQLToQueryRequest(
    sql="SELECT name, email FROM users WHERE is_active = true LIMIT 5",
    sql_name="active_users"
)

# Convert SQL to native format
native_query = engine.convert_sql_to_query(sql_request)
print("Converted query:", native_query)

# Execute the converted query
result = engine.execute_query(QueryRequest(**native_query))
print(f"Found {result.metadata.row_count} active users")

# Method 3: Complex SQL with Joins
complex_sql = SQLToQueryRequest(
    sql="""
    SELECT 
        u.name,
        u.email,
        COUNT(o.id) as order_count,
        SUM(o.amount) as total_spent
    FROM users u
    LEFT JOIN orders o ON u.id = o.customer_id
    WHERE u.is_active = true
    GROUP BY u.id, u.name, u.email
    ORDER BY total_spent DESC
    LIMIT 10
    """,
    sql_name="top_customers"
)

# Convert and execute
native_query = engine.convert_sql_to_query(complex_sql)
result = engine.execute_query(QueryRequest(**native_query))
print(f"Top customers query returned {result.metadata.row_count} rows")

Scenario 2: Production with Environment Variables

File: prod_datasources.yaml
# Production datasource configuration with secure environment variables
sources:
  # PostgreSQL - Main application database
  users:
    type: postgres
    uri: postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DB}
    
  # MySQL - Legacy order system
  orders:
    type: mysql
    uri: mysql://${MYSQL_USER}:${MYSQL_PASSWORD}@${MYSQL_HOST}:${MYSQL_PORT}/${MYSQL_DB}
    
  # PostgreSQL - Analytics warehouse
  analytics:
    type: postgres
    host: ${ANALYTICS_DB_HOST}
    port: ${ANALYTICS_DB_PORT}
    database: ${ANALYTICS_DB_NAME}
    username: ${ANALYTICS_DB_USER}
    password: ${ANALYTICS_DB_PASSWORD}
    sslmode: require
    application_name: ajna-analytics-prod

tables:
  users:
    columns:
      id: {type: integer, primary_key: true}
      name: {type: string}
      email: {type: string}
      created_at: {type: timestamp}
      is_active: {type: boolean}
      
  orders:
    columns:
      id: {type: integer, primary_key: true}
      customer_id: {type: integer}
      amount: {type: float}
      order_date: {type: date}
      status: {type: string}
      
  analytics:
    columns:
      user_id: {type: integer}
      event_name: {type: string}
      event_date: {type: date}
      properties: {type: string}
File: prod_engine.yaml
# Production engine configuration - optimized for performance and monitoring
query:
  default_limit: 1000
  max_limit: 1000000
  query_timeout_seconds: 600
  max_join_tables: 20
  max_filter_conditions: 200
  enable_window_functions: true
  enable_complex_aggregations: true
  enable_subqueries: true
  enable_cte: true
  enable_query_optimization: true

performance:
  max_memory_mb: 16384
  memory_warning_threshold_percent: 85.0
  max_cpu_percent: 80
  enable_parallel_execution: true
  max_parallel_workers: 16
  slow_query_warning_seconds: 2.0
  slow_query_critical_seconds: 10.0

cache:
  enabled: true
  size: 5000
  ttl_seconds: 3600
  eviction_policy: "lru"
  warning_hit_rate_percent: 15.0

logging:
  level: "INFO"
  enable_query_logging: true
  enable_performance_metrics: true
  log_to_console: false
  log_to_file: true
  log_file_path: "/var/log/ajna/prod-engine.log"

health:
  check_interval_seconds: 30
  enable_dependency_checks: true
  enable_resource_checks: true
  enable_functional_checks: true

tracing:
  enabled: true
  endpoint: "http://jaeger-collector:14268/api/traces"
  service_name: "ajna-analytics-prod"
  sample_rate: 0.1
Environment file: .env
# PostgreSQL - Main database
POSTGRES_USER=prod_app_user
POSTGRES_PASSWORD=secure_prod_password_123
POSTGRES_HOST=prod-postgres.company.com
POSTGRES_PORT=5432
POSTGRES_DB=production_app

# MySQL - Legacy system
MYSQL_USER=analytics_readonly
MYSQL_PASSWORD=mysql_analytics_pass_456
MYSQL_HOST=legacy-mysql.company.com
MYSQL_PORT=3306
MYSQL_DB=legacy_orders

# Analytics warehouse
ANALYTICS_DB_HOST=warehouse.company.com
ANALYTICS_DB_PORT=5432
ANALYTICS_DB_NAME=analytics_warehouse
ANALYTICS_DB_USER=warehouse_reader
ANALYTICS_DB_PASSWORD=warehouse_secure_pass_789
Usage Examples:
from pathlib import Path
from ajna_analytical_engine import AnalyticalEngine, QueryRequest, SQLToQueryRequest

# Initialize production engine
engine = AnalyticalEngine(
    engine_config_path=Path("prod_engine.yaml"),
    datasource_config_path=Path("prod_datasources.yaml")
)

# Production SQL Analytics Examples

# 1. Cross-database analytics (PostgreSQL + MySQL)
cross_db_sql = SQLToQueryRequest(
    sql="""
    SELECT 
        u.name as customer_name,
        u.email,
        COUNT(o.id) as total_orders,
        SUM(o.amount) as lifetime_value,
        AVG(o.amount) as avg_order_value,
        MAX(o.order_date) as last_order_date
    FROM users u
    INNER JOIN orders o ON u.id = o.customer_id
    WHERE u.is_active = true 
      AND o.order_date >= '2024-01-01'
    GROUP BY u.id, u.name, u.email
    HAVING COUNT(o.id) >= 5
    ORDER BY lifetime_value DESC
    LIMIT 100
    """,
    sql_name="high_value_customers_2024"
)

result = engine.execute_query(QueryRequest(**engine.convert_sql_to_query(cross_db_sql)))
print(f"Found {result.metadata.row_count} high-value customers")

# 2. Analytics warehouse reporting
warehouse_sql = SQLToQueryRequest(
    sql="""
    SELECT 
        DATE_TRUNC('month', event_date) as month,
        event_name,
        COUNT(*) as event_count,
        COUNT(DISTINCT user_id) as unique_users
    FROM analytics
    WHERE event_date >= CURRENT_DATE - INTERVAL '12 months'
    GROUP BY DATE_TRUNC('month', event_date), event_name
    ORDER BY month DESC, event_count DESC
    """,
    sql_name="monthly_event_analytics"
)

result = engine.execute_query(QueryRequest(**engine.convert_sql_to_query(warehouse_sql)))
print(f"Monthly analytics: {result.metadata.row_count} data points")

# 3. Performance monitoring query
perf_sql = SQLToQueryRequest(
    sql="""
    WITH order_stats AS (
        SELECT 
            customer_id,
            COUNT(*) as order_count,
            SUM(amount) as total_spent,
            AVG(amount) as avg_order
        FROM orders
        WHERE order_date >= CURRENT_DATE - INTERVAL '90 days'
        GROUP BY customer_id
    )
    SELECT 
        CASE 
            WHEN total_spent >= 10000 THEN 'VIP'
            WHEN total_spent >= 5000 THEN 'Premium'
            WHEN total_spent >= 1000 THEN 'Standard'
            ELSE 'Basic'
        END as customer_tier,
        COUNT(*) as customer_count,
        AVG(total_spent) as avg_spent_per_tier,
        SUM(total_spent) as tier_total_revenue
    FROM order_stats
    GROUP BY 1
    ORDER BY tier_total_revenue DESC
    """,
    sql_name="customer_segmentation_analysis"
)

# Show the converted native query format
native_query = engine.convert_sql_to_query(perf_sql)
print("Native query structure:", native_query)

result = engine.execute_query(QueryRequest(**native_query))
print(f"Customer segmentation: {result.metadata.row_count} tiers")

Scenario 3: AWS Cloud Setup (Athena + RDS)

File: aws_datasources.yaml
# AWS cloud datasource configuration
sources:
  # RDS PostgreSQL with IAM authentication
  rds_users:
    type: postgres
    host: prod-db.cluster-xyz.us-east-1.rds.amazonaws.com
    port: 5432
    database: production
    username: ${RDS_USERNAME}
    password: ${RDS_PASSWORD}
    sslmode: require
    
  # AWS Athena for data lake analytics
  athena_events:
    type: athena
    region: us-east-1
    # Option 1: IAM Role (recommended for EC2/ECS)
    role_arn: arn:aws:iam::123456789012:role/AthenaAnalyticsRole
    external_id: unique-external-id-for-security
    # Athena configuration
    workgroup: analytics-production
    s3_staging_dir: s3://company-athena-results/prod-staging/
    database: analytics_datalake
    catalog: AwsDataCatalog
    
  # Alternative Athena config with access keys
  athena_dev:
    type: athena
    region: us-west-2
    # Option 2: Access keys (for development/testing)
    access_key_id: ${AWS_ACCESS_KEY_ID}
    secret_access_key: ${AWS_SECRET_ACCESS_KEY}
    session_token: ${AWS_SESSION_TOKEN}  # For temporary credentials
    workgroup: dev-analytics
    s3_staging_dir: s3://dev-athena-results/staging/
    database: dev_analytics

tables:
  rds_users:
    columns:
      id: {type: integer, primary_key: true}
      username: {type: string}
      email: {type: string}
      created_at: {type: timestamp}
      
  athena_events:
    columns:
      event_id: {type: string, primary_key: true}
      user_id: {type: integer}
      event_type: {type: string}
      event_timestamp: {type: timestamp}
      properties: {type: string}
Environment file: .env
# RDS credentials
RDS_USERNAME=app_user
RDS_PASSWORD=rds_secure_password

# AWS credentials (if not using IAM roles)
AWS_ACCESS_KEY_ID=AKIAEXAMPLEACCESSKEY
AWS_SECRET_ACCESS_KEY=example-secret-access-key
AWS_SESSION_TOKEN=example-session-token

Scenario 4: Multi-Cloud Setup (GCP BigQuery + Snowflake)

File: multicloud_datasources.yaml
# Multi-cloud datasource configuration
sources:
  # Google BigQuery
  bigquery_analytics:
    type: bigquery
    project_id: my-company-analytics-prod
    dataset_id: production_analytics
    location: US
    # Service account authentication
    service_account_email: analytics-engine@my-company-analytics-prod.iam.gserviceaccount.com
    service_account_key: /path/to/service-account-key.json
    # Query limits
    maximum_bytes_billed: 10000000000  # 10GB limit
    use_legacy_sql: false
    
  # Snowflake with username/password
  snowflake_warehouse:
    type: snowflake
    account: mycompany.us-east-1
    user: analytics_service
    password: ${SNOWFLAKE_PASSWORD}
    warehouse: ANALYTICS_WH
    database: PRODUCTION
    schema: ANALYTICS
    role: ANALYTICS_READER_ROLE
    client_session_keep_alive: true
    
  # Snowflake with key-pair authentication (more secure)
  snowflake_secure:
    type: snowflake
    account: mycompany.us-east-1
    user: secure_service_account
    private_key: /secure/path/to/snowflake_private_key.pem
    private_key_passphrase: ${SNOWFLAKE_KEY_PASSPHRASE}
    warehouse: SECURE_ANALYTICS_WH
    database: PRODUCTION
    schema: SECURE_ANALYTICS

tables:
  bigquery_analytics:
    columns:
      user_id: {type: integer}
      event_timestamp: {type: timestamp}
      event_name: {type: string}
      session_id: {type: string}
      
  snowflake_warehouse:
    columns:
      id: {type: integer, primary_key: true}
      customer_id: {type: integer}
      revenue: {type: float}
      transaction_date: {type: date}
Key files needed:
# Service account key for BigQuery
/path/to/service-account-key.json

# Private key for Snowflake
/secure/path/to/snowflake_private_key.pem

Scenario 5: Enterprise with Connection API

File: enterprise_datasources.yaml
# Enterprise setup with centralized credential management
sources:
  # Connection IDs resolved via company's credential API
  prod_users_db:
    connection_id: "postgres-prod-users-readonly"
    
  prod_orders_db:
    connection_id: "mysql-prod-orders-analytics"
    
  data_warehouse:
    connection_id: "snowflake-prod-warehouse-reader"
    
  customer_analytics:
    connection_id: "bigquery-prod-customer-analytics"

# Tables remain the same - define your schema
tables:
  prod_users_db:
    columns:
      id: {type: integer, primary_key: true}
      name: {type: string}
      email: {type: string}
      tier: {type: string}
      
  prod_orders_db:
    columns:
      id: {type: integer, primary_key: true}
      customer_id: {type: integer}
      amount: {type: float}
      status: {type: string}
Python setup for enterprise:
import os
from pathlib import Path
from ajna_analytical_engine import AnalyticalEngine
from ajna_analytical_engine.config import ConnectionManager, create_config_manager

# Enterprise connection manager
connection_manager = ConnectionManager.create_with_rest_provider(
    api_endpoint=os.getenv("COMPANY_CREDENTIALS_API_URL"),
    api_token=os.getenv("COMPANY_CREDENTIALS_API_TOKEN"),
    cache_ttl=600,  # 10 minutes
    timeout=30
)

# Custom config manager
config_manager = create_config_manager(
    engine_config_path=Path("enterprise_engine.yaml"),
    datasource_config_path=Path("enterprise_datasources.yaml"),
    connection_manager=connection_manager
)

# Initialize engine
engine = AnalyticalEngine(config_manager=config_manager)

Configuration File Templates

Complete Datasource Template

# datasource_template.yaml - Copy and customize this
sources:
  # PostgreSQL examples
  postgres_table:
    type: postgres
    # Method 1: Individual parameters
    host: your-postgres-host
    port: 5432
    database: your_database
    username: your_user
    password: your_password
    # Optional PostgreSQL settings
    sslmode: prefer  # disable, allow, prefer, require
    application_name: ajna-analytics
    
  postgres_uri:
    type: postgres
    # Method 2: Connection URI
    uri: postgresql://user:password@host:port/database
    
  # MySQL examples  
  mysql_table:
    type: mysql
    host: your-mysql-host
    port: 3306
    database: your_database
    username: your_user
    password: your_password
    # Optional MySQL settings
    charset: utf8mb4
    autocommit: true
    
  # AWS Athena
  athena_table:
    type: athena
    region: us-east-1
    # Authentication option 1: IAM Role
    role_arn: arn:aws:iam::account:role/YourRole
    external_id: optional-external-id
    # Authentication option 2: Access keys
    # access_key_id: your-access-key
    # secret_access_key: your-secret-key
    workgroup: primary
    s3_staging_dir: s3://your-bucket/staging/
    database: your_athena_database
    
  # Google BigQuery
  bigquery_table:
    type: bigquery
    project_id: your-gcp-project
    dataset_id: your_dataset
    location: US
    service_account_email: [email protected]
    service_account_key: /path/to/key.json
    
  # Snowflake
  snowflake_table:
    type: snowflake
    account: yourcompany.region
    user: your_user
    # Authentication option 1: Password
    password: your_password
    # Authentication option 2: Private key
    # private_key: /path/to/private_key.pem
    # private_key_passphrase: key_passphrase
    warehouse: YOUR_WAREHOUSE
    database: YOUR_DATABASE
    schema: YOUR_SCHEMA
    
  # Enterprise connection ID
  enterprise_table:
    connection_id: "your-connection-id"

# Table schemas - Define all your tables here
tables:
  your_table_name:
    columns:
      id: 
        type: integer
        primary_key: true
        nullable: false
        description: "Primary key"
      name:
        type: string
        nullable: false
        description: "Name field"
      email:
        type: string
        nullable: true
        description: "Email address"
      created_at:
        type: timestamp
        nullable: false
        description: "Creation timestamp"
      is_active:
        type: boolean
        nullable: false
        description: "Active status"
    # Optional: Define relationships
    relationships:
      - table: related_table
        foreign_key: related_id
        type: one-to-many

Complete Engine Template

# engine_template.yaml - Copy and customize this
# Query processing settings
query:
  max_join_tables: 10              # Maximum tables in joins
  max_filter_conditions: 100       # Maximum filter conditions
  max_group_by_columns: 20        # Maximum GROUP BY columns
  default_limit: 1000             # Default LIMIT value
  max_limit: 1000000              # Maximum allowed LIMIT
  query_timeout_seconds: 300      # Query timeout
  enable_window_functions: true   # Enable window functions
  enable_complex_aggregations: true
  enable_subqueries: true
  enable_cte: true               # Common Table Expressions
  enable_query_optimization: true

# Performance and resource management
performance:
  max_memory_mb: 4096            # Maximum memory usage
  memory_warning_threshold_percent: 85.0
  max_cpu_percent: 80            # Maximum CPU usage
  enable_parallel_execution: true
  max_parallel_workers: 4        # Parallel processing workers
  slow_query_warning_seconds: 1.0
  slow_query_critical_seconds: 5.0

# Caching configuration
cache:
  enabled: true                  # Enable result caching
  size: 1000                    # Number of cached results
  ttl_seconds: 3600             # Cache time-to-live
  eviction_policy: "lru"        # Eviction strategy
  warning_hit_rate_percent: 10.0

# Logging configuration
logging:
  level: "INFO"                 # Log level
  enable_query_logging: true    # Log all queries
  enable_performance_metrics: true
  log_to_console: true          # Console output
  log_to_file: false           # File output
  log_file_path: "/var/log/ajna/engine.log"

# Health monitoring
health:
  check_interval_seconds: 30    # Health check frequency
  enable_dependency_checks: true
  enable_resource_checks: true
  enable_functional_checks: true

# Distributed tracing (optional)
tracing:
  enabled: false               # Enable tracing
  endpoint: null              # Tracing endpoint
  service_name: "ajna-analytical-engine"
  sample_rate: 1.0           # Trace sampling rate

Quick Setup Commands

1. Copy a configuration template:

# Copy the template that matches your setup
cp datasource_template.yaml my_datasources.yaml
cp engine_template.yaml my_engine.yaml

# Edit with your actual values
vim my_datasources.yaml
vim my_engine.yaml

2. Test your configuration:

from pathlib import Path
from ajna_analytical_engine import AnalyticalEngine, ConfigurationError

try:
    engine = AnalyticalEngine(
        datasource_config_path=Path("my_datasources.yaml")
    )
    print("✅ Configuration is valid!")
except ConfigurationError as e:
    print(f"❌ Configuration error: {e}")

3. Run your first query:

from ajna_analytical_engine import QueryRequest

result = engine.execute_query(QueryRequest(
    sources=["your_table_name"],
    select=["column1", "column2"],
    limit=10
))

print(f"Query returned {result.metadata.row_count} rows")
for row in result.data.to_dicts():
    print(row)

Environment Variables Reference

Set these environment variables for secure credential management:
# PostgreSQL
export POSTGRES_USER="your_user"
export POSTGRES_PASSWORD="your_password"
export POSTGRES_HOST="your_host"
export POSTGRES_PORT="5432"
export POSTGRES_DB="your_database"

# MySQL
export MYSQL_USER="your_user"
export MYSQL_PASSWORD="your_password"
export MYSQL_HOST="your_host"
export MYSQL_PORT="3306"
export MYSQL_DB="your_database"

# AWS
export AWS_ACCESS_KEY_ID="your_access_key"
export AWS_SECRET_ACCESS_KEY="your_secret_key"
export AWS_SESSION_TOKEN="your_session_token"

# Snowflake
export SNOWFLAKE_PASSWORD="your_password"
export SNOWFLAKE_KEY_PASSPHRASE="your_key_passphrase"

# Enterprise API
export COMPANY_CREDENTIALS_API_URL="https://api.company.com/credentials"
export COMPANY_CREDENTIALS_API_TOKEN="your_api_token"
Copy any of the complete configuration examples above, customize them for your environment, and you’ll be ready to use the Ajna Analytical Engine!

SQL Query Patterns

The Ajna Analytical Engine supports both native QueryRequest format and SQL queries via the convert_sql_to_query function. Here are common patterns:

Pattern 1: Simple SQL Conversion

from ajna_analytical_engine import SQLToQueryRequest

# Write familiar SQL
sql_request = SQLToQueryRequest(
    sql="SELECT name, email FROM users WHERE is_active = true LIMIT 10",
    sql_name="active_users"
)

# Convert to native format and execute
native_query = engine.convert_sql_to_query(sql_request)
result = engine.execute_query(QueryRequest(**native_query))

Pattern 2: Complex Analytics SQL

# Advanced SQL with window functions, CTEs, and aggregations
analytics_sql = SQLToQueryRequest(
    sql="""
    WITH monthly_sales AS (
        SELECT 
            DATE_TRUNC('month', order_date) as month,
            customer_id,
            SUM(amount) as monthly_total,
            COUNT(*) as order_count
        FROM orders
        WHERE order_date >= '2024-01-01'
        GROUP BY DATE_TRUNC('month', order_date), customer_id
    ),
    customer_ranks AS (
        SELECT 
            month,
            customer_id,
            monthly_total,
            order_count,
            ROW_NUMBER() OVER (PARTITION BY month ORDER BY monthly_total DESC) as rank,
            LAG(monthly_total) OVER (PARTITION BY customer_id ORDER BY month) as prev_month_total
        FROM monthly_sales
    )
    SELECT 
        cr.month,
        u.name as customer_name,
        cr.monthly_total,
        cr.order_count,
        cr.rank,
        CASE 
            WHEN cr.prev_month_total IS NULL THEN 'New Customer'
            WHEN cr.monthly_total > cr.prev_month_total THEN 'Growing'
            WHEN cr.monthly_total < cr.prev_month_total THEN 'Declining'
            ELSE 'Stable'
        END as trend
    FROM customer_ranks cr
    JOIN users u ON cr.customer_id = u.id
    WHERE cr.rank <= 100
    ORDER BY cr.month DESC, cr.rank ASC
    """,
    sql_name="top_customers_with_trends"
)

# Convert and execute
result = engine.execute_query(QueryRequest(**engine.convert_sql_to_query(analytics_sql)))
print(f"Top customer trends: {result.metadata.row_count} records")

Pattern 3: Cross-Database Queries

# Query across multiple database sources
cross_db_sql = SQLToQueryRequest(
    sql="""
    SELECT 
        u.name,
        u.email,
        u.created_at as user_since,
        o.total_orders,
        o.total_spent,
        a.last_login,
        a.total_events
    FROM users u
    LEFT JOIN (
        SELECT 
            customer_id,
            COUNT(*) as total_orders,
            SUM(amount) as total_spent
        FROM orders
        GROUP BY customer_id
    ) o ON u.id = o.customer_id
    LEFT JOIN (
        SELECT 
            user_id,
            MAX(event_date) as last_login,
            COUNT(*) as total_events
        FROM analytics
        WHERE event_name = 'login'
        GROUP BY user_id
    ) a ON u.id = a.user_id
    WHERE u.is_active = true
    ORDER BY o.total_spent DESC NULLS LAST
    LIMIT 50
    """,
    sql_name="comprehensive_user_analytics"
)

result = engine.execute_query(QueryRequest(**engine.convert_sql_to_query(cross_db_sql)))

Pattern 4: Parameterized SQL Queries

def get_customer_analytics(start_date: str, min_orders: int = 5):
    """Generate customer analytics for a specific date range"""
    
    sql_request = SQLToQueryRequest(
        sql=f"""
        SELECT 
            u.id,
            u.name,
            u.email,
            COUNT(o.id) as order_count,
            SUM(o.amount) as total_spent,
            AVG(o.amount) as avg_order_value,
            MIN(o.order_date) as first_order,
            MAX(o.order_date) as last_order,
            DATE_PART('days', MAX(o.order_date) - MIN(o.order_date)) as customer_lifespan_days
        FROM users u
        INNER JOIN orders o ON u.id = o.customer_id
        WHERE o.order_date >= '{start_date}'
        GROUP BY u.id, u.name, u.email
        HAVING COUNT(o.id) >= {min_orders}
        ORDER BY total_spent DESC
        """,
        sql_name=f"customer_analytics_{start_date.replace('-', '_')}"
    )
    
    return engine.convert_sql_to_query(sql_request)

# Usage
query_2024 = get_customer_analytics("2024-01-01", min_orders=10)
result = engine.execute_query(QueryRequest(**query_2024))
print(f"2024 customers with 10+ orders: {result.metadata.row_count}")

Pattern 5: SQL vs Native Comparison

# Same query written in both SQL and native format

# SQL approach
sql_version = SQLToQueryRequest(
    sql="""
    SELECT 
        category,
        COUNT(*) as product_count,
        AVG(price) as avg_price,
        MIN(price) as min_price,
        MAX(price) as max_price
    FROM products
    WHERE is_active = true
    GROUP BY category
    ORDER BY avg_price DESC
    """,
    sql_name="product_category_stats"
)

# Native QueryRequest approach
native_version = QueryRequest(
    sources=["products"],
    select=["category"],
    filters={"products": [{"column": "is_active", "op": "=", "value": True}]},
    aggregations=[
        {"function": "count", "column": "*", "alias": "product_count"},
        {"function": "avg", "column": "price", "alias": "avg_price"},
        {"function": "min", "column": "price", "alias": "min_price"},
        {"function": "max", "column": "price", "alias": "max_price"}
    ],
    group_by=["category"],
    order_by=[{"column": "avg_price", "direction": "desc"}]
)

# Both produce identical results
sql_result = engine.execute_query(QueryRequest(**engine.convert_sql_to_query(sql_version)))
native_result = engine.execute_query(native_version)

print(f"SQL result: {sql_result.metadata.row_count} rows")
print(f"Native result: {native_result.metadata.row_count} rows")

Pattern 6: Debugging SQL Conversion

# Debug what SQL gets converted to
sql_request = SQLToQueryRequest(
    sql="""
    SELECT 
        u.name,
        COUNT(o.id) as orders,
        RANK() OVER (ORDER BY COUNT(o.id) DESC) as rank
    FROM users u
    LEFT JOIN orders o ON u.id = o.customer_id
    GROUP BY u.id, u.name
    HAVING COUNT(o.id) > 0
    ORDER BY orders DESC
    LIMIT 20
    """,
    sql_name="debug_conversion"
)

# See the converted structure
converted = engine.convert_sql_to_query(sql_request)
print("Converted query structure:")
import json
print(json.dumps(converted, indent=2))

# Validate the conversion
validation = engine.validate_query(QueryRequest(**converted))
print("Validation result:", validation)

# Execute if valid
if validation.get("valid", False):
    result = engine.execute_query(QueryRequest(**converted))
    print(f"Query executed successfully: {result.metadata.row_count} rows")

SQL Best Practices

  1. Use descriptive sql_name for debugging and monitoring
  2. Validate complex queries before production use
  3. Prefer parameterized queries for dynamic filtering
  4. Test both SQL and native formats for complex queries
  5. Use the conversion for debugging - see what native structure your SQL creates
Choose the approach that fits your team’s preferences: SQL for familiarity, native format for type safety and validation.

Enterprise Connection Management

For large organizations, managing database credentials across multiple environments and teams becomes complex. The analytics engine supports enterprise-grade connection management through Connection IDs that resolve to actual connection details via a centralized API.

Connection ID Architecture

Instead of embedding credentials in your code, you can use Connection IDs that resolve to actual connection details through a centralized service:
from ajna_analytical_engine import AnalyticsEngine

# Initialize with REST connection provider
engine = AnalyticsEngine(
    config={
        "backend_api": {
            "endpoint": "https://your-connection-api.company.com",
            "token": "your-api-token",
            "cache_ttl": 300,  # 5 minutes
            "timeout": 30
        }
    }
)

# Use connection ID instead of credentials
result = await engine.execute_query(
    query_request={
        "sources": [
            {
                "connection_id": "prod-analytics-db",  # Resolves via API
                "table": "sales_data",
                "alias": "sales"
            }
        ],
        "select": ["date", "revenue", "region"],
        "order_by": [{"field": "date", "direction": "desc"}]
    }
)

Connection Provider REST API Contract

Your connection management API must implement these endpoints:

GET /api/v1/connections/

Returns full connection details for a specific ID:
{
  "connection_id": "prod-analytics-db",
  "connection_type": "postgres",
  "auth_type": "username_password",
  "name": "Production Analytics Database",
  "environment": "production",
  "host": "analytics-db.company.com",
  "port": 5432,
  "database": "analytics",
  "username": "readonly_user",
  "password": "encrypted_password_here",
  "ssl_mode": "require",
  "created_at": "2024-01-15T10:30:00Z",
  "expires_at": "2024-02-15T10:30:00Z",
  "status": "active",
  "allowed_operations": ["read"],
  "ip_whitelist": ["10.0.0.0/8", "192.168.1.0/24"]
}

POST /api/v1/connections//test

Tests if a connection is valid and accessible: Response:
{
  "status": "success",
  "response_time_ms": 45,
  "message": "Connection successful"
}

POST /api/v1/connections//refresh

Refreshes expired credentials for a connection: Response:
{
  "connection_id": "prod-analytics-db",
  "connection_type": "postgres",
  "password": "new_encrypted_password",
  "expires_at": "2024-03-15T10:30:00Z",
  "refreshed_at": "2024-02-15T10:30:00Z"
}

GET /api/v1/connections?environment=

Lists all available connections, optionally filtered by environment: Response:
{
  "connections": [
    {
      "connection_id": "prod-analytics-db",
      "name": "Production Analytics Database",
      "connection_type": "postgres",
      "environment": "production",
      "status": "active"
    },
    {
      "connection_id": "dev-mysql-db",
      "name": "Development MySQL",
      "connection_type": "mysql",
      "environment": "development",
      "status": "active"
    }
  ]
}

Connection Schemas by Database Type

The system supports multiple database types with their specific connection schemas:
{
  "connection_id": "prod-postgres",
  "connection_type": "postgres",
  "auth_type": "username_password",
  "host": "postgres.company.com",
  "port": 5432,
  "database": "analytics",
  "username": "user",
  "password": "encrypted_password",
  "sslmode": "require",
  "application_name": "ajna-analytics",
  "schema_name": "public",
  "connection_timeout": 30,
  "query_timeout": 300,
  "max_connections": 10
}

Authentication Methods Supported

  • username_password: Traditional database authentication
  • connection_string: Full connection URI with embedded credentials
  • iam_role: AWS IAM role-based authentication
  • access_key: AWS access key and secret key
  • service_account: GCP service account authentication
  • managed_identity: Azure managed identity
  • key_pair: SSH key-pair authentication (Snowflake)
  • oauth: OAuth token-based authentication
  • jwt: JWT token authentication
  • api_key: API key authentication

Connection Manager Implementation

Here’s how to implement the connection management system:
from ajna_analytical_engine.config.connection_manager import ConnectionManager
from ajna_analytical_engine.config.connection_contracts import RESTConnectionProvider

# Option 1: Create with REST provider
connection_manager = ConnectionManager.create_with_rest_provider(
    api_endpoint="https://your-connection-api.company.com",
    api_token="your-api-token",
    cache_ttl=300,  # 5 minutes cache
    timeout=30
)

# Option 2: Create programmatically
provider = RESTConnectionProvider(
    endpoint="https://your-connection-api.company.com",
    auth_token="your-api-token",
    timeout=30,
    retry_attempts=3
)

connection_manager = ConnectionManager(
    connection_provider=provider,
    cache_ttl=300,
    enable_cache=True
)

# Use with analytics engine
engine = AnalyticsEngine(
    config={
        "connection_manager": connection_manager
    }
)

Connection Lifecycle Operations

# Test a connection
is_valid = await connection_manager.test_connection(
    {"connection_id": "prod-analytics-db"},
    source_name="Analytics DB"
)
print(f"Connection valid: {is_valid}")

# List available connections
connections = await connection_manager.list_connections(
    environment="production"
)
print(f"Available connections: {connections}")

# Refresh expired credentials
success = await connection_manager.refresh_connection("prod-analytics-db")
print(f"Refresh successful: {success}")

# Get cache statistics
stats = connection_manager.get_cache_stats()
print(f"Cache hits: {stats['hits']}, misses: {stats['misses']}")

# Clear cache if needed
await connection_manager.clear_cache()

Configuration with Connection IDs

In your datasource configuration, use connection IDs instead of credentials:
# datasource.yaml
datasources:
  # Traditional credentials (not recommended for production)
  local_postgres:
    type: postgres
    host: localhost
    port: 5432
    database: test_db
    username: test_user
    password: test_password
    
  # Connection ID approach (recommended for enterprise)
  prod_analytics:
    connection_id: "prod-analytics-db"
    
  staging_warehouse:
    connection_id: "staging-warehouse-db"
    
  reporting_mysql:
    connection_id: "reporting-mysql-01"

# Engine configuration with connection API
engine:
  backend_api:
    endpoint: "https://connections.company.com"
    token: "${CONNECTION_API_TOKEN}"
    cache_ttl: 300
    timeout: 30
    retry_attempts: 3

API Authentication

The REST API expects a Bearer token in the Authorization header:
curl -H "Authorization: Bearer your-api-token" \
     -H "Content-Type: application/json" \
     "https://your-connection-api.company.com/api/v1/connections/prod-analytics-db"

Connection Flow Diagram

Here’s how the connection resolution works:
  1. Application requests connection: Engine needs to connect to prod-analytics-db
  2. Cache check: Connection manager checks local cache first
  3. API call: If not cached, calls REST API to resolve connection ID
  4. Connection creation: Converts response to database connection string
  5. Caching: Stores connection details in cache for future use
  6. Database connection: Uses resolved credentials to connect to database

Security Considerations

  • Credential encryption: All passwords and keys should be encrypted at rest
  • Token rotation: API tokens should be rotated regularly
  • Access controls: Implement proper RBAC for connection access
  • Audit logging: Log all connection access and credential retrievals
  • Network security: Use HTTPS for all API communications
  • IP whitelisting: Restrict connections to known IP ranges
  • Credential expiration: Implement automatic credential rotation
  • Monitoring: Alert on failed connections or credential access

Benefits of Connection ID System

  • Security: Credentials never appear in application code
  • Flexibility: Switch between environments without code changes
  • Auditability: Track who accessed which databases when
  • Scalability: Centrally manage thousands of connections
  • Compliance: Meet enterprise security requirements
  • Reliability: Automatic credential refresh and failover
  • Team isolation: Different teams can use different connection IDs
  • Environment promotion: Same code works across dev/staging/prod
This system allows you to centrally manage database credentials while keeping your application code secure and environment-agnostic.

Complete Example: AWS Athena with Connection ID

Here’s a complete example showing how to use AWS Athena with Connection ID and access key authentication:

1. Connection API Response for Athena

When your connection API receives a request for GET /api/v1/connections/prod-athena-warehouse, it should return:
{
  "connection_id": "prod-athena-warehouse",
  "connection_type": "athena",
  "auth_type": "access_key",
  "name": "Production Athena Data Warehouse",
  "description": "Main data warehouse for production analytics",
  "environment": "production",
  
  // AWS Configuration
  "region": "us-east-1",
  "access_key_id": "AKIAIOSFODNN7EXAMPLE",
  "secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
  "session_token": null,  // Optional for temporary credentials
  
  // Athena-specific settings
  "workgroup": "analytics-production",
  "s3_staging_dir": "s3://company-athena-results/production/",
  "database": "analytics_db",
  "catalog": "AwsDataCatalog",
  
  // Security and lifecycle
  "created_at": "2024-01-15T10:30:00Z",
  "expires_at": "2024-04-15T10:30:00Z",  // Access keys expire quarterly
  "last_used": "2024-02-15T14:22:33Z",
  "status": "active",
  "allowed_operations": ["read", "write"],
  "ip_whitelist": ["10.0.0.0/8", "203.0.113.0/24"]
}

2. Application Configuration

Configure your application to use the Connection ID:
# datasources.yaml
datasources:
  # Use Connection ID instead of direct credentials
  athena_warehouse:
    connection_id: "prod-athena-warehouse"
    
  # You can also specify additional table-level settings
  sales_data:
    connection_id: "prod-athena-warehouse"
    table: "sales_fact"
    schema: "analytics"

# engine.yaml
engine:
  # Configure connection API
  backend_api:
    endpoint: "https://connections.company.com"
    token: "${CONNECTION_API_TOKEN}"
    cache_ttl: 300  # Cache for 5 minutes
    timeout: 30
    retry_attempts: 3

3. Python Implementation

from ajna_analytical_engine import AnalyticsEngine
from ajna_analytical_engine.types.query import QueryRequest
import os

# Initialize engine with connection API
engine = AnalyticsEngine(
    config={
        "backend_api": {
            "endpoint": "https://connections.company.com",
            "token": os.getenv("CONNECTION_API_TOKEN"),
            "cache_ttl": 300,
            "timeout": 30
        }
    }
)

# Query using Connection ID - the engine will automatically resolve credentials
athena_query = QueryRequest(
    sources=[
        {
            "connection_id": "prod-athena-warehouse",  # Resolved via API
            "table": "sales_fact",
            "alias": "sales"
        },
        {
            "connection_id": "prod-athena-warehouse",
            "table": "customer_dim", 
            "alias": "customers"
        }
    ],
    select=[
        "sales.order_date",
        "customers.customer_name",
        "sales.amount",
        "sales.region"
    ],
    joins=[
        {
            "type": "inner",
            "left_table": "sales",
            "right_table": "customers", 
            "left_column": "customer_id",
            "right_column": "customer_id"
        }
    ],
    filters={
        "sales": [
            {
                "column": "order_date",
                "op": ">=",
                "value": "2024-01-01"
            }
        ]
    },
    order_by=[
        {"column": "order_date", "direction": "desc"}
    ],
    limit=1000
)

# Execute query - credentials are fetched automatically
result = await engine.execute_query(athena_query)
print(f"Query returned {result.metadata.row_count} rows")

# The engine converts the connection details to this Athena URI internally:
# awsathena+rest://AKIAIOSFODNN7EXAMPLE:wJalrXUtnFEMI%2FK7MDENG%[email protected]:443/analytics_db?s3_staging_dir=s3%3A//company-athena-results/production/&workgroup=analytics-production

4. SQL Query Example with Connection ID

from ajna_analytical_engine.types.query import SQLToQueryRequest

# Use SQL with Connection ID resolution
sql_query = SQLToQueryRequest(
    sql="""
    WITH monthly_sales AS (
        SELECT 
            DATE_TRUNC('month', order_date) as month,
            region,
            SUM(amount) as total_revenue,
            COUNT(DISTINCT customer_id) as unique_customers,
            AVG(amount) as avg_order_value
        FROM sales_fact
        WHERE order_date >= DATE '2024-01-01'
        GROUP BY DATE_TRUNC('month', order_date), region
    ),
    regional_growth AS (
        SELECT 
            month,
            region,
            total_revenue,
            unique_customers,
            avg_order_value,
            LAG(total_revenue) OVER (
                PARTITION BY region 
                ORDER BY month
            ) as prev_month_revenue,
            PERCENT_RANK() OVER (
                PARTITION BY month 
                ORDER BY total_revenue DESC
            ) as revenue_rank
        FROM monthly_sales
    )
    SELECT 
        month,
        region,
        total_revenue,
        unique_customers,
        avg_order_value,
        CASE 
            WHEN prev_month_revenue IS NULL THEN 'New'
            WHEN total_revenue > prev_month_revenue THEN 'Growing'
            WHEN total_revenue < prev_month_revenue THEN 'Declining'
            ELSE 'Stable'
        END as trend,
        ROUND(revenue_rank * 100, 1) as percentile_rank
    FROM regional_growth
    ORDER BY month DESC, total_revenue DESC
    """,
    sql_name="athena_regional_growth_analysis",
    # Connection ID is resolved from datasource config
    default_connection_id="prod-athena-warehouse"
)

# Convert SQL to QueryRequest and execute
query_request = engine.convert_sql_to_query(sql_query)
result = await engine.execute_query(QueryRequest(**query_request))

print(f"Regional growth analysis: {result.metadata.row_count} rows")
for row in result.data.to_dicts()[:5]:  # Show first 5 rows
    print(f"{row['month']} - {row['region']}: ${row['total_revenue']:,.2f} ({row['trend']})")

5. Connection Management Operations

from ajna_analytical_engine.config.connection_manager import ConnectionManager

# Create connection manager
connection_manager = ConnectionManager.create_with_rest_provider(
    api_endpoint="https://connections.company.com",
    api_token=os.getenv("CONNECTION_API_TOKEN"),
    cache_ttl=300
)

# Test the Athena connection
is_valid = await connection_manager.test_connection(
    {"connection_id": "prod-athena-warehouse"},
    source_name="Athena Warehouse"
)
print(f"Athena connection valid: {is_valid}")

# List all production connections
prod_connections = await connection_manager.list_connections(
    environment="production"
)
print(f"Production connections: {prod_connections}")

# Get connection details (for debugging)
try:
    athena_connection = await connection_manager.connection_provider.get_connection(
        "prod-athena-warehouse"
    )
    print(f"Connection type: {athena_connection.connection_type}")
    print(f"Region: {athena_connection.region}")
    print(f"Workgroup: {athena_connection.workgroup}")
    print(f"S3 staging: {athena_connection.s3_staging_dir}")
except Exception as e:
    print(f"Error fetching connection: {e}")

# Refresh credentials if they're expiring
success = await connection_manager.refresh_connection("prod-athena-warehouse")
print(f"Credential refresh successful: {success}")

# Cache statistics
stats = connection_manager.get_cache_stats()
print(f"Cache performance - Hits: {stats.get('hits', 0)}, Misses: {stats.get('misses', 0)}")

6. Environment-Specific Configuration

# Different environments use different connection IDs
import os

ENVIRONMENT = os.getenv("ENVIRONMENT", "development")

# Map environments to connection IDs
ATHENA_CONNECTIONS = {
    "development": "dev-athena-sandbox",
    "staging": "staging-athena-warehouse", 
    "production": "prod-athena-warehouse"
}

# Use environment-specific connection
athena_connection_id = ATHENA_CONNECTIONS[ENVIRONMENT]

query = QueryRequest(
    sources=[
        {
            "connection_id": athena_connection_id,
            "table": "sales_fact",
            "alias": "sales"
        }
    ],
    select=["order_date", "amount", "region"],
    limit=100
)

result = await engine.execute_query(query)
print(f"Environment: {ENVIRONMENT}")
print(f"Connection ID: {athena_connection_id}")
print(f"Results: {result.metadata.row_count} rows")

7. Error Handling and Monitoring

from ajna_analytical_engine.exceptions import ConnectionError, AuthenticationError

async def execute_athena_query_with_retry(query_request, max_retries=3):
    """Execute Athena query with automatic retry and credential refresh."""
    
    for attempt in range(max_retries):
        try:
            result = await engine.execute_query(query_request)
            return result
            
        except AuthenticationError as e:
            print(f"Authentication failed (attempt {attempt + 1}): {e}")
            
            if attempt < max_retries - 1:
                # Try refreshing credentials
                success = await connection_manager.refresh_connection(
                    "prod-athena-warehouse"
                )
                if success:
                    print("Credentials refreshed, retrying...")
                    continue
                    
            raise e
            
        except ConnectionError as e:
            print(f"Connection failed (attempt {attempt + 1}): {e}")
            
            if attempt < max_retries - 1:
                print("Retrying connection...")
                await asyncio.sleep(2 ** attempt)  # Exponential backoff
                continue
                
            raise e

# Usage
try:
    result = await execute_athena_query_with_retry(athena_query)
    print(f"Query successful: {result.metadata.row_count} rows")
except Exception as e:
    print(f"Query failed after retries: {e}")
This example shows the complete workflow from API response to query execution, including error handling and environment management. The Connection ID system handles all the credential resolution automatically while keeping your application code secure and environment-agnostic.