Examples
Copy-paste examples for common analytical use cases.

Basic Operations
Simple Data Query
Copy
from ajna_analytical_engine import AnalyticalEngine, QueryRequest

engine = AnalyticalEngine()

# Query a CSV file: the ten most recently signed-up active customers.
active_customer_query = QueryRequest(
    sources=["customers.csv"],
    select=["name", "email", "signup_date"],
    filters={
        "customers.csv": [
            {"column": "active", "op": "=", "value": True},
        ]
    },
    order_by=[{"column": "signup_date", "direction": "desc"}],
    limit=10,
)

result = engine.execute_query(active_customer_query)
print(f"Found {result.metadata.row_count} active customers")
Sales Analysis
Copy
# Monthly sales summary: revenue totals for the most recent twelve months.
monthly_sales_query = QueryRequest(
    sources=["sales.parquet"],
    select=["DATE_TRUNC('month', date) as month"],
    aggregations=[
        {"function": "sum", "column": "revenue", "alias": "total_revenue"},
        {"function": "count", "column": "*", "alias": "order_count"},
        {"function": "avg", "column": "revenue", "alias": "avg_order_value"},
    ],
    group_by=["month"],
    order_by=[{"column": "month", "direction": "desc"}],
    limit=12,
)

result = engine.execute_query(monthly_sales_query)
for row in result.data:
    print(f"{row['month']}: ${row['total_revenue']:,.2f} ({row['order_count']} orders)")
Advanced Analytics
Customer Segmentation (RFM Analysis)
Copy
# RFM Analysis: Recency, Frequency, Monetary
def _rfm_quintile(column, alias):
    # NTILE(5) over the given metric, best values first, as an RFM score.
    return {
        "function": "ntile",
        "column": column,
        "order_by": [{"column": column, "direction": "desc"}],
        "buckets": 5,
        "alias": alias,
    }


rfm_query = QueryRequest(
    sources=["transactions.parquet"],
    select=["customer_id"],
    aggregations=[
        {"function": "max", "column": "date", "alias": "last_purchase"},
        {"function": "count", "column": "*", "alias": "frequency"},
        {"function": "sum", "column": "amount", "alias": "monetary_value"},
    ],
    window_functions=[
        _rfm_quintile("last_purchase", "recency_score"),
        _rfm_quintile("frequency", "frequency_score"),
        _rfm_quintile("monetary_value", "monetary_score"),
    ],
    group_by=["customer_id"],
)

result = engine.execute_query(rfm_query)
print("Customer RFM Segmentation:")
print(result.data[:10])
Sales Performance Dashboard
Copy
# Comprehensive sales dashboard data: per-product revenue, ranked within category.
dashboard_query = QueryRequest(
    sources=["sales.parquet", "products.csv"],
    select=[
        "products.category",
        "products.name as product_name",
    ],
    joins=[
        {"left": "sales.product_id", "right": "products.id", "type": "inner"},
    ],
    aggregations=[
        {"function": "sum", "column": "sales.revenue", "alias": "total_revenue"},
        {"function": "count", "column": "*", "alias": "units_sold"},
        {"function": "avg", "column": "sales.revenue", "alias": "avg_sale_price"},
    ],
    window_functions=[
        # Rank each product inside its own category by revenue.
        {
            "function": "rank",
            "partition_by": ["products.category"],
            "order_by": [{"column": "total_revenue", "direction": "desc"}],
            "alias": "category_rank",
        },
        # Category-wide revenue total carried alongside every product row.
        {
            "function": "sum",
            "column": "total_revenue",
            "partition_by": ["products.category"],
            "alias": "category_total",
        },
    ],
    group_by=["products.category", "products.name"],
    having=[
        {"column": "total_revenue", "op": ">", "value": 1000},
    ],
    order_by=[{"column": "total_revenue", "direction": "desc"}],
)

result = engine.execute_query(dashboard_query)
print("Sales Performance by Product:")
for row in result.data[:20]:
    print(f"{row['product_name']}: ${row['total_revenue']:,.2f} (Rank #{row['category_rank']} in {row['category']})")
Time Series Analysis
Moving Averages and Trends
Copy
# Daily sales with trailing moving averages and a week-over-week comparison.
def _trailing_avg(days, alias):
    # Frame covers the current row plus the previous (days - 1) rows.
    return {
        "function": "avg",
        "column": "daily_revenue",
        "order_by": [{"column": "date", "direction": "asc"}],
        "window_frame": {"rows": [-(days - 1), 0]},
        "alias": alias,
    }


trend_query = QueryRequest(
    sources=["daily_sales.parquet"],
    select=["date", "daily_revenue"],
    window_functions=[
        _trailing_avg(7, "revenue_7day_avg"),
        _trailing_avg(30, "revenue_30day_avg"),
        # Same weekday one week earlier, for a WoW delta.
        {
            "function": "lag",
            "column": "daily_revenue",
            "order_by": [{"column": "date", "direction": "asc"}],
            "offset": 7,
            "alias": "revenue_week_ago",
        },
    ],
    order_by=[{"column": "date", "direction": "asc"}],
)

result = engine.execute_query(trend_query)
print("Daily Revenue with Moving Averages:")
for row in result.data[-30:]:  # Last 30 days
    baseline = row['revenue_week_ago']
    # Guard against a missing/zero baseline (first week of data).
    week_change = (row['daily_revenue'] - baseline) / baseline * 100 if baseline else 0
    print(f"{row['date']}: ${row['daily_revenue']:,.2f} (7-day avg: ${row['revenue_7day_avg']:,.2f}, WoW: {week_change:+.1f}%)")
Seasonal Analysis
Copy
# Year-over-year comparison by month.
seasonal_query = QueryRequest(
    sources=["sales.parquet"],
    select=[
        "EXTRACT(YEAR FROM date) as year",
        "EXTRACT(MONTH FROM date) as month",
    ],
    aggregations=[
        {"function": "sum", "column": "revenue", "alias": "monthly_revenue"},
    ],
    window_functions=[
        # Same calendar month, previous year: partition by month, step over years.
        {
            "function": "lag",
            "column": "monthly_revenue",
            "partition_by": ["month"],
            "order_by": [{"column": "year", "direction": "asc"}],
            "offset": 1,
            "alias": "prev_year_revenue",
        },
    ],
    group_by=["year", "month"],
    order_by=[
        {"column": "year", "direction": "desc"},
        {"column": "month", "direction": "asc"},
    ],
)

result = engine.execute_query(seasonal_query)
print("Year-over-Year Monthly Comparison:")
for row in result.data:
    prior = row['prev_year_revenue']
    # No prior-year data (or zero revenue) reports as 0% growth.
    yoy_growth = (row['monthly_revenue'] - prior) / prior * 100 if prior else 0
    print(f"{row['year']}-{row['month']:02d}: ${row['monthly_revenue']:,.2f} (YoY: {yoy_growth:+.1f}%)")
Complex Joins and Subqueries
Customer Lifetime Value
Copy
# Calculate customer lifetime value with correlated subqueries.
def _customer_orders(columns):
    # Subquery over orders.parquet scoped to the customer row of the outer query.
    return {
        "sources": ["orders.parquet"],
        "select": columns,
        "filters": {"orders.parquet": [
            {"column": "customer_id", "op": "=", "column_ref": "customers.id"},
        ]},
    }


clv_query = QueryRequest(
    sources=["customers.csv"],
    select=[
        "id as customer_id",
        "name",
        "signup_date",
    ],
    aggregations=[
        # Total revenue per customer.
        {"function": "sum", "subquery": _customer_orders(["total"]), "alias": "total_revenue"},
        # Number of orders per customer.
        {"function": "count", "subquery": _customer_orders(["id"]), "alias": "order_count"},
    ],
    # Only customers that have placed at least one order.
    filters={"customers.csv": [
        {
            "column": "id",
            "op": "in",
            "subquery": {
                "sources": ["orders.parquet"],
                "select": ["customer_id"],
                "distinct": True,
            },
        },
    ]},
    order_by=[{"column": "total_revenue", "direction": "desc"}],
)

result = engine.execute_query(clv_query)
print("Customer Lifetime Value Analysis:")
for row in result.data[:20]:
    orders = row['order_count']
    avg_order = row['total_revenue'] / orders if orders > 0 else 0
    print(f"{row['name']}: ${row['total_revenue']:,.2f} ({row['order_count']} orders, ${avg_order:.2f} avg)")
Inventory Analysis
Copy
# Low stock alert with supplier information.
low_stock_query = QueryRequest(
    sources=["inventory.parquet", "products.csv", "suppliers.csv"],
    select=[
        "products.name as product_name",
        "products.category",
        "suppliers.name as supplier_name",
        "suppliers.contact_email",
        "inventory.current_stock",
        "inventory.reorder_level",
    ],
    joins=[
        # Every inventory row must resolve to a product...
        {"left": "inventory.product_id", "right": "products.id", "type": "inner"},
        # ...but a missing supplier record should not suppress the alert.
        {"left": "products.supplier_id", "right": "suppliers.id", "type": "left"},
    ],
    # Column-to-column comparison: stock at or below its reorder threshold.
    filters={"inventory.parquet": [
        {"column": "current_stock", "op": "<=", "column_ref": "reorder_level"},
    ]},
    order_by=[
        {"column": "products.category", "direction": "asc"},
        {"column": "current_stock", "direction": "asc"},
    ],
)

result = engine.execute_query(low_stock_query)
print("Low Stock Alert:")
for row in result.data:
    print(f"⚠️ {row['product_name']} ({row['category']})")
    print(f" Stock: {row['current_stock']} (Reorder at: {row['reorder_level']})")
    print(f" Supplier: {row['supplier_name']} - {row['contact_email']}")
    print()
SQL Translation Examples
Complex Analytics with CTEs
Copy
from ajna_analytical_engine import SQLToQueryRequest

# Complex analytical query with Common Table Expressions.
growth_request = SQLToQueryRequest(
    sql="""
    WITH monthly_metrics AS (
        SELECT
            DATE_TRUNC('month', order_date) as month,
            COUNT(*) as order_count,
            SUM(total_amount) as revenue,
            COUNT(DISTINCT customer_id) as unique_customers
        FROM orders
        WHERE order_date >= '2024-01-01'
        GROUP BY DATE_TRUNC('month', order_date)
    ),
    growth_rates AS (
        SELECT
            month,
            order_count,
            revenue,
            unique_customers,
            LAG(revenue) OVER (ORDER BY month) as prev_month_revenue,
            (revenue - LAG(revenue) OVER (ORDER BY month)) / LAG(revenue) OVER (ORDER BY month) * 100 as revenue_growth_rate
        FROM monthly_metrics
    )
    SELECT
        month,
        order_count,
        revenue,
        unique_customers,
        ROUND(revenue_growth_rate, 2) as revenue_growth_pct,
        ROUND(revenue / unique_customers, 2) as avg_revenue_per_customer
    FROM growth_rates
    ORDER BY month DESC
    """,
    sql_name="monthly_growth_analysis",
)

# Translate the SQL into the engine's query JSON, then execute it.
query_json = engine.convert_sql_to_query(growth_request)
result = engine.execute_query_from_json(query_json)

print("Monthly Growth Analysis:")
for row in result.data:
    print(f"{row['month']}: ${row['revenue']:,.2f} ({row['revenue_growth_pct']:+.1f}% growth)")
Cohort Analysis
Copy
# Customer cohort analysis.
#
# Fixes vs. the naive version:
#  1. A customer's cohort month must be their FIRST order month across their
#     whole history. Computing MIN(order_date) under
#     GROUP BY customer_id, DATE_TRUNC('month', order_date) only takes the
#     minimum *within each month*, which assigns every customer a fresh
#     "cohort" for every month they are active and inflates retention to
#     100%. A window MIN over the customer partition yields the true
#     first-order month.
#  2. EXTRACT(MONTH FROM AGE(...)) alone returns only the months component
#     of the interval, so 13 months of age reads as 1; the years component
#     must be folded in (years * 12 + months).
cohort_request = SQLToQueryRequest(
    sql="""
    WITH customer_cohorts AS (
        SELECT DISTINCT
            customer_id,
            DATE_TRUNC('month', MIN(order_date) OVER (PARTITION BY customer_id)) as cohort_month,
            DATE_TRUNC('month', order_date) as order_month
        FROM orders
    ),
    cohort_sizes AS (
        SELECT
            cohort_month,
            COUNT(DISTINCT customer_id) as cohort_size
        FROM customer_cohorts
        GROUP BY cohort_month
    ),
    cohort_retention AS (
        SELECT
            c.cohort_month,
            c.order_month,
            EXTRACT(YEAR FROM AGE(c.order_month, c.cohort_month)) * 12
                + EXTRACT(MONTH FROM AGE(c.order_month, c.cohort_month)) as month_diff,
            COUNT(DISTINCT c.customer_id) as active_customers
        FROM customer_cohorts c
        GROUP BY c.cohort_month, c.order_month
    )
    SELECT
        r.cohort_month,
        r.month_diff,
        r.active_customers,
        s.cohort_size,
        ROUND(r.active_customers::float / s.cohort_size * 100, 2) as retention_rate
    FROM cohort_retention r
    JOIN cohort_sizes s ON r.cohort_month = s.cohort_month
    WHERE r.month_diff <= 12
    ORDER BY r.cohort_month, r.month_diff
    """,
    sql_name="cohort_analysis",
)

query_json = engine.convert_sql_to_query(cohort_request)
result = engine.execute_query_from_json(query_json)

print("Customer Cohort Retention Analysis:")
current_cohort = None
for row in result.data:
    # Print a header once per cohort, then one retention line per month offset.
    if row['cohort_month'] != current_cohort:
        current_cohort = row['cohort_month']
        print(f"\nCohort {current_cohort} (Size: {row['cohort_size']} customers):")
    print(f" Month {row['month_diff']}: {row['retention_rate']:.1f}% retention ({row['active_customers']} customers)")
Performance Optimization Examples
Efficient Large Dataset Processing
Copy
from ajna_analytical_engine.config import EngineConfig

# Configure the engine for large dataset processing.
large_data_config = EngineConfig(
    cache_enabled=True,
    cache_size_mb=2048,
    max_parallel_queries=8,
    polars_thread_pool_size=8,
    streaming_chunk_size=100000,
    memory_limit_mb=4096,
    enable_predicate_pushdown=True,
    enable_projection_pushdown=True,
)
engine = AnalyticalEngine(config=large_data_config)

# Process a large dataset efficiently: filter early, aggregate, keep the top 1000.
large_query = QueryRequest(
    sources=["large_transactions.parquet"],
    select=["customer_id", "product_category", "amount"],
    filters={"large_transactions.parquet": [
        {"column": "date", "op": ">=", "value": "2024-01-01"},
        {"column": "amount", "op": ">", "value": 100},
    ]},
    aggregations=[
        {"function": "sum", "column": "amount", "alias": "total_spent"},
        {"function": "count", "column": "*", "alias": "transaction_count"},
    ],
    group_by=["customer_id", "product_category"],
    order_by=[{"column": "total_spent", "direction": "desc"}],
    limit=1000,
)

result = engine.execute_query(large_query)
print(f"Processed {result.metadata.rows_processed:,} rows in {result.metadata.execution_time_ms}ms")
print(f"Memory usage: {result.metadata.memory_usage_mb:.1f}MB")
print(f"Cache hit: {result.metadata.cache_hit}")
Batch Processing Multiple Queries
Copy
# Process multiple related queries efficiently in a single parallel batch.
daily_sales = QueryRequest(
    sources=["sales.parquet"],
    select=["DATE(date) as sale_date"],
    aggregations=[{"function": "sum", "column": "amount", "alias": "daily_total"}],
    group_by=["sale_date"],
    order_by=[{"column": "sale_date", "direction": "desc"}],
    limit=30,
)
top_products = QueryRequest(
    sources=["sales.parquet", "products.csv"],
    select=["products.name"],
    joins=[{"left": "sales.product_id", "right": "products.id", "type": "inner"}],
    aggregations=[{"function": "sum", "column": "sales.amount", "alias": "total_sales"}],
    group_by=["products.name"],
    order_by=[{"column": "total_sales", "direction": "desc"}],
    limit=10,
)
customer_segments = QueryRequest(
    sources=["customers.csv"],
    select=["segment"],
    aggregations=[{"function": "count", "column": "*", "alias": "customer_count"}],
    group_by=["segment"],
    order_by=[{"column": "customer_count", "direction": "desc"}],
)

# Execute all queries in parallel.
results = engine.execute_batch(
    [daily_sales, top_products, customer_segments], max_parallel=3
)

print("Daily Sales (Last 30 days):")
for row in results[0].data:
    print(f" {row['sale_date']}: ${row['daily_total']:,.2f}")

print("\nTop Products:")
for row in results[1].data:
    print(f" {row['name']}: ${row['total_sales']:,.2f}")

print("\nCustomer Segments:")
for row in results[2].data:
    print(f" {row['segment']}: {row['customer_count']} customers")
Error Handling Examples
Robust Query Execution
Copy
from ajna_analytical_engine.exceptions import (
    QueryValidationError,
    DataLoadingError,
    QueryExecutionError,
)


def safe_query_execution(engine, request):
    """Execute *request* on *engine*, reporting failures instead of raising.

    Returns the query result on success, or None after printing a
    category-specific diagnostic (validation, data loading, execution,
    or unexpected error).
    """
    try:
        return engine.execute_query(request)
    except QueryValidationError as e:
        # The request itself is malformed; list per-field problems when known.
        print(f"❌ Query validation failed: {e.message}")
        if e.field_errors:
            for field, error in e.field_errors.items():
                print(f" - {field}: {error}")
    except DataLoadingError as e:
        # A data source could not be read.
        print(f"❌ Data loading failed: {e.message}")
        print(f" Source: {e.source}")
        if e.details:
            print(f" Details: {e.details}")
    except QueryExecutionError as e:
        # The (valid) query failed while running.
        print(f"❌ Query execution failed: {e.message}")
        print(f" Stage: {e.execution_stage}")
        if e.query_context:
            print(f" Context: {e.query_context}")
    except Exception as e:
        print(f"❌ Unexpected error: {str(e)}")
    return None


# Use the safe wrapper
request = QueryRequest(
    sources=["sales.parquet"],
    select=["customer_id", "revenue"],
    limit=100,
)
result = safe_query_execution(engine, request)
if result:
    print(f"✅ Query successful: {result.metadata.row_count} rows returned")
else:
    print("❌ Query failed - check logs above")
