# dags/daily_sales_pipeline.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime, timedelta
default_args = {
'owner': 'data-engineering',
'depends_on_past': False,
'email_on_failure': True,
'email_on_retry': False,
'email': ['data-alerts@company.com'],
'retries': 3,
'retry_delay': timedelta(minutes=5),
'execution_timeout': timedelta(hours=2),
}
dag = DAG(
'daily_sales_pipeline',
default_args=default_args,
description='Daily sales data pipeline with quality checks',
schedule='0 2 * * *', # 2 AM daily
start_date=datetime(2025, 1, 1),
catchup=False,
max_active_runs=1,
tags=['production', 'sales', 'daily'],
)
def extract_api_data(**context):
"""Extract data from sales API"""
import requests
import pandas as pd
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
execution_date = context['ds']
    # Extract data from API (get_secret is assumed to be a project helper wrapping the secrets backend)
response = requests.get(
f'https://api.company.com/sales?date={execution_date}',
headers={'Authorization': f'Bearer {get_secret("SALES_API_TOKEN")}'},
timeout=300
)
response.raise_for_status()
# Convert to DataFrame
df = pd.DataFrame(response.json()['data'])
# Save to S3 (raw layer)
s3_hook = S3Hook(aws_conn_id='aws_default')
s3_key = f'raw/sales/{execution_date}/sales.parquet'
df.to_parquet(
f's3://company-data-lake/{s3_key}',
engine='pyarrow',
compression='snappy',
index=False
)
# Push metadata to XCom
context['ti'].xcom_push(key='s3_key', value=s3_key)
context['ti'].xcom_push(key='row_count', value=len(df))
return s3_key
def validate_raw_data(**context):
"""Validate data quality using Great Expectations"""
import great_expectations as gx
from great_expectations.checkpoint import Checkpoint
s3_key = context['ti'].xcom_pull(key='s3_key', task_ids='extract_api_data')
# Initialize Great Expectations context
context_gx = gx.get_context()
# Define expectations
validator = context_gx.sources.add_or_update_pandas(
name="sales_data"
).read_parquet(f's3://company-data-lake/{s3_key}')
# Run validation suite
validator.expect_table_row_count_to_be_between(min_value=100, max_value=1000000)
validator.expect_column_values_to_not_be_null(column='sale_id')
validator.expect_column_values_to_be_unique(column='sale_id')
validator.expect_column_values_to_not_be_null(column='customer_id')
validator.expect_column_values_to_be_between(
column='amount',
min_value=0,
max_value=1000000
)
validator.expect_column_values_to_match_regex(
column='email',
regex=r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
)
# Execute checkpoint
results = validator.validate()
if not results['success']:
raise ValueError(f"Data quality validation failed: {results['statistics']}")
return results['statistics']
def transform_to_bronze(**context):
"""Transform raw data to bronze layer (cleaned)"""
import pandas as pd
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
execution_date = context['ds']
s3_key = context['ti'].xcom_pull(key='s3_key', task_ids='extract_api_data')
# Read raw data
df = pd.read_parquet(f's3://company-data-lake/{s3_key}')
# Data cleaning transformations
df['sale_timestamp'] = pd.to_datetime(df['sale_timestamp'])
df['amount'] = df['amount'].astype(float)
df['email'] = df['email'].str.lower().str.strip()
df['processed_at'] = datetime.utcnow()
# Add metadata columns
df['_ingestion_date'] = execution_date
df['_source'] = 'sales_api'
    # Write to bronze layer; the S3 key already encodes the date partition,
    # so write a single file rather than passing partition_cols
    bronze_key = f'bronze/sales/date={execution_date}/data.parquet'
    df.to_parquet(
        f's3://company-data-lake/{bronze_key}',
        engine='pyarrow',
        compression='snappy',
        index=False
    )
return bronze_key
# Task: Extract from API
extract_task = PythonOperator(
task_id='extract_api_data',
python_callable=extract_api_data,
dag=dag,
)
# Task: Validate raw data
validate_task = PythonOperator(
task_id='validate_raw_data',
python_callable=validate_raw_data,
dag=dag,
)
# Task: Transform to bronze
bronze_task = PythonOperator(
task_id='transform_to_bronze',
python_callable=transform_to_bronze,
dag=dag,
)
# Task Group: Silver transformations with dbt
with TaskGroup('silver_transformations', dag=dag) as silver_group:
run_dbt_silver = DbtCloudRunJobOperator(
task_id='run_dbt_silver_models',
dbt_cloud_conn_id='dbt_cloud',
job_id=12345,
check_interval=30,
timeout=3600,
)
# Task Group: Gold aggregations
with TaskGroup('gold_aggregations', dag=dag) as gold_group:
daily_summary = PostgresOperator(
task_id='create_daily_summary',
postgres_conn_id='warehouse',
sql="""
INSERT INTO gold.daily_sales_summary
SELECT
DATE(sale_timestamp) as sale_date,
COUNT(DISTINCT sale_id) as total_sales,
COUNT(DISTINCT customer_id) as unique_customers,
SUM(amount) as total_revenue,
AVG(amount) as avg_order_value,
CURRENT_TIMESTAMP as created_at
FROM silver.sales
WHERE DATE(sale_timestamp) = '{{ ds }}'
GROUP BY DATE(sale_timestamp)
ON CONFLICT (sale_date) DO UPDATE
SET
total_sales = EXCLUDED.total_sales,
unique_customers = EXCLUDED.unique_customers,
total_revenue = EXCLUDED.total_revenue,
avg_order_value = EXCLUDED.avg_order_value,
created_at = EXCLUDED.created_at;
""",
)
product_summary = PostgresOperator(
task_id='create_product_summary',
postgres_conn_id='warehouse',
sql="sql/gold/product_daily_summary.sql",
params={'execution_date': '{{ ds }}'},
)
# Task: Data quality monitoring
monitor_quality = PythonOperator(
task_id='monitor_data_quality',
python_callable=lambda **ctx: print(f"Quality metrics: {ctx['ti'].xcom_pull(task_ids='validate_raw_data')}"),
dag=dag,
)
# Define dependencies
extract_task >> validate_task >> bronze_task >> silver_group >> gold_group >> monitor_quality

-- models/silver/sales_enriched.sql
{{
config(
materialized='incremental',
unique_key='sale_id',
on_schema_change='sync_all_columns',
incremental_strategy='merge',
partition_by={
'field': 'sale_date',
'data_type': 'date',
'granularity': 'day'
},
cluster_by=['customer_id', 'product_id']
)
}}
WITH sales_raw AS (
SELECT
sale_id,
customer_id,
product_id,
amount,
sale_timestamp,
DATE(sale_timestamp) as sale_date,
_ingestion_date
FROM {{ source('bronze', 'sales') }}
{% if is_incremental() %}
WHERE _ingestion_date >= (SELECT MAX(sale_date) - INTERVAL '7 days' FROM {{ this }})
{% endif %}
),
customers AS (
SELECT
customer_id,
customer_name,
customer_segment,
customer_lifetime_value,
customer_join_date
FROM {{ ref('dim_customers') }}
),
products AS (
SELECT
product_id,
product_name,
product_category,
product_price,
product_cost
FROM {{ ref('dim_products') }}
)
SELECT
s.sale_id,
s.customer_id,
c.customer_name,
c.customer_segment,
s.product_id,
p.product_name,
p.product_category,
s.amount,
p.product_cost,
s.amount - p.product_cost AS profit,
s.sale_timestamp,
s.sale_date,
-- Customer metrics
c.customer_lifetime_value,
DATEDIFF('day', c.customer_join_date, s.sale_date) AS days_since_customer_join,
-- Time dimensions
EXTRACT(YEAR FROM s.sale_timestamp) AS sale_year,
EXTRACT(MONTH FROM s.sale_timestamp) AS sale_month,
EXTRACT(DAY FROM s.sale_timestamp) AS sale_day,
EXTRACT(HOUR FROM s.sale_timestamp) AS sale_hour,
CASE EXTRACT(DOW FROM s.sale_timestamp)
WHEN 0 THEN 'Sunday'
WHEN 1 THEN 'Monday'
WHEN 2 THEN 'Tuesday'
WHEN 3 THEN 'Wednesday'
WHEN 4 THEN 'Thursday'
WHEN 5 THEN 'Friday'
WHEN 6 THEN 'Saturday'
END AS day_of_week,
-- Metadata
CURRENT_TIMESTAMP AS _dbt_updated_at
FROM sales_raw s
LEFT JOIN customers c ON s.customer_id = c.customer_id
LEFT JOIN products p ON s.product_id = p.product_id
{{ dbt_utils.group_by(n=20) }}

# models/silver/schema.yml
version: 2
models:
- name: sales_enriched
description: Enriched sales transactions with customer and product dimensions
columns:
- name: sale_id
description: Unique sale identifier
tests:
- unique
- not_null
- name: customer_id
description: Customer identifier
tests:
- not_null
- relationships:
to: ref('dim_customers')
field: customer_id
- name: product_id
description: Product identifier
tests:
- not_null
- relationships:
to: ref('dim_products')
field: product_id
- name: amount
description: Sale amount in USD
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_between:
min_value: 0
max_value: 1000000
- name: profit
description: Sale profit (amount - cost)
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_between:
min_value: -100000
max_value: 900000
tests:
- dbt_expectations.expect_table_row_count_to_be_between:
min_value: 1000
          severity: warn

# streaming/kafka_consumer.py
from kafka import KafkaConsumer, KafkaProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer
from datetime import datetime
import json
import logging
from typing import Dict, Any
import psycopg2
from psycopg2.extras import execute_batch
class SalesEventProcessor:
def __init__(self):
self.schema_registry = SchemaRegistryClient({
'url': 'http://schema-registry:8081'
})
self.consumer = KafkaConsumer(
'sales-events',
bootstrap_servers=['kafka:9092'],
group_id='sales-processor',
enable_auto_commit=False,
auto_offset_reset='earliest',
value_deserializer=self._deserialize_avro,
max_poll_records=500,
session_timeout_ms=30000,
)
self.producer = KafkaProducer(
bootstrap_servers=['kafka:9092'],
value_serializer=self._serialize_avro,
acks='all',
retries=3,
max_in_flight_requests_per_connection=1,
)
self.db_conn = psycopg2.connect(
host='warehouse',
database='analytics',
user='etl_user',
password=get_secret('DB_PASSWORD')
)
self.batch = []
self.batch_size = 100
    def _get_schema(self, subject: str) -> str:
        """Fetch the latest registered schema string for a subject from the schema registry."""
        return self.schema_registry.get_latest_version(subject).schema.schema_str

    def _deserialize_avro(self, msg_value: bytes) -> Dict:
        """Deserialize Avro message"""
        avro_deserializer = AvroDeserializer(
            self.schema_registry,
            schema_str=self._get_schema('sales-event-value')
        )
        return avro_deserializer(msg_value, None)
def _serialize_avro(self, data: Dict) -> bytes:
"""Serialize to Avro"""
avro_serializer = AvroSerializer(
self.schema_registry,
schema_str=self._get_schema('enriched-sales-value')
)
return avro_serializer(data, None)
def process_events(self):
"""Process incoming sales events"""
try:
for message in self.consumer:
try:
event = message.value
# Enrich event
enriched = self.enrich_event(event)
# Validate
if not self.validate_event(enriched):
logging.warning(f"Invalid event: {event}")
continue
# Add to batch
self.batch.append(enriched)
# Process batch when full
if len(self.batch) >= self.batch_size:
self.flush_batch()
# Commit offset after successful processing
self.consumer.commit()
except Exception as e:
logging.error(f"Error processing message: {e}")
# Send to dead letter queue
self.producer.send('sales-events-dlq', value=message.value)
except KeyboardInterrupt:
logging.info("Shutting down processor...")
finally:
self.flush_batch()
self.consumer.close()
self.producer.close()
self.db_conn.close()
def enrich_event(self, event: Dict) -> Dict:
"""Enrich event with additional data"""
cursor = self.db_conn.cursor()
# Fetch customer data
cursor.execute(
"SELECT customer_segment, customer_lifetime_value FROM dim_customers WHERE customer_id = %s",
(event['customer_id'],)
)
customer_data = cursor.fetchone()
# Fetch product data
cursor.execute(
"SELECT product_category, product_price FROM dim_products WHERE product_id = %s",
(event['product_id'],)
)
product_data = cursor.fetchone()
cursor.close()
return {
**event,
'customer_segment': customer_data[0] if customer_data else None,
'customer_lifetime_value': customer_data[1] if customer_data else 0,
'product_category': product_data[0] if product_data else None,
'product_price': product_data[1] if product_data else 0,
'enriched_at': datetime.utcnow().isoformat()
}
def validate_event(self, event: Dict) -> bool:
"""Validate event data"""
required_fields = ['sale_id', 'customer_id', 'product_id', 'amount']
if not all(field in event for field in required_fields):
return False
if event['amount'] <= 0 or event['amount'] > 1000000:
return False
return True
def flush_batch(self):
"""Flush batch to database and downstream topic"""
if not self.batch:
return
cursor = self.db_conn.cursor()
try:
# Batch insert to warehouse
execute_batch(
cursor,
"""
INSERT INTO streaming.sales_events (
sale_id, customer_id, product_id, amount,
customer_segment, product_category, enriched_at
)
VALUES (%s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (sale_id) DO UPDATE
SET enriched_at = EXCLUDED.enriched_at
""",
[(e['sale_id'], e['customer_id'], e['product_id'], e['amount'],
e['customer_segment'], e['product_category'], e['enriched_at'])
for e in self.batch]
)
self.db_conn.commit()
# Publish enriched events
for event in self.batch:
self.producer.send('enriched-sales-events', value=event)
self.producer.flush()
logging.info(f"Flushed batch of {len(self.batch)} events")
self.batch = []
except Exception as e:
logging.error(f"Error flushing batch: {e}")
self.db_conn.rollback()
finally:
            cursor.close()

# quality/great_expectations_suite.py
import great_expectations as gx
from great_expectations.core import ExpectationSuite
from great_expectations.checkpoint import Checkpoint
from typing import Dict
def create_sales_quality_suite() -> ExpectationSuite:
"""Create comprehensive quality suite for sales data"""
context = gx.get_context()
suite = context.add_expectation_suite("sales_quality_suite")
# Table-level expectations
suite.add_expectation(
gx.expectations.ExpectTableRowCountToBeBetween(
min_value=1000,
max_value=10000000
)
)
# Column existence
required_columns = ['sale_id', 'customer_id', 'product_id', 'amount', 'sale_timestamp']
for col in required_columns:
suite.add_expectation(
gx.expectations.ExpectColumnToExist(column=col)
)
# Uniqueness
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeUnique(column='sale_id')
)
# Null checks
for col in required_columns:
suite.add_expectation(
gx.expectations.ExpectColumnValuesToNotBeNull(column=col)
)
# Value ranges
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeBetween(
column='amount',
min_value=0,
max_value=1000000,
mostly=0.99 # Allow 1% outliers
)
)
# Data types
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeOfType(
column='amount',
type_='float64'
)
)
# Regex patterns
suite.add_expectation(
gx.expectations.ExpectColumnValuesToMatchRegex(
column='email',
regex=r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$',
mostly=0.95
)
)
# Referential integrity
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeInSet(
column='customer_id',
value_set=get_valid_customer_ids() # From dimension table
)
)
# Custom expectations
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeBetween(
column='profit_margin',
min_value=-1.0,
max_value=1.0
)
)
return suite
def run_quality_checkpoint(data_source: str, suite_name: str) -> Dict:
"""Run quality checkpoint"""
context = gx.get_context()
checkpoint = Checkpoint(
name="sales_checkpoint",
data_context=context,
expectation_suite_name=suite_name,
action_list=[
{
"name": "store_validation_result",
"action": {"class_name": "StoreValidationResultAction"},
},
{
"name": "update_data_docs",
"action": {"class_name": "UpdateDataDocsAction"},
},
{
"name": "send_slack_notification",
"action": {
"class_name": "SlackNotificationAction",
"slack_webhook": get_secret('SLACK_WEBHOOK'),
},
},
],
)
results = checkpoint.run()
return {
'success': results['success'],
'statistics': results.statistics,
'results': results.run_results
    }

# cdc/debezium_processor.py
from kafka import KafkaConsumer
import json
from typing import Dict, Any
import psycopg2
from datetime import datetime
class DebeziumCDCProcessor:
def __init__(self):
self.consumer = KafkaConsumer(
'dbserver1.public.sales', # Debezium topic
bootstrap_servers=['kafka:9092'],
group_id='cdc-processor',
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
auto_offset_reset='earliest',
)
self.warehouse_conn = psycopg2.connect(
host='warehouse',
database='analytics',
user='cdc_user',
password=get_secret('DB_PASSWORD')
)
def process_changes(self):
"""Process CDC events from Debezium"""
for message in self.consumer:
payload = message.value
if payload is None:
continue
operation = payload.get('op') # c=create, u=update, d=delete
if operation == 'c':
self.handle_insert(payload['after'])
elif operation == 'u':
self.handle_update(payload['before'], payload['after'])
elif operation == 'd':
self.handle_delete(payload['before'])
def handle_insert(self, record: Dict):
"""Handle INSERT operation"""
cursor = self.warehouse_conn.cursor()
cursor.execute(
"""
INSERT INTO bronze.sales_cdc (sale_id, customer_id, amount, cdc_operation, cdc_timestamp)
VALUES (%s, %s, %s, 'INSERT', %s)
""",
(record['sale_id'], record['customer_id'], record['amount'], datetime.utcnow())
)
self.warehouse_conn.commit()
        cursor.close()

    # handle_update and handle_delete (dispatched in process_changes above) would follow
    # the same pattern, writing 'UPDATE' / 'DELETE' rows using the before/after images.

# agent configuration
{
"maxTokens": 4000,
"temperature": 0.3,
"systemPrompt": "You are a data pipeline engineering agent focused on scalable, reliable data infrastructure"
}

Airflow DAG tasks failing with retry exhausted after transient errors
Configure exponential backoff with retry_exponential_backoff parameter. Set max_retry_delay to prevent excessive wait times. Implement on_failure_callback for custom retry logic. Use rescheduling instead of retrying for long-running tasks.
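A minimal sketch of those settings, applied to the default_args pattern used in the DAG above (the specific values are illustrative, not recommendations):

# sketch: exponential backoff for transient failures
from datetime import timedelta

default_args = {
    'retries': 5,
    'retry_delay': timedelta(minutes=1),        # initial delay between attempts
    'retry_exponential_backoff': True,          # 1m, 2m, 4m, ... instead of a fixed delay
    'max_retry_delay': timedelta(minutes=30),   # cap so backoff never waits excessively
    'on_failure_callback': lambda context: print(
        f"Task {context['task_instance'].task_id} exhausted retries"
    ),
}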
Apache Kafka consumer lag growing causing data processing delays
Increase consumer group parallelism by adding more consumer instances. Optimize batch processing with max_poll_records tuning. Enable consumer auto-commit with reduced interval. Monitor offset lag with Prometheus kafka_consumergroup_lag metric.
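A sketch of the tuning knobs on the kafka-python consumer used in streaming/kafka_consumer.py above; the values are starting points to adjust against observed lag, and scaling out simply means running more processes with the same group_id (up to the topic's partition count):

# sketch: consumer settings tuned for throughput
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'sales-events',
    bootstrap_servers=['kafka:9092'],
    group_id='sales-processor',          # add instances with this group_id to parallelize
    max_poll_records=1000,               # pull larger batches per poll
    fetch_max_bytes=52428800,            # allow ~50 MB fetches
    max_poll_interval_ms=300000,         # give large batches time to process before a rebalance
    enable_auto_commit=True,
    auto_commit_interval_ms=1000,        # commit offsets frequently when auto-commit is acceptable
)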
dbt incremental models not detecting new records in source tables
Verify incremental_strategy merge configuration in model config. Check unique_key matches source table primary key. Run dbt run --full-refresh to reset incremental state. Ensure is_incremental macro conditions are correct.
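When a full refresh is needed, the pipeline above runs dbt through dbt Cloud, but if dbt Core is installed on the Airflow worker the reset can be scripted as a one-off task; the project and profiles paths below are assumptions:

# sketch: force a full refresh of the incremental model from Airflow (assumes local dbt Core)
from airflow.operators.bash import BashOperator

dbt_full_refresh = BashOperator(
    task_id='dbt_full_refresh_sales_enriched',
    bash_command=(
        'dbt run --full-refresh --select sales_enriched '
        '--project-dir /opt/dbt/sales --profiles-dir /opt/dbt'  # hypothetical paths
    ),
    dag=dag,
)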
Great Expectations validation suite failing on legitimate data variations
Adjust expectation thresholds with mostly parameter for partial compliance. Use row_condition to filter validation scope. Implement custom expectations for domain-specific rules. Review validation results in Data Docs and refine expectations.
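A sketch of both levers on the amount expectation from the suite above; the exact keyword surface varies a little between Great Expectations releases, so treat this as illustrative:

# sketch: tolerate known outliers and scope the check to relevant rows
import great_expectations as gx

amount_check = gx.expectations.ExpectColumnValuesToBeBetween(
    column='amount',
    min_value=0,
    max_value=1000000,
    mostly=0.98,                   # pass if at least 98% of rows comply
    row_condition='amount > 0',    # only validate rows in scope
    condition_parser='pandas',
)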
S3 data lake query performance slow despite partitioning strategy
Verify partition pruning works with EXPLAIN query plan. Convert to columnar format like Parquet for better compression. Create Glue catalog partitions with MSCK REPAIR TABLE. Use file compaction to reduce small file overhead.
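A compaction sketch consistent with the pandas/S3 pattern used elsewhere on this page (bucket and prefix names follow the DAG above; reading and writing s3:// paths with pandas assumes s3fs is installed):

# sketch: compact one day's small Parquet files into a single larger file
import pandas as pd

partition = 'bronze/sales/date=2025-01-01'
df = pd.read_parquet(f's3://company-data-lake/{partition}/')   # reads every file under the prefix
df.to_parquet(
    f's3://company-data-lake/{partition}/compacted.parquet',
    engine='pyarrow',
    compression='snappy',
    index=False,
)
# afterwards, remove the original small files (e.g., lifecycle rule or a cleanup task)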