Configuration Guide
Get the Ajna Analytical Engine running with complete, copy-paste ready configuration files. Every example below is a complete working configuration.
Quick Start - Copy These Files
Scenario 1: Local Development (PostgreSQL)
File: dev_datasources.yaml
# Complete datasource configuration for local PostgreSQL development
sources:
  users:
    type: postgres
    host: localhost
    port: 5432
    database: myapp_dev
    username: dev_user
    password: dev_password
  orders:
    type: postgres
    host: localhost
    port: 5432
    database: myapp_dev
    username: dev_user
    password: dev_password

tables:
  users:
    columns:
      id: {type: integer, primary_key: true, nullable: false}
      name: {type: string, nullable: false}
      email: {type: string, nullable: true}
      age: {type: integer, nullable: true}
      created_at: {type: timestamp, nullable: false}
      is_active: {type: boolean, nullable: false}
    relationships:
      - table: orders
        foreign_key: customer_id
        type: one-to-many
  orders:
    columns:
      id: {type: integer, primary_key: true, nullable: false}
      customer_id: {type: integer, nullable: false}
      product_name: {type: string, nullable: false}
      amount: {type: float, nullable: false}
      order_date: {type: date, nullable: false}
      status: {type: string, nullable: false}
    relationships:
      - table: users
        foreign_key: customer_id
        type: many-to-one
File: dev_engine.yaml (optional)
# Development engine configuration - optimized for fast feedback
query:
  default_limit: 100
  max_limit: 10000
  query_timeout_seconds: 60
  enable_window_functions: true
  enable_complex_aggregations: true
  enable_subqueries: true
  enable_cte: true

performance:
  max_memory_mb: 2048
  max_parallel_workers: 2
  enable_parallel_execution: true

cache:
  enabled: true
  size: 100
  ttl_seconds: 300

logging:
  level: "DEBUG"
  enable_query_logging: true
  enable_performance_metrics: true
  log_to_console: true
  log_to_file: false
Usage:
from pathlib import Path
from ajna_analytical_engine import AnalyticalEngine, QueryRequest

# Initialize engine
engine = AnalyticalEngine(
    engine_config_path=Path("dev_engine.yaml"),          # Optional
    datasource_config_path=Path("dev_datasources.yaml")  # Required
)

# Method 1: Native QueryRequest
result = engine.execute_query(QueryRequest(
    sources=["users"],
    select=["name", "email"],
    limit=5
))
print(f"Found {result.metadata.row_count} users")

# Method 2: SQL query with conversion
from ajna_analytical_engine import SQLToQueryRequest

sql_request = SQLToQueryRequest(
    sql="SELECT name, email FROM users WHERE is_active = true LIMIT 5",
    sql_name="active_users"
)

# Convert SQL to native format
native_query = engine.convert_sql_to_query(sql_request)
print("Converted query:", native_query)

# Execute the converted query
result = engine.execute_query(QueryRequest(**native_query))
print(f"Found {result.metadata.row_count} active users")

# Method 3: Complex SQL with joins
complex_sql = SQLToQueryRequest(
    sql="""
        SELECT
            u.name,
            u.email,
            COUNT(o.id) as order_count,
            SUM(o.amount) as total_spent
        FROM users u
        LEFT JOIN orders o ON u.id = o.customer_id
        WHERE u.is_active = true
        GROUP BY u.id, u.name, u.email
        ORDER BY total_spent DESC
        LIMIT 10
    """,
    sql_name="top_customers"
)

# Convert and execute
native_query = engine.convert_sql_to_query(complex_sql)
result = engine.execute_query(QueryRequest(**native_query))
print(f"Top customers query returned {result.metadata.row_count} rows")
Scenario 2: Production with Environment Variables
File: prod_datasources.yaml
# Production datasource configuration with secure environment variables
sources:
  # PostgreSQL - Main application database
  users:
    type: postgres
    uri: postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DB}

  # MySQL - Legacy order system
  orders:
    type: mysql
    uri: mysql://${MYSQL_USER}:${MYSQL_PASSWORD}@${MYSQL_HOST}:${MYSQL_PORT}/${MYSQL_DB}

  # PostgreSQL - Analytics warehouse
  analytics:
    type: postgres
    host: ${ANALYTICS_DB_HOST}
    port: ${ANALYTICS_DB_PORT}
    database: ${ANALYTICS_DB_NAME}
    username: ${ANALYTICS_DB_USER}
    password: ${ANALYTICS_DB_PASSWORD}
    sslmode: require
    application_name: ajna-analytics-prod

tables:
  users:
    columns:
      id: {type: integer, primary_key: true}
      name: {type: string}
      email: {type: string}
      created_at: {type: timestamp}
      is_active: {type: boolean}
  orders:
    columns:
      id: {type: integer, primary_key: true}
      customer_id: {type: integer}
      amount: {type: float}
      order_date: {type: date}
      status: {type: string}
  analytics:
    columns:
      user_id: {type: integer}
      event_name: {type: string}
      event_date: {type: date}
      properties: {type: string}
File: prod_engine.yaml
# Production engine configuration - optimized for performance and monitoring
query:
  default_limit: 1000
  max_limit: 1000000
  query_timeout_seconds: 600
  max_join_tables: 20
  max_filter_conditions: 200
  enable_window_functions: true
  enable_complex_aggregations: true
  enable_subqueries: true
  enable_cte: true
  enable_query_optimization: true

performance:
  max_memory_mb: 16384
  memory_warning_threshold_percent: 85.0
  max_cpu_percent: 80
  enable_parallel_execution: true
  max_parallel_workers: 16
  slow_query_warning_seconds: 2.0
  slow_query_critical_seconds: 10.0

cache:
  enabled: true
  size: 5000
  ttl_seconds: 3600
  eviction_policy: "lru"
  warning_hit_rate_percent: 15.0

logging:
  level: "INFO"
  enable_query_logging: true
  enable_performance_metrics: true
  log_to_console: false
  log_to_file: true
  log_file_path: "/var/log/ajna/prod-engine.log"

health:
  check_interval_seconds: 30
  enable_dependency_checks: true
  enable_resource_checks: true
  enable_functional_checks: true

tracing:
  enabled: true
  endpoint: "http://jaeger-collector:14268/api/traces"
  service_name: "ajna-analytics-prod"
  sample_rate: 0.1
File: .env
# PostgreSQL - Main database
POSTGRES_USER=prod_app_user
POSTGRES_PASSWORD=secure_prod_password_123
POSTGRES_HOST=prod-postgres.company.com
POSTGRES_PORT=5432
POSTGRES_DB=production_app
# MySQL - Legacy system
MYSQL_USER=analytics_readonly
MYSQL_PASSWORD=mysql_analytics_pass_456
MYSQL_HOST=legacy-mysql.company.com
MYSQL_PORT=3306
MYSQL_DB=legacy_orders
# Analytics warehouse
ANALYTICS_DB_HOST=warehouse.company.com
ANALYTICS_DB_PORT=5432
ANALYTICS_DB_NAME=analytics_warehouse
ANALYTICS_DB_USER=warehouse_reader
ANALYTICS_DB_PASSWORD=warehouse_secure_pass_789
Usage:
from pathlib import Path
from ajna_analytical_engine import AnalyticalEngine, QueryRequest, SQLToQueryRequest

# Initialize production engine
engine = AnalyticalEngine(
    engine_config_path=Path("prod_engine.yaml"),
    datasource_config_path=Path("prod_datasources.yaml")
)

# Production SQL analytics examples

# 1. Cross-database analytics (PostgreSQL + MySQL)
cross_db_sql = SQLToQueryRequest(
    sql="""
        SELECT
            u.name as customer_name,
            u.email,
            COUNT(o.id) as total_orders,
            SUM(o.amount) as lifetime_value,
            AVG(o.amount) as avg_order_value,
            MAX(o.order_date) as last_order_date
        FROM users u
        INNER JOIN orders o ON u.id = o.customer_id
        WHERE u.is_active = true
          AND o.order_date >= '2024-01-01'
        GROUP BY u.id, u.name, u.email
        HAVING COUNT(o.id) >= 5
        ORDER BY lifetime_value DESC
        LIMIT 100
    """,
    sql_name="high_value_customers_2024"
)
result = engine.execute_query(QueryRequest(**engine.convert_sql_to_query(cross_db_sql)))
print(f"Found {result.metadata.row_count} high-value customers")

# 2. Analytics warehouse reporting
warehouse_sql = SQLToQueryRequest(
    sql="""
        SELECT
            DATE_TRUNC('month', event_date) as month,
            event_name,
            COUNT(*) as event_count,
            COUNT(DISTINCT user_id) as unique_users
        FROM analytics
        WHERE event_date >= CURRENT_DATE - INTERVAL '12 months'
        GROUP BY DATE_TRUNC('month', event_date), event_name
        ORDER BY month DESC, event_count DESC
    """,
    sql_name="monthly_event_analytics"
)
result = engine.execute_query(QueryRequest(**engine.convert_sql_to_query(warehouse_sql)))
print(f"Monthly analytics: {result.metadata.row_count} data points")

# 3. Customer segmentation query (CTE example)
perf_sql = SQLToQueryRequest(
    sql="""
        WITH order_stats AS (
            SELECT
                customer_id,
                COUNT(*) as order_count,
                SUM(amount) as total_spent,
                AVG(amount) as avg_order
            FROM orders
            WHERE order_date >= CURRENT_DATE - INTERVAL '90 days'
            GROUP BY customer_id
        )
        SELECT
            CASE
                WHEN total_spent >= 10000 THEN 'VIP'
                WHEN total_spent >= 5000 THEN 'Premium'
                WHEN total_spent >= 1000 THEN 'Standard'
                ELSE 'Basic'
            END as customer_tier,
            COUNT(*) as customer_count,
            AVG(total_spent) as avg_spent_per_tier,
            SUM(total_spent) as tier_total_revenue
        FROM order_stats
        GROUP BY 1
        ORDER BY tier_total_revenue DESC
    """,
    sql_name="customer_segmentation_analysis"
)

# Show the converted native query format
native_query = engine.convert_sql_to_query(perf_sql)
print("Native query structure:", native_query)

result = engine.execute_query(QueryRequest(**native_query))
print(f"Customer segmentation: {result.metadata.row_count} tiers")
Scenario 3: AWS Cloud Setup (Athena + RDS)
File: aws_datasources.yaml
# AWS cloud datasource configuration
sources:
  # RDS PostgreSQL with IAM authentication
  rds_users:
    type: postgres
    host: prod-db.cluster-xyz.us-east-1.rds.amazonaws.com
    port: 5432
    database: production
    username: ${RDS_USERNAME}
    password: ${RDS_PASSWORD}
    sslmode: require

  # AWS Athena for data lake analytics
  athena_events:
    type: athena
    region: us-east-1
    # Option 1: IAM role (recommended for EC2/ECS)
    role_arn: arn:aws:iam::123456789012:role/AthenaAnalyticsRole
    external_id: unique-external-id-for-security
    # Athena configuration
    workgroup: analytics-production
    s3_staging_dir: s3://company-athena-results/prod-staging/
    database: analytics_datalake
    catalog: AwsDataCatalog

  # Alternative Athena config with access keys
  athena_dev:
    type: athena
    region: us-west-2
    # Option 2: Access keys (for development/testing)
    access_key_id: ${AWS_ACCESS_KEY_ID}
    secret_access_key: ${AWS_SECRET_ACCESS_KEY}
    session_token: ${AWS_SESSION_TOKEN}  # For temporary credentials
    workgroup: dev-analytics
    s3_staging_dir: s3://dev-athena-results/staging/
    database: dev_analytics

tables:
  rds_users:
    columns:
      id: {type: integer, primary_key: true}
      username: {type: string}
      email: {type: string}
      created_at: {type: timestamp}
  athena_events:
    columns:
      event_id: {type: string, primary_key: true}
      user_id: {type: integer}
      event_type: {type: string}
      event_timestamp: {type: timestamp}
      properties: {type: string}
File: .env
# RDS credentials
RDS_USERNAME=app_user
RDS_PASSWORD=rds_secure_password
# AWS credentials (if not using IAM roles)
AWS_ACCESS_KEY_ID=AKIAEXAMPLEACCESSKEY
AWS_SECRET_ACCESS_KEY=example-secret-access-key
AWS_SESSION_TOKEN=example-session-token
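A minimal usage sketch for this scenario, assuming the same AnalyticalEngine initialization and query pattern shown in Scenarios 1 and 2; the source and column names come from aws_datasources.yaml above:
from pathlib import Path

from ajna_analytical_engine import AnalyticalEngine, QueryRequest, SQLToQueryRequest

# Initialize against the AWS datasource configuration
engine = AnalyticalEngine(datasource_config_path=Path("aws_datasources.yaml"))

# RDS PostgreSQL: a quick look at users
rds_result = engine.execute_query(QueryRequest(
    sources=["rds_users"],
    select=["username", "email", "created_at"],
    limit=10
))
print(f"RDS users: {rds_result.metadata.row_count} rows")

# Athena data lake: event counts by type via SQL conversion
athena_sql = SQLToQueryRequest(
    sql="""
        SELECT event_type, COUNT(*) AS event_count
        FROM athena_events
        GROUP BY event_type
        ORDER BY event_count DESC
        LIMIT 20
    """,
    sql_name="athena_event_counts"
)
result = engine.execute_query(QueryRequest(**engine.convert_sql_to_query(athena_sql)))
print(f"Athena event types: {result.metadata.row_count} rows")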
Scenario 4: Multi-Cloud Setup (GCP BigQuery + Snowflake)
File: multicloud_datasources.yaml
# Multi-cloud datasource configuration
sources:
  # Google BigQuery
  bigquery_analytics:
    type: bigquery
    project_id: my-company-analytics-prod
    dataset_id: production_analytics
    location: US
    # Service account authentication
    service_account_email: analytics-engine@my-company-analytics-prod.iam.gserviceaccount.com
    service_account_key: /path/to/service-account-key.json
    # Query limits
    maximum_bytes_billed: 10000000000  # 10 GB limit
    use_legacy_sql: false

  # Snowflake with username/password
  snowflake_warehouse:
    type: snowflake
    account: mycompany.us-east-1
    user: analytics_service
    password: ${SNOWFLAKE_PASSWORD}
    warehouse: ANALYTICS_WH
    database: PRODUCTION
    schema: ANALYTICS
    role: ANALYTICS_READER_ROLE
    client_session_keep_alive: true

  # Snowflake with key-pair authentication (more secure)
  snowflake_secure:
    type: snowflake
    account: mycompany.us-east-1
    user: secure_service_account
    private_key: /secure/path/to/snowflake_private_key.pem
    private_key_passphrase: ${SNOWFLAKE_KEY_PASSPHRASE}
    warehouse: SECURE_ANALYTICS_WH
    database: PRODUCTION
    schema: SECURE_ANALYTICS

tables:
  bigquery_analytics:
    columns:
      user_id: {type: integer}
      event_timestamp: {type: timestamp}
      event_name: {type: string}
      session_id: {type: string}
  snowflake_warehouse:
    columns:
      id: {type: integer, primary_key: true}
      customer_id: {type: integer}
      revenue: {type: float}
      transaction_date: {type: date}
Required credential files:
# Service account key for BigQuery
/path/to/service-account-key.json
# Private key for Snowflake
/secure/path/to/snowflake_private_key.pem
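A minimal usage sketch for this scenario, assuming the same engine API shown in Scenarios 1 and 2; the source and column names come from multicloud_datasources.yaml above:
from pathlib import Path

from ajna_analytical_engine import AnalyticalEngine, QueryRequest

engine = AnalyticalEngine(datasource_config_path=Path("multicloud_datasources.yaml"))

# BigQuery: recent events
bq_result = engine.execute_query(QueryRequest(
    sources=["bigquery_analytics"],
    select=["user_id", "event_name", "event_timestamp"],
    limit=100
))
print(f"BigQuery events: {bq_result.metadata.row_count} rows")

# Snowflake: latest transactions
sf_result = engine.execute_query(QueryRequest(
    sources=["snowflake_warehouse"],
    select=["customer_id", "revenue", "transaction_date"],
    order_by=[{"column": "transaction_date", "direction": "desc"}],
    limit=100
))
print(f"Snowflake transactions: {sf_result.metadata.row_count} rows")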
Scenario 5: Enterprise with Connection API
File: enterprise_datasources.yaml
# Enterprise setup with centralized credential management
sources:
  # Connection IDs resolved via company's credential API
  prod_users_db:
    connection_id: "postgres-prod-users-readonly"
  prod_orders_db:
    connection_id: "mysql-prod-orders-analytics"
  data_warehouse:
    connection_id: "snowflake-prod-warehouse-reader"
  customer_analytics:
    connection_id: "bigquery-prod-customer-analytics"

# Tables remain the same - define your schema
tables:
  prod_users_db:
    columns:
      id: {type: integer, primary_key: true}
      name: {type: string}
      email: {type: string}
      tier: {type: string}
  prod_orders_db:
    columns:
      id: {type: integer, primary_key: true}
      customer_id: {type: integer}
      amount: {type: float}
      status: {type: string}
Usage:
import os
from pathlib import Path
from ajna_analytical_engine import AnalyticalEngine
from ajna_analytical_engine.config import ConnectionManager, create_config_manager

# Enterprise connection manager
connection_manager = ConnectionManager.create_with_rest_provider(
    api_endpoint=os.getenv("COMPANY_CREDENTIALS_API_URL"),
    api_token=os.getenv("COMPANY_CREDENTIALS_API_TOKEN"),
    cache_ttl=600,  # 10 minutes
    timeout=30
)

# Custom config manager
config_manager = create_config_manager(
    engine_config_path=Path("enterprise_engine.yaml"),
    datasource_config_path=Path("enterprise_datasources.yaml"),
    connection_manager=connection_manager
)

# Initialize engine
engine = AnalyticalEngine(config_manager=config_manager)
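Once the engine is initialized, queries work exactly as in the earlier scenarios; a short sketch using a source defined in enterprise_datasources.yaml (credentials are resolved through the connection manager at query time):
from ajna_analytical_engine import QueryRequest

# Credentials for prod_users_db are resolved via the company's credential API
result = engine.execute_query(QueryRequest(
    sources=["prod_users_db"],
    select=["name", "email", "tier"],
    limit=25
))
print(f"Enterprise query returned {result.metadata.row_count} rows")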
Configuration File Templates
Complete Datasource Template
# datasource_template.yaml - Copy and customize this
sources:
  # PostgreSQL examples
  postgres_table:
    type: postgres
    # Method 1: Individual parameters
    host: your-postgres-host
    port: 5432
    database: your_database
    username: your_user
    password: your_password
    # Optional PostgreSQL settings
    sslmode: prefer  # disable, allow, prefer, require
    application_name: ajna-analytics

  postgres_uri:
    type: postgres
    # Method 2: Connection URI
    uri: postgresql://user:password@host:port/database

  # MySQL example
  mysql_table:
    type: mysql
    host: your-mysql-host
    port: 3306
    database: your_database
    username: your_user
    password: your_password
    # Optional MySQL settings
    charset: utf8mb4
    autocommit: true

  # AWS Athena
  athena_table:
    type: athena
    region: us-east-1
    # Authentication option 1: IAM role
    role_arn: arn:aws:iam::account:role/YourRole
    external_id: optional-external-id
    # Authentication option 2: Access keys
    # access_key_id: your-access-key
    # secret_access_key: your-secret-key
    workgroup: primary
    s3_staging_dir: s3://your-bucket/staging/
    database: your_athena_database

  # Google BigQuery
  bigquery_table:
    type: bigquery
    project_id: your-gcp-project
    dataset_id: your_dataset
    location: US
    service_account_email: [email protected]
    service_account_key: /path/to/key.json

  # Snowflake
  snowflake_table:
    type: snowflake
    account: yourcompany.region
    user: your_user
    # Authentication option 1: Password
    password: your_password
    # Authentication option 2: Private key
    # private_key: /path/to/private_key.pem
    # private_key_passphrase: key_passphrase
    warehouse: YOUR_WAREHOUSE
    database: YOUR_DATABASE
    schema: YOUR_SCHEMA

  # Enterprise connection ID
  enterprise_table:
    connection_id: "your-connection-id"

# Table schemas - Define all your tables here
tables:
  your_table_name:
    columns:
      id:
        type: integer
        primary_key: true
        nullable: false
        description: "Primary key"
      name:
        type: string
        nullable: false
        description: "Name field"
      email:
        type: string
        nullable: true
        description: "Email address"
      created_at:
        type: timestamp
        nullable: false
        description: "Creation timestamp"
      is_active:
        type: boolean
        nullable: false
        description: "Active status"
    # Optional: Define relationships
    relationships:
      - table: related_table
        foreign_key: related_id
        type: one-to-many
Complete Engine Template
# engine_template.yaml - Copy and customize this

# Query processing settings
query:
  max_join_tables: 10              # Maximum tables in joins
  max_filter_conditions: 100       # Maximum filter conditions
  max_group_by_columns: 20         # Maximum GROUP BY columns
  default_limit: 1000              # Default LIMIT value
  max_limit: 1000000               # Maximum allowed LIMIT
  query_timeout_seconds: 300       # Query timeout
  enable_window_functions: true    # Enable window functions
  enable_complex_aggregations: true
  enable_subqueries: true
  enable_cte: true                 # Common Table Expressions
  enable_query_optimization: true

# Performance and resource management
performance:
  max_memory_mb: 4096              # Maximum memory usage
  memory_warning_threshold_percent: 85.0
  max_cpu_percent: 80              # Maximum CPU usage
  enable_parallel_execution: true
  max_parallel_workers: 4          # Parallel processing workers
  slow_query_warning_seconds: 1.0
  slow_query_critical_seconds: 5.0

# Caching configuration
cache:
  enabled: true                    # Enable result caching
  size: 1000                       # Number of cached results
  ttl_seconds: 3600                # Cache time-to-live
  eviction_policy: "lru"           # Eviction strategy
  warning_hit_rate_percent: 10.0

# Logging configuration
logging:
  level: "INFO"                    # Log level
  enable_query_logging: true       # Log all queries
  enable_performance_metrics: true
  log_to_console: true             # Console output
  log_to_file: false               # File output
  log_file_path: "/var/log/ajna/engine.log"

# Health monitoring
health:
  check_interval_seconds: 30       # Health check frequency
  enable_dependency_checks: true
  enable_resource_checks: true
  enable_functional_checks: true

# Distributed tracing (optional)
tracing:
  enabled: false                   # Enable tracing
  endpoint: null                   # Tracing endpoint
  service_name: "ajna-analytical-engine"
  sample_rate: 1.0                 # Trace sampling rate
Quick Setup Commands
1. Copy a configuration template:
# Copy the template that matches your setup
cp datasource_template.yaml my_datasources.yaml
cp engine_template.yaml my_engine.yaml
# Edit with your actual values
vim my_datasources.yaml
vim my_engine.yaml
2. Test your configuration:
from pathlib import Path
from ajna_analytical_engine import AnalyticalEngine, ConfigurationError

try:
    engine = AnalyticalEngine(
        datasource_config_path=Path("my_datasources.yaml")
    )
    print("✅ Configuration is valid!")
except ConfigurationError as e:
    print(f"❌ Configuration error: {e}")
3. Run your first query:
from ajna_analytical_engine import QueryRequest

result = engine.execute_query(QueryRequest(
    sources=["your_table_name"],
    select=["column1", "column2"],
    limit=10
))
print(f"Query returned {result.metadata.row_count} rows")

for row in result.data.to_dicts():
    print(row)
Environment Variables Reference
Set these environment variables for secure credential management:
# PostgreSQL
export POSTGRES_USER="your_user"
export POSTGRES_PASSWORD="your_password"
export POSTGRES_HOST="your_host"
export POSTGRES_PORT="5432"
export POSTGRES_DB="your_database"
# MySQL
export MYSQL_USER="your_user"
export MYSQL_PASSWORD="your_password"
export MYSQL_HOST="your_host"
export MYSQL_PORT="3306"
export MYSQL_DB="your_database"
# AWS
export AWS_ACCESS_KEY_ID="your_access_key"
export AWS_SECRET_ACCESS_KEY="your_secret_key"
export AWS_SESSION_TOKEN="your_session_token"
# Snowflake
export SNOWFLAKE_PASSWORD="your_password"
export SNOWFLAKE_KEY_PASSPHRASE="your_key_passphrase"
# Enterprise API
export COMPANY_CREDENTIALS_API_URL="https://api.company.com/credentials"
export COMPANY_CREDENTIALS_API_TOKEN="your_api_token"
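The ${VAR} placeholders in the YAML files are resolved from environment variables, so any mechanism that populates the process environment works. If you keep these values in a .env file rather than exporting them in your shell, one common approach (an assumption, not a built-in feature of the engine) is to load the file with python-dotenv before initializing the engine:
from pathlib import Path

from dotenv import load_dotenv  # pip install python-dotenv

from ajna_analytical_engine import AnalyticalEngine

# Populate os.environ from .env so ${POSTGRES_USER}, ${MYSQL_PASSWORD}, etc. can resolve
load_dotenv(".env")

engine = AnalyticalEngine(datasource_config_path=Path("prod_datasources.yaml"))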
SQL Query Patterns
The Ajna Analytical Engine supports both the native QueryRequest format and SQL queries via the convert_sql_to_query function. Here are common patterns:
Pattern 1: Simple SQL Conversion
from ajna_analytical_engine import SQLToQueryRequest

# Write familiar SQL
sql_request = SQLToQueryRequest(
    sql="SELECT name, email FROM users WHERE is_active = true LIMIT 10",
    sql_name="active_users"
)

# Convert to native format and execute
native_query = engine.convert_sql_to_query(sql_request)
result = engine.execute_query(QueryRequest(**native_query))
Pattern 2: Complex Analytics SQL
# Advanced SQL with window functions, CTEs, and aggregations
analytics_sql = SQLToQueryRequest(
    sql="""
        WITH monthly_sales AS (
            SELECT
                DATE_TRUNC('month', order_date) as month,
                customer_id,
                SUM(amount) as monthly_total,
                COUNT(*) as order_count
            FROM orders
            WHERE order_date >= '2024-01-01'
            GROUP BY DATE_TRUNC('month', order_date), customer_id
        ),
        customer_ranks AS (
            SELECT
                month,
                customer_id,
                monthly_total,
                order_count,
                ROW_NUMBER() OVER (PARTITION BY month ORDER BY monthly_total DESC) as rank,
                LAG(monthly_total) OVER (PARTITION BY customer_id ORDER BY month) as prev_month_total
            FROM monthly_sales
        )
        SELECT
            cr.month,
            u.name as customer_name,
            cr.monthly_total,
            cr.order_count,
            cr.rank,
            CASE
                WHEN cr.prev_month_total IS NULL THEN 'New Customer'
                WHEN cr.monthly_total > cr.prev_month_total THEN 'Growing'
                WHEN cr.monthly_total < cr.prev_month_total THEN 'Declining'
                ELSE 'Stable'
            END as trend
        FROM customer_ranks cr
        JOIN users u ON cr.customer_id = u.id
        WHERE cr.rank <= 100
        ORDER BY cr.month DESC, cr.rank ASC
    """,
    sql_name="top_customers_with_trends"
)

# Convert and execute
result = engine.execute_query(QueryRequest(**engine.convert_sql_to_query(analytics_sql)))
print(f"Top customer trends: {result.metadata.row_count} records")
Pattern 3: Cross-Database Queries
# Query across multiple database sources
cross_db_sql = SQLToQueryRequest(
    sql="""
        SELECT
            u.name,
            u.email,
            u.created_at as user_since,
            o.total_orders,
            o.total_spent,
            a.last_login,
            a.total_events
        FROM users u
        LEFT JOIN (
            SELECT
                customer_id,
                COUNT(*) as total_orders,
                SUM(amount) as total_spent
            FROM orders
            GROUP BY customer_id
        ) o ON u.id = o.customer_id
        LEFT JOIN (
            SELECT
                user_id,
                MAX(event_date) as last_login,
                COUNT(*) as total_events
            FROM analytics
            WHERE event_name = 'login'
            GROUP BY user_id
        ) a ON u.id = a.user_id
        WHERE u.is_active = true
        ORDER BY o.total_spent DESC NULLS LAST
        LIMIT 50
    """,
    sql_name="comprehensive_user_analytics"
)

result = engine.execute_query(QueryRequest(**engine.convert_sql_to_query(cross_db_sql)))
Pattern 4: Parameterized SQL Queries
def get_customer_analytics(start_date: str, min_orders: int = 5):
    """Generate customer analytics for a specific date range."""
    sql_request = SQLToQueryRequest(
        sql=f"""
            SELECT
                u.id,
                u.name,
                u.email,
                COUNT(o.id) as order_count,
                SUM(o.amount) as total_spent,
                AVG(o.amount) as avg_order_value,
                MIN(o.order_date) as first_order,
                MAX(o.order_date) as last_order,
                DATE_PART('days', MAX(o.order_date) - MIN(o.order_date)) as customer_lifespan_days
            FROM users u
            INNER JOIN orders o ON u.id = o.customer_id
            WHERE o.order_date >= '{start_date}'
            GROUP BY u.id, u.name, u.email
            HAVING COUNT(o.id) >= {min_orders}
            ORDER BY total_spent DESC
        """,
        sql_name=f"customer_analytics_{start_date.replace('-', '_')}"
    )
    return engine.convert_sql_to_query(sql_request)

# Usage
query_2024 = get_customer_analytics("2024-01-01", min_orders=10)
result = engine.execute_query(QueryRequest(**query_2024))
print(f"2024 customers with 10+ orders: {result.metadata.row_count}")
Pattern 5: SQL vs Native Comparison
# Same query written in both SQL and native format

# SQL approach
sql_version = SQLToQueryRequest(
    sql="""
        SELECT
            category,
            COUNT(*) as product_count,
            AVG(price) as avg_price,
            MIN(price) as min_price,
            MAX(price) as max_price
        FROM products
        WHERE is_active = true
        GROUP BY category
        ORDER BY avg_price DESC
    """,
    sql_name="product_category_stats"
)

# Native QueryRequest approach
native_version = QueryRequest(
    sources=["products"],
    select=["category"],
    filters={"products": [{"column": "is_active", "op": "=", "value": True}]},
    aggregations=[
        {"function": "count", "column": "*", "alias": "product_count"},
        {"function": "avg", "column": "price", "alias": "avg_price"},
        {"function": "min", "column": "price", "alias": "min_price"},
        {"function": "max", "column": "price", "alias": "max_price"}
    ],
    group_by=["category"],
    order_by=[{"column": "avg_price", "direction": "desc"}]
)

# Both produce identical results
sql_result = engine.execute_query(QueryRequest(**engine.convert_sql_to_query(sql_version)))
native_result = engine.execute_query(native_version)
print(f"SQL result: {sql_result.metadata.row_count} rows")
print(f"Native result: {native_result.metadata.row_count} rows")
Pattern 6: Debugging SQL Conversion
import json

# Debug what SQL gets converted to
sql_request = SQLToQueryRequest(
    sql="""
        SELECT
            u.name,
            COUNT(o.id) as orders,
            RANK() OVER (ORDER BY COUNT(o.id) DESC) as rank
        FROM users u
        LEFT JOIN orders o ON u.id = o.customer_id
        GROUP BY u.id, u.name
        HAVING COUNT(o.id) > 0
        ORDER BY orders DESC
        LIMIT 20
    """,
    sql_name="debug_conversion"
)

# See the converted structure
converted = engine.convert_sql_to_query(sql_request)
print("Converted query structure:")
print(json.dumps(converted, indent=2))

# Validate the conversion
validation = engine.validate_query(QueryRequest(**converted))
print("Validation result:", validation)

# Execute if valid
if validation.get("valid", False):
    result = engine.execute_query(QueryRequest(**converted))
    print(f"Query executed successfully: {result.metadata.row_count} rows")
SQL Best Practices
- Use a descriptive sql_name for debugging and monitoring
- Validate complex queries before production use
- Prefer parameterized queries for dynamic filtering (see the validation sketch after this list)
- Test both SQL and native formats for complex queries
- Use the conversion for debugging - see what native structure your SQL creates
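The f-string interpolation in Pattern 4 is convenient, but any value that originates from user input should be validated before it is placed into SQL text. A minimal sketch, using only standard-library Python and reusing the Pattern 4 helper, of validating the dynamic values first:
from datetime import datetime

def build_customer_analytics_query(start_date: str, min_orders: int = 5):
    """Validate dynamic values before interpolating them into SQL text."""
    # Reject anything that is not a real ISO date (guards the start_date interpolation)
    datetime.strptime(start_date, "%Y-%m-%d")
    # Ensure min_orders is a genuine non-negative integer, not an arbitrary string
    if not isinstance(min_orders, int) or min_orders < 0:
        raise ValueError(f"min_orders must be a non-negative integer, got {min_orders!r}")
    # Both values are validated, so the Pattern 4 helper can interpolate them safely
    return get_customer_analytics(start_date, min_orders=min_orders)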
Enterprise Connection Management
For large organizations, managing database credentials across multiple environments and teams becomes complex. The analytics engine supports enterprise-grade connection management through Connection IDs that resolve to actual connection details via a centralized API.
Connection ID Architecture
Instead of embedding credentials in your code, you reference a Connection ID; the engine resolves it to the actual connection details through a centralized service at query time:
from ajna_analytical_engine import AnalyticsEngine

# Initialize with REST connection provider
engine = AnalyticsEngine(
    config={
        "backend_api": {
            "endpoint": "https://your-connection-api.company.com",
            "token": "your-api-token",
            "cache_ttl": 300,  # 5 minutes
            "timeout": 30
        }
    }
)

# Use a connection ID instead of credentials
result = await engine.execute_query(
    query_request={
        "sources": [
            {
                "connection_id": "prod-analytics-db",  # Resolves via API
                "table": "sales_data",
                "alias": "sales"
            }
        ],
        "select": ["date", "revenue", "region"],
        "order_by": [{"field": "date", "direction": "desc"}]
    }
)
Connection Provider REST API Contract
Your connection management API must implement these endpoints:
GET /api/v1/connections/{connection_id}
Returns full connection details for a specific ID:
{
  "connection_id": "prod-analytics-db",
  "connection_type": "postgres",
  "auth_type": "username_password",
  "name": "Production Analytics Database",
  "environment": "production",
  "host": "analytics-db.company.com",
  "port": 5432,
  "database": "analytics",
  "username": "readonly_user",
  "password": "encrypted_password_here",
  "ssl_mode": "require",
  "created_at": "2024-01-15T10:30:00Z",
  "expires_at": "2024-02-15T10:30:00Z",
  "status": "active",
  "allowed_operations": ["read"],
  "ip_whitelist": ["10.0.0.0/8", "192.168.1.0/24"]
}
POST /api/v1/connections/{connection_id}/test
Tests whether a connection is valid and accessible. Response:
{
  "status": "success",
  "response_time_ms": 45,
  "message": "Connection successful"
}
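For example, a test request can be sent with the same Bearer-token authentication shown in the API Authentication section below. This standard-library sketch assumes the token is available in the CONNECTION_API_TOKEN environment variable and that the host and connection ID are placeholders:
import json
import os
import urllib.request

url = "https://your-connection-api.company.com/api/v1/connections/prod-analytics-db/test"
request = urllib.request.Request(
    url,
    method="POST",
    headers={
        "Authorization": f"Bearer {os.environ['CONNECTION_API_TOKEN']}",
        "Content-Type": "application/json",
    },
)
with urllib.request.urlopen(request) as response:
    print(json.loads(response.read()))  # e.g. {"status": "success", ...}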
POST /api/v1/connections/{connection_id}/refresh
Refreshes expired credentials for a connection. Response:
{
  "connection_id": "prod-analytics-db",
  "connection_type": "postgres",
  "password": "new_encrypted_password",
  "expires_at": "2024-03-15T10:30:00Z",
  "refreshed_at": "2024-02-15T10:30:00Z"
}
GET /api/v1/connections?environment={environment}
Lists all available connections, optionally filtered by environment. Response:
{
  "connections": [
    {
      "connection_id": "prod-analytics-db",
      "name": "Production Analytics Database",
      "connection_type": "postgres",
      "environment": "production",
      "status": "active"
    },
    {
      "connection_id": "dev-mysql-db",
      "name": "Development MySQL",
      "connection_type": "mysql",
      "environment": "development",
      "status": "active"
    }
  ]
}
Connection Schemas by Database Type
The system supports multiple database types, each with its own connection schema:
- PostgreSQL
- MySQL
- AWS Athena
- BigQuery
- Snowflake
PostgreSQL:
{
  "connection_id": "prod-postgres",
  "connection_type": "postgres",
  "auth_type": "username_password",
  "host": "postgres.company.com",
  "port": 5432,
  "database": "analytics",
  "username": "user",
  "password": "encrypted_password",
  "sslmode": "require",
  "application_name": "ajna-analytics",
  "schema_name": "public",
  "connection_timeout": 30,
  "query_timeout": 300,
  "max_connections": 10
}
MySQL:
{
  "connection_id": "prod-mysql",
  "connection_type": "mysql",
  "auth_type": "username_password",
  "host": "mysql.company.com",
  "port": 3306,
  "database": "analytics",
  "username": "user",
  "password": "encrypted_password",
  "charset": "utf8mb4",
  "ssl_mode": "PREFERRED",
  "autocommit": true
}
AWS Athena:
{
  "connection_id": "prod-athena",
  "connection_type": "athena",
  "auth_type": "iam_role",
  "region": "us-east-1",
  "role_arn": "arn:aws:iam::123456789012:role/AthenaAccessRole",
  "external_id": "unique-external-id",
  "s3_staging_dir": "s3://your-bucket/athena-results/",
  "workgroup": "analytics",
  "database": "default",
  "catalog": "AwsDataCatalog"
}
BigQuery:
{
  "connection_id": "prod-bigquery",
  "connection_type": "bigquery",
  "auth_type": "service_account",
  "project_id": "your-gcp-project",
  "service_account_email": "[email protected]",
  "service_account_key": "encrypted_key_json",
  "dataset_id": "analytics",
  "location": "US",
  "maximum_bytes_billed": 1000000000
}
Snowflake:
{
  "connection_id": "prod-snowflake",
  "connection_type": "snowflake",
  "auth_type": "username_password",
  "account": "your_account.snowflakecomputing.com",
  "user": "analytics_user",
  "password": "encrypted_password",
  "warehouse": "ANALYTICS_WH",
  "database": "ANALYTICS_DB",
  "schema": "PUBLIC",
  "role": "ANALYTICS_ROLE",
  "client_session_keep_alive": true
}
Authentication Methods Supported
- username_password: Traditional database authentication
- connection_string: Full connection URI with embedded credentials
- iam_role: AWS IAM role-based authentication
- access_key: AWS access key and secret key
- service_account: GCP service account authentication
- managed_identity: Azure managed identity
- key_pair: SSH key-pair authentication (Snowflake)
- oauth: OAuth token-based authentication
- jwt: JWT token authentication
- api_key: API key authentication
Connection Manager Implementation
Here's how to set up the connection management system:
from ajna_analytical_engine.config.connection_manager import ConnectionManager
from ajna_analytical_engine.config.connection_contracts import RESTConnectionProvider

# Option 1: Create with REST provider
connection_manager = ConnectionManager.create_with_rest_provider(
    api_endpoint="https://your-connection-api.company.com",
    api_token="your-api-token",
    cache_ttl=300,  # 5-minute cache
    timeout=30
)

# Option 2: Create programmatically
provider = RESTConnectionProvider(
    endpoint="https://your-connection-api.company.com",
    auth_token="your-api-token",
    timeout=30,
    retry_attempts=3
)
connection_manager = ConnectionManager(
    connection_provider=provider,
    cache_ttl=300,
    enable_cache=True
)

# Use with the analytics engine
engine = AnalyticsEngine(
    config={
        "connection_manager": connection_manager
    }
)
Connection Lifecycle Operations
# Test a connection
is_valid = await connection_manager.test_connection(
    {"connection_id": "prod-analytics-db"},
    source_name="Analytics DB"
)
print(f"Connection valid: {is_valid}")

# List available connections
connections = await connection_manager.list_connections(
    environment="production"
)
print(f"Available connections: {connections}")

# Refresh expired credentials
success = await connection_manager.refresh_connection("prod-analytics-db")
print(f"Refresh successful: {success}")

# Get cache statistics
stats = connection_manager.get_cache_stats()
print(f"Cache hits: {stats['hits']}, misses: {stats['misses']}")

# Clear the cache if needed
await connection_manager.clear_cache()
Configuration with Connection IDs
In your datasource configuration, use connection IDs instead of credentials:
# datasource.yaml
datasources:
  # Traditional credentials (not recommended for production)
  local_postgres:
    type: postgres
    host: localhost
    port: 5432
    database: test_db
    username: test_user
    password: test_password

  # Connection ID approach (recommended for enterprise)
  prod_analytics:
    connection_id: "prod-analytics-db"
  staging_warehouse:
    connection_id: "staging-warehouse-db"
  reporting_mysql:
    connection_id: "reporting-mysql-01"

# Engine configuration with connection API
engine:
  backend_api:
    endpoint: "https://connections.company.com"
    token: "${CONNECTION_API_TOKEN}"
    cache_ttl: 300
    timeout: 30
    retry_attempts: 3
API Authentication
The REST API expects a Bearer token in the Authorization header:
curl -H "Authorization: Bearer your-api-token" \
-H "Content-Type: application/json" \
"https://your-connection-api.company.com/api/v1/connections/prod-analytics-db"
Connection Flow
Here's how connection resolution works (a minimal sketch follows the list):
1. Application requests a connection: the engine needs to connect to prod-analytics-db
2. Cache check: the connection manager checks its local cache first
3. API call: if not cached, the REST API is called to resolve the connection ID
4. Connection creation: the response is converted into a database connection string
5. Caching: connection details are stored in the cache for future use
6. Database connection: the resolved credentials are used to connect to the database
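The sketch below is illustrative only: the ConnectionResolver class, its cache dictionary, and the api_client.get_connection call are hypothetical stand-ins for what ConnectionManager does internally.
import time

class ConnectionResolver:
    """Illustrative sketch of connection-ID resolution (not the real ConnectionManager)."""

    def __init__(self, api_client, cache_ttl: int = 300):
        self.api_client = api_client  # wraps the REST connection API
        self.cache_ttl = cache_ttl
        self._cache = {}              # connection_id -> (details, cached_at)

    def resolve(self, connection_id: str) -> dict:
        # Steps 1-2: check the local cache first
        cached = self._cache.get(connection_id)
        if cached and time.time() - cached[1] < self.cache_ttl:
            return cached[0]
        # Step 3: not cached (or expired) - resolve the ID via the REST API
        details = self.api_client.get_connection(connection_id)
        # Step 5: store the details for future use
        self._cache[connection_id] = (details, time.time())
        # Steps 4 and 6: the caller builds a connection string and connects with these credentials
        return details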
Security Considerations
- Credential encryption: All passwords and keys should be encrypted at rest
- Token rotation: API tokens should be rotated regularly
- Access controls: Implement proper RBAC for connection access
- Audit logging: Log all connection access and credential retrievals
- Network security: Use HTTPS for all API communications
- IP whitelisting: Restrict connections to known IP ranges
- Credential expiration: Implement automatic credential rotation
- Monitoring: Alert on failed connections or credential access
Benefits of Connection ID System
- Security: Credentials never appear in application code
- Flexibility: Switch between environments without code changes
- Auditability: Track who accessed which databases when
- Scalability: Centrally manage thousands of connections
- Compliance: Meet enterprise security requirements
- Reliability: Automatic credential refresh and failover
- Team isolation: Different teams can use different connection IDs
- Environment promotion: Same code works across dev/staging/prod
Complete Example: AWS Athena with Connection ID
Here's a complete example showing how to use AWS Athena with a Connection ID and access-key authentication.
1. Connection API Response for Athena
When your connection API receives a request for GET /api/v1/connections/prod-athena-warehouse, it should return:
{
  "connection_id": "prod-athena-warehouse",
  "connection_type": "athena",
  "auth_type": "access_key",
  "name": "Production Athena Data Warehouse",
  "description": "Main data warehouse for production analytics",
  "environment": "production",

  // AWS configuration
  "region": "us-east-1",
  "access_key_id": "AKIAIOSFODNN7EXAMPLE",
  "secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
  "session_token": null,  // Optional, for temporary credentials

  // Athena-specific settings
  "workgroup": "analytics-production",
  "s3_staging_dir": "s3://company-athena-results/production/",
  "database": "analytics_db",
  "catalog": "AwsDataCatalog",

  // Security and lifecycle
  "created_at": "2024-01-15T10:30:00Z",
  "expires_at": "2024-04-15T10:30:00Z",  // Access keys expire quarterly
  "last_used": "2024-02-15T14:22:33Z",
  "status": "active",
  "allowed_operations": ["read", "write"],
  "ip_whitelist": ["10.0.0.0/8", "203.0.113.0/24"]
}
2. Application Configuration
Configure your application to use the Connection ID:
# datasources.yaml
datasources:
  # Use a Connection ID instead of direct credentials
  athena_warehouse:
    connection_id: "prod-athena-warehouse"

  # You can also specify additional table-level settings
  sales_data:
    connection_id: "prod-athena-warehouse"
    table: "sales_fact"
    schema: "analytics"

# engine.yaml
engine:
  # Configure the connection API
  backend_api:
    endpoint: "https://connections.company.com"
    token: "${CONNECTION_API_TOKEN}"
    cache_ttl: 300  # Cache for 5 minutes
    timeout: 30
    retry_attempts: 3
3. Python Implementation
import os

from ajna_analytical_engine import AnalyticsEngine
from ajna_analytical_engine.types.query import QueryRequest

# Initialize engine with the connection API
engine = AnalyticsEngine(
    config={
        "backend_api": {
            "endpoint": "https://connections.company.com",
            "token": os.getenv("CONNECTION_API_TOKEN"),
            "cache_ttl": 300,
            "timeout": 30
        }
    }
)

# Query using a Connection ID - the engine automatically resolves credentials
athena_query = QueryRequest(
    sources=[
        {
            "connection_id": "prod-athena-warehouse",  # Resolved via API
            "table": "sales_fact",
            "alias": "sales"
        },
        {
            "connection_id": "prod-athena-warehouse",
            "table": "customer_dim",
            "alias": "customers"
        }
    ],
    select=[
        "sales.order_date",
        "customers.customer_name",
        "sales.amount",
        "sales.region"
    ],
    joins=[
        {
            "type": "inner",
            "left_table": "sales",
            "right_table": "customers",
            "left_column": "customer_id",
            "right_column": "customer_id"
        }
    ],
    filters={
        "sales": [
            {
                "column": "order_date",
                "op": ">=",
                "value": "2024-01-01"
            }
        ]
    },
    order_by=[
        {"column": "order_date", "direction": "desc"}
    ],
    limit=1000
)

# Execute the query - credentials are fetched automatically
result = await engine.execute_query(athena_query)
print(f"Query returned {result.metadata.row_count} rows")

# The engine converts the connection details to this Athena URI internally:
# awsathena+rest://AKIAIOSFODNN7EXAMPLE:wJalrXUtnFEMI%2FK7MDENG%[email protected]:443/analytics_db?s3_staging_dir=s3%3A//company-athena-results/production/&workgroup=analytics-production
4. SQL Query Example with Connection ID
from ajna_analytical_engine.types.query import SQLToQueryRequest

# Use SQL with Connection ID resolution
sql_query = SQLToQueryRequest(
    sql="""
        WITH monthly_sales AS (
            SELECT
                DATE_TRUNC('month', order_date) as month,
                region,
                SUM(amount) as total_revenue,
                COUNT(DISTINCT customer_id) as unique_customers,
                AVG(amount) as avg_order_value
            FROM sales_fact
            WHERE order_date >= DATE '2024-01-01'
            GROUP BY DATE_TRUNC('month', order_date), region
        ),
        regional_growth AS (
            SELECT
                month,
                region,
                total_revenue,
                unique_customers,
                avg_order_value,
                LAG(total_revenue) OVER (
                    PARTITION BY region
                    ORDER BY month
                ) as prev_month_revenue,
                PERCENT_RANK() OVER (
                    PARTITION BY month
                    ORDER BY total_revenue DESC
                ) as revenue_rank
            FROM monthly_sales
        )
        SELECT
            month,
            region,
            total_revenue,
            unique_customers,
            avg_order_value,
            CASE
                WHEN prev_month_revenue IS NULL THEN 'New'
                WHEN total_revenue > prev_month_revenue THEN 'Growing'
                WHEN total_revenue < prev_month_revenue THEN 'Declining'
                ELSE 'Stable'
            END as trend,
            ROUND(revenue_rank * 100, 1) as percentile_rank
        FROM regional_growth
        ORDER BY month DESC, total_revenue DESC
    """,
    sql_name="athena_regional_growth_analysis",
    # Connection ID is resolved from the datasource config
    default_connection_id="prod-athena-warehouse"
)

# Convert SQL to a QueryRequest and execute
query_request = engine.convert_sql_to_query(sql_query)
result = await engine.execute_query(QueryRequest(**query_request))

print(f"Regional growth analysis: {result.metadata.row_count} rows")
for row in result.data.to_dicts()[:5]:  # Show the first 5 rows
    print(f"{row['month']} - {row['region']}: ${row['total_revenue']:,.2f} ({row['trend']})")
5. Connection Management Operations
import os

from ajna_analytical_engine.config.connection_manager import ConnectionManager

# Create the connection manager
connection_manager = ConnectionManager.create_with_rest_provider(
    api_endpoint="https://connections.company.com",
    api_token=os.getenv("CONNECTION_API_TOKEN"),
    cache_ttl=300
)

# Test the Athena connection
is_valid = await connection_manager.test_connection(
    {"connection_id": "prod-athena-warehouse"},
    source_name="Athena Warehouse"
)
print(f"Athena connection valid: {is_valid}")

# List all production connections
prod_connections = await connection_manager.list_connections(
    environment="production"
)
print(f"Production connections: {prod_connections}")

# Get connection details (for debugging)
try:
    athena_connection = await connection_manager.connection_provider.get_connection(
        "prod-athena-warehouse"
    )
    print(f"Connection type: {athena_connection.connection_type}")
    print(f"Region: {athena_connection.region}")
    print(f"Workgroup: {athena_connection.workgroup}")
    print(f"S3 staging: {athena_connection.s3_staging_dir}")
except Exception as e:
    print(f"Error fetching connection: {e}")

# Refresh credentials if they're expiring
success = await connection_manager.refresh_connection("prod-athena-warehouse")
print(f"Credential refresh successful: {success}")

# Cache statistics
stats = connection_manager.get_cache_stats()
print(f"Cache performance - Hits: {stats.get('hits', 0)}, Misses: {stats.get('misses', 0)}")
6. Environment-Specific Configuration
# Different environments use different connection IDs
import os

ENVIRONMENT = os.getenv("ENVIRONMENT", "development")

# Map environments to connection IDs
ATHENA_CONNECTIONS = {
    "development": "dev-athena-sandbox",
    "staging": "staging-athena-warehouse",
    "production": "prod-athena-warehouse"
}

# Use the environment-specific connection
athena_connection_id = ATHENA_CONNECTIONS[ENVIRONMENT]

query = QueryRequest(
    sources=[
        {
            "connection_id": athena_connection_id,
            "table": "sales_fact",
            "alias": "sales"
        }
    ],
    select=["order_date", "amount", "region"],
    limit=100
)

result = await engine.execute_query(query)
print(f"Environment: {ENVIRONMENT}")
print(f"Connection ID: {athena_connection_id}")
print(f"Results: {result.metadata.row_count} rows")
7. Error Handling and Monitoring
import asyncio

from ajna_analytical_engine.exceptions import ConnectionError, AuthenticationError

async def execute_athena_query_with_retry(query_request, max_retries=3):
    """Execute an Athena query with automatic retry and credential refresh."""
    for attempt in range(max_retries):
        try:
            result = await engine.execute_query(query_request)
            return result
        except AuthenticationError as e:
            print(f"Authentication failed (attempt {attempt + 1}): {e}")
            if attempt < max_retries - 1:
                # Try refreshing credentials
                success = await connection_manager.refresh_connection(
                    "prod-athena-warehouse"
                )
                if success:
                    print("Credentials refreshed, retrying...")
                    continue
            raise e
        except ConnectionError as e:
            print(f"Connection failed (attempt {attempt + 1}): {e}")
            if attempt < max_retries - 1:
                print("Retrying connection...")
                await asyncio.sleep(2 ** attempt)  # Exponential backoff
                continue
            raise e

# Usage
try:
    result = await execute_athena_query_with_retry(athena_query)
    print(f"Query successful: {result.metadata.row_count} rows")
except Exception as e:
    print(f"Query failed after retries: {e}")
