Skip to main content

Examples

Copy-paste examples for common analytical use cases.

Basic Operations

Simple Data Query

from ajna_analytical_engine import AnalyticalEngine, QueryRequest

engine = AnalyticalEngine()

# Fetch the ten most recently signed-up active customers from a CSV source.
active_only = {"customers.csv": [
    {"column": "active", "op": "=", "value": True}
]}

req = QueryRequest(
    sources=["customers.csv"],
    select=["name", "email", "signup_date"],
    filters=active_only,
    order_by=[{"column": "signup_date", "direction": "desc"}],
    limit=10,
)

result = engine.execute_query(req)
print(f"Found {result.metadata.row_count} active customers")

Sales Analysis

# Monthly sales summary
request = QueryRequest(
    sources=["sales.parquet"],
    select=["DATE_TRUNC('month', date) as month"],
    aggregations=[
        {"function": "sum", "column": "revenue", "alias": "total_revenue"},
        {"function": "count", "column": "*", "alias": "order_count"},
        {"function": "avg", "column": "revenue", "alias": "avg_order_value"}
    ],
    group_by=["month"],
    order_by=[{"column": "month", "direction": "desc"}],
    limit=12
)

result = engine.execute_query(request)
for row in result.data:
    print(f"{row['month']}: ${row['total_revenue']:,.2f} ({row['order_count']} orders)")

Advanced Analytics

Customer Segmentation (RFM Analysis)

# RFM Analysis: Recency, Frequency, Monetary
request = QueryRequest(
    sources=["transactions.parquet"],
    select=["customer_id"],
    aggregations=[
        {"function": "max", "column": "date", "alias": "last_purchase"},
        {"function": "count", "column": "*", "alias": "frequency"},
        {"function": "sum", "column": "amount", "alias": "monetary_value"}
    ],
    window_functions=[
        {
            "function": "ntile",
            "column": "last_purchase",
            "order_by": [{"column": "last_purchase", "direction": "desc"}],
            "buckets": 5,
            "alias": "recency_score"
        },
        {
            "function": "ntile", 
            "column": "frequency",
            "order_by": [{"column": "frequency", "direction": "desc"}],
            "buckets": 5,
            "alias": "frequency_score"
        },
        {
            "function": "ntile",
            "column": "monetary_value", 
            "order_by": [{"column": "monetary_value", "direction": "desc"}],
            "buckets": 5,
            "alias": "monetary_score"
        }
    ],
    group_by=["customer_id"]
)

result = engine.execute_query(request)
print("Customer RFM Segmentation:")
print(result.data[:10])

Sales Performance Dashboard

# Comprehensive sales dashboard data
request = QueryRequest(
    sources=["sales.parquet", "products.csv"],
    select=[
        "products.category",
        "products.name as product_name"
    ],
    joins=[{
        "left": "sales.product_id",
        "right": "products.id",
        "type": "inner"
    }],
    aggregations=[
        {"function": "sum", "column": "sales.revenue", "alias": "total_revenue"},
        {"function": "count", "column": "*", "alias": "units_sold"},
        {"function": "avg", "column": "sales.revenue", "alias": "avg_sale_price"}
    ],
    window_functions=[
        {
            "function": "rank",
            "partition_by": ["products.category"],
            "order_by": [{"column": "total_revenue", "direction": "desc"}],
            "alias": "category_rank"
        },
        {
            "function": "sum",
            "column": "total_revenue",
            "partition_by": ["products.category"],
            "alias": "category_total"
        }
    ],
    group_by=["products.category", "products.name"],
    having=[
        {"column": "total_revenue", "op": ">", "value": 1000}
    ],
    order_by=[{"column": "total_revenue", "direction": "desc"}]
)

result = engine.execute_query(request)
print("Sales Performance by Product:")
for row in result.data[:20]:
    print(f"{row['product_name']}: ${row['total_revenue']:,.2f} (Rank #{row['category_rank']} in {row['category']})")

Time Series Analysis

# Daily sales with moving averages
request = QueryRequest(
    sources=["daily_sales.parquet"],
    select=["date", "daily_revenue"],
    window_functions=[
        {
            "function": "avg",
            "column": "daily_revenue",
            "order_by": [{"column": "date", "direction": "asc"}],
            "window_frame": {"rows": [-6, 0]},  # 7-day moving average
            "alias": "revenue_7day_avg"
        },
        {
            "function": "avg", 
            "column": "daily_revenue",
            "order_by": [{"column": "date", "direction": "asc"}],
            "window_frame": {"rows": [-29, 0]},  # 30-day moving average
            "alias": "revenue_30day_avg"
        },
        {
            "function": "lag",
            "column": "daily_revenue",
            "order_by": [{"column": "date", "direction": "asc"}],
            "offset": 7,
            "alias": "revenue_week_ago"
        }
    ],
    order_by=[{"column": "date", "direction": "asc"}]
)

result = engine.execute_query(request)
print("Daily Revenue with Moving Averages:")
for row in result.data[-30:]:  # Last 30 days
    week_change = ((row['daily_revenue'] - row['revenue_week_ago']) / row['revenue_week_ago'] * 100) if row['revenue_week_ago'] else 0
    print(f"{row['date']}: ${row['daily_revenue']:,.2f} (7-day avg: ${row['revenue_7day_avg']:,.2f}, WoW: {week_change:+.1f}%)")

Seasonal Analysis

# Year-over-year comparison by month
request = QueryRequest(
    sources=["sales.parquet"],
    select=[
        "EXTRACT(YEAR FROM date) as year",
        "EXTRACT(MONTH FROM date) as month"
    ],
    aggregations=[
        {"function": "sum", "column": "revenue", "alias": "monthly_revenue"}
    ],
    window_functions=[
        {
            "function": "lag",
            "column": "monthly_revenue",
            "partition_by": ["month"],
            "order_by": [{"column": "year", "direction": "asc"}],
            "offset": 1,
            "alias": "prev_year_revenue"
        }
    ],
    group_by=["year", "month"],
    order_by=[
        {"column": "year", "direction": "desc"},
        {"column": "month", "direction": "asc"}
    ]
)

result = engine.execute_query(request)
print("Year-over-Year Monthly Comparison:")
for row in result.data:
    yoy_growth = ((row['monthly_revenue'] - row['prev_year_revenue']) / row['prev_year_revenue'] * 100) if row['prev_year_revenue'] else 0
    print(f"{row['year']}-{row['month']:02d}: ${row['monthly_revenue']:,.2f} (YoY: {yoy_growth:+.1f}%)")

Complex Joins and Subqueries

Customer Lifetime Value

# Calculate customer lifetime value with complex subqueries
request = QueryRequest(
    sources=["customers.csv"],
    select=[
        "id as customer_id",
        "name",
        "signup_date"
    ],
    aggregations=[
        # Subquery to get total revenue per customer
        {
            "function": "sum",
            "subquery": {
                "sources": ["orders.parquet"],
                "select": ["total"],
                "filters": {"orders.parquet": [
                    {"column": "customer_id", "op": "=", "column_ref": "customers.id"}
                ]}
            },
            "alias": "total_revenue"
        },
        # Subquery to get order count
        {
            "function": "count",
            "subquery": {
                "sources": ["orders.parquet"],
                "select": ["id"],
                "filters": {"orders.parquet": [
                    {"column": "customer_id", "op": "=", "column_ref": "customers.id"}
                ]}
            },
            "alias": "order_count"
        }
    ],
    # Only customers with orders
    filters={"customers.csv": [
        {
            "column": "id",
            "op": "in",
            "subquery": {
                "sources": ["orders.parquet"],
                "select": ["customer_id"],
                "distinct": True
            }
        }
    ]},
    order_by=[{"column": "total_revenue", "direction": "desc"}]
)

result = engine.execute_query(request)
print("Customer Lifetime Value Analysis:")
for row in result.data[:20]:
    avg_order = row['total_revenue'] / row['order_count'] if row['order_count'] > 0 else 0
    print(f"{row['name']}: ${row['total_revenue']:,.2f} ({row['order_count']} orders, ${avg_order:.2f} avg)")

Inventory Analysis

# Low stock alert with supplier information
request = QueryRequest(
    sources=["inventory.parquet", "products.csv", "suppliers.csv"],
    select=[
        "products.name as product_name",
        "products.category",
        "suppliers.name as supplier_name",
        "suppliers.contact_email",
        "inventory.current_stock",
        "inventory.reorder_level"
    ],
    joins=[
        {
            "left": "inventory.product_id",
            "right": "products.id",
            "type": "inner"
        },
        {
            "left": "products.supplier_id",
            "right": "suppliers.id",
            "type": "left"
        }
    ],
    filters={"inventory.parquet": [
        {"column": "current_stock", "op": "<=", "column_ref": "reorder_level"}
    ]},
    order_by=[
        {"column": "products.category", "direction": "asc"},
        {"column": "current_stock", "direction": "asc"}
    ]
)

result = engine.execute_query(request)
print("Low Stock Alert:")
for row in result.data:
    print(f"⚠️  {row['product_name']} ({row['category']})")
    print(f"   Stock: {row['current_stock']} (Reorder at: {row['reorder_level']})")
    print(f"   Supplier: {row['supplier_name']} - {row['contact_email']}")
    print()

SQL Translation Examples

Complex Analytics with CTEs

from ajna_analytical_engine import SQLToQueryRequest

# Complex analytical query with Common Table Expressions: monthly totals,
# then month-over-month growth rates computed with LAG.
GROWTH_SQL = """
    WITH monthly_metrics AS (
        SELECT 
            DATE_TRUNC('month', order_date) as month,
            COUNT(*) as order_count,
            SUM(total_amount) as revenue,
            COUNT(DISTINCT customer_id) as unique_customers
        FROM orders 
        WHERE order_date >= '2024-01-01'
        GROUP BY DATE_TRUNC('month', order_date)
    ),
    growth_rates AS (
        SELECT 
            month,
            order_count,
            revenue,
            unique_customers,
            LAG(revenue) OVER (ORDER BY month) as prev_month_revenue,
            (revenue - LAG(revenue) OVER (ORDER BY month)) / LAG(revenue) OVER (ORDER BY month) * 100 as revenue_growth_rate
        FROM monthly_metrics
    )
    SELECT 
        month,
        order_count,
        revenue,
        unique_customers,
        ROUND(revenue_growth_rate, 2) as revenue_growth_pct,
        ROUND(revenue / unique_customers, 2) as avg_revenue_per_customer
    FROM growth_rates
    ORDER BY month DESC
    """

sql_request = SQLToQueryRequest(sql=GROWTH_SQL, sql_name="monthly_growth_analysis")

# Convert the SQL into the engine's JSON query form, then execute it.
plan = engine.convert_sql_to_query(sql_request)
result = engine.execute_query_from_json(plan)

print("Monthly Growth Analysis:")
for row in result.data:
    print(f"{row['month']}: ${row['revenue']:,.2f} ({row['revenue_growth_pct']:+.1f}% growth)")

Cohort Analysis

# Customer cohort analysis
sql_request = SQLToQueryRequest(
    sql="""
    WITH customer_cohorts AS (
        SELECT 
            customer_id,
            DATE_TRUNC('month', MIN(order_date)) as cohort_month,
            DATE_TRUNC('month', order_date) as order_month
        FROM orders
        GROUP BY customer_id, DATE_TRUNC('month', order_date)
    ),
    cohort_sizes AS (
        SELECT 
            cohort_month,
            COUNT(DISTINCT customer_id) as cohort_size
        FROM customer_cohorts
        GROUP BY cohort_month
    ),
    cohort_retention AS (
        SELECT 
            c.cohort_month,
            c.order_month,
            EXTRACT(MONTH FROM AGE(c.order_month, c.cohort_month)) as month_diff,
            COUNT(DISTINCT c.customer_id) as active_customers
        FROM customer_cohorts c
        GROUP BY c.cohort_month, c.order_month
    )
    SELECT 
        r.cohort_month,
        r.month_diff,
        r.active_customers,
        s.cohort_size,
        ROUND(r.active_customers::float / s.cohort_size * 100, 2) as retention_rate
    FROM cohort_retention r
    JOIN cohort_sizes s ON r.cohort_month = s.cohort_month
    WHERE r.month_diff <= 12
    ORDER BY r.cohort_month, r.month_diff
    """,
    sql_name="cohort_analysis"
)

query_json = engine.convert_sql_to_query(sql_request)
result = engine.execute_query_from_json(query_json)

print("Customer Cohort Retention Analysis:")
current_cohort = None
for row in result.data:
    if row['cohort_month'] != current_cohort:
        current_cohort = row['cohort_month']
        print(f"\nCohort {current_cohort} (Size: {row['cohort_size']} customers):")
    
    print(f"  Month {row['month_diff']}: {row['retention_rate']:.1f}% retention ({row['active_customers']} customers)")

Performance Optimization Examples

Efficient Large Dataset Processing

from ajna_analytical_engine.config import EngineConfig

# Configure for large dataset processing: larger cache, more parallelism,
# streaming reads, and predicate/projection pushdown enabled.
config = EngineConfig(
    cache_enabled=True,
    cache_size_mb=2048,
    max_parallel_queries=8,
    polars_thread_pool_size=8,
    streaming_chunk_size=100000,
    memory_limit_mb=4096,
    enable_predicate_pushdown=True,
    enable_projection_pushdown=True,
)

engine = AnalyticalEngine(config=config)

# Filter early, aggregate, and keep only the top 1000 spender/category groups.
req = QueryRequest(
    sources=["large_transactions.parquet"],
    select=["customer_id", "product_category", "amount"],
    filters={"large_transactions.parquet": [
        {"column": "date", "op": ">=", "value": "2024-01-01"},
        {"column": "amount", "op": ">", "value": 100},
    ]},
    aggregations=[
        {"function": "sum", "column": "amount", "alias": "total_spent"},
        {"function": "count", "column": "*", "alias": "transaction_count"},
    ],
    group_by=["customer_id", "product_category"],
    order_by=[{"column": "total_spent", "direction": "desc"}],
    limit=1000,
)

result = engine.execute_query(req)
meta = result.metadata
print(f"Processed {meta.rows_processed:,} rows in {meta.execution_time_ms}ms")
print(f"Memory usage: {meta.memory_usage_mb:.1f}MB")
print(f"Cache hit: {meta.cache_hit}")

Batch Processing Multiple Queries

# Process multiple related queries efficiently
queries = [
    # Daily sales summary
    QueryRequest(
        sources=["sales.parquet"],
        select=["DATE(date) as sale_date"],
        aggregations=[{"function": "sum", "column": "amount", "alias": "daily_total"}],
        group_by=["sale_date"],
        order_by=[{"column": "sale_date", "direction": "desc"}],
        limit=30
    ),
    
    # Top products
    QueryRequest(
        sources=["sales.parquet", "products.csv"],
        select=["products.name"],
        joins=[{"left": "sales.product_id", "right": "products.id", "type": "inner"}],
        aggregations=[{"function": "sum", "column": "sales.amount", "alias": "total_sales"}],
        group_by=["products.name"],
        order_by=[{"column": "total_sales", "direction": "desc"}],
        limit=10
    ),
    
    # Customer segments
    QueryRequest(
        sources=["customers.csv"],
        select=["segment"],
        aggregations=[{"function": "count", "column": "*", "alias": "customer_count"}],
        group_by=["segment"],
        order_by=[{"column": "customer_count", "direction": "desc"}]
    )
]

# Execute all queries in parallel
results = engine.execute_batch(queries, max_parallel=3)

print("Daily Sales (Last 30 days):")
for row in results[0].data:
    print(f"  {row['sale_date']}: ${row['daily_total']:,.2f}")

print("\nTop Products:")
for row in results[1].data:
    print(f"  {row['name']}: ${row['total_sales']:,.2f}")

print("\nCustomer Segments:")
for row in results[2].data:
    print(f"  {row['segment']}: {row['customer_count']} customers")

Error Handling Examples

Robust Query Execution

from ajna_analytical_engine.exceptions import (
    QueryValidationError,
    DataLoadingError, 
    QueryExecutionError
)

def safe_query_execution(engine, request):
    """Run *request* on *engine*, printing diagnostics instead of raising.

    Returns the query result on success, or ``None`` when any known
    (or unknown) failure occurs; failure details are printed to stdout.
    """
    try:
        return engine.execute_query(request)

    except QueryValidationError as err:
        # The request was rejected before execution started.
        print(f"❌ Query validation failed: {err.message}")
        if err.field_errors:
            for field, error in err.field_errors.items():
                print(f"  - {field}: {error}")

    except DataLoadingError as err:
        # A data source could not be read.
        print(f"❌ Data loading failed: {err.message}")
        print(f"  Source: {err.source}")
        if err.details:
            print(f"  Details: {err.details}")

    except QueryExecutionError as err:
        # The engine failed partway through execution.
        print(f"❌ Query execution failed: {err.message}")
        print(f"  Stage: {err.execution_stage}")
        if err.query_context:
            print(f"  Context: {err.query_context}")

    except Exception as err:
        print(f"❌ Unexpected error: {str(err)}")

    return None

# Use the safe wrapper: failures were already reported by the handler.
req = QueryRequest(
    sources=["sales.parquet"],
    select=["customer_id", "revenue"],
    limit=100,
)

result = safe_query_execution(engine, req)
if not result:
    print("❌ Query failed - check logs above")
else:
    print(f"✅ Query successful: {result.metadata.row_count} rows returned")
These examples cover the most common use cases and patterns you’ll encounter when using Ajna Analytical Engine. Copy and modify them for your specific needs!