
Building Scalable Data Pipelines with Modern ETL Frameworks

In today’s data-driven world, organizations generate and consume massive amounts of data from various sources. The ability to efficiently extract, transform, and load (ETL) this data into usable formats is crucial for business intelligence, analytics, and machine learning initiatives. This comprehensive guide explores modern approaches to building scalable data pipelines that can handle enterprise-level data processing requirements.

Understanding Modern Data Pipeline Architecture

Data pipelines have evolved significantly from traditional batch processing systems to sophisticated, real-time streaming architectures. Modern data pipelines must handle diverse data sources, support multiple data formats, and provide reliable, scalable processing capabilities.

Core Components of Data Pipelines

Data Sources

Modern enterprises deal with various data sources:

  • Transactional Databases: PostgreSQL, MySQL, Oracle
  • NoSQL Databases: MongoDB, Cassandra, DynamoDB
  • Cloud Storage: Amazon S3, Google Cloud Storage, Azure Blob
  • Streaming Sources: Apache Kafka, Amazon Kinesis, Google Pub/Sub
  • APIs and Web Services: REST APIs, GraphQL endpoints
  • File Systems: CSV, JSON, Parquet, Avro files (see the reading sketch just after this list)
  • Real-time Event Streams: IoT devices, user interactions, system logs
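As a quick illustration of the file-based sources above, here is a minimal sketch that reads each format into a pandas DataFrame; the file paths are placeholders, and Avro support assumes the fastavro package is installed.

# Minimal sketch: reading common file formats into DataFrames.
# File paths are placeholders; Avro reading assumes the fastavro package.
import pandas as pd
import fastavro

csv_df = pd.read_csv("data/transactions.csv")
json_df = pd.read_json("data/events.json", lines=True)  # newline-delimited JSON
parquet_df = pd.read_parquet("data/transactions.parquet")

with open("data/users.avro", "rb") as f:
    avro_df = pd.DataFrame.from_records(list(fastavro.reader(f)))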

Processing Engines

Different processing engines serve different use cases:

Apache Spark: Unified analytics engine for large-scale data processing

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, split, regexp_replace

# Initialize Spark session
spark = SparkSession.builder \
    .appName("DataPipelineProcessing") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

# Read data from multiple sources
def read_data_sources(spark):
    # Read from S3
    s3_data = spark.read.parquet("s3a://bucket/data/transactions/")

    # Read from database
    db_data = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://host:5432/database") \
        .option("dbtable", "customer_data") \
        .option("user", "username") \
        .option("password", "password") \
        .load()

    # Read from Kafka stream
    kafka_data = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "events") \
        .load()

    return s3_data, db_data, kafka_data

# Data transformation pipeline
def transform_data(df):
    return df \
        .filter(col("amount") > 0) \
        .withColumn("amount_category",
                    when(col("amount") < 100, "low")
                    .when(col("amount") < 1000, "medium")
                    .otherwise("high")) \
        .withColumn("email_domain",
                    split(col("email"), "@").getItem(1)) \
        .withColumn("clean_description",
                    regexp_replace(col("description"), "[^a-zA-Z0-9\\s]", ""))

# Apply transformations
s3_data, db_data, kafka_data = read_data_sources(spark)
transformed_data = transform_data(s3_data)

# Write results
transformed_data.write \
    .mode("append") \
    .partitionBy("date", "amount_category") \
    .parquet("s3a://output-bucket/processed-data/")

Apache Flink: Stream processing framework for real-time data

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.common.serialization import SimpleStringSchema
import json
import time

# Set up Flink environment
env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)

# Configure Kafka source
kafka_props = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'data-pipeline-consumer'
}

kafka_consumer = FlinkKafkaConsumer(
    topics='user-events',
    deserialization_schema=SimpleStringSchema(),
    properties=kafka_props
)

# Create data stream
event_stream = env.add_source(kafka_consumer)

# Process stream data
def process_events(event):
    # Parse the JSON event and enrich it with metadata
    try:
        event_data = json.loads(event)
        event_data['processed_timestamp'] = int(time.time())
        event_data['pipeline_version'] = '1.0'
        return json.dumps(event_data)
    except (json.JSONDecodeError, TypeError):
        return None

processed_stream = event_stream.map(process_events).filter(lambda x: x is not None)

# Execute pipeline
env.execute("Real-time Event Processing Pipeline")

Data Storage Solutions

Modern data architectures employ various storage solutions:

Data Lakes: Store raw data in its native format

import boto3
import numpy as np
import pandas as pd
from datetime import datetime

class DataLakeManager:
    def __init__(self, bucket_name, aws_access_key=None, aws_secret_key=None):
        self.bucket_name = bucket_name
        self.s3_client = boto3.client(
            's3',
            aws_access_key_id=aws_access_key,
            aws_secret_access_key=aws_secret_key
        )

    def write_partitioned_data(self, df, table_name, partition_cols):
        """Write data with partitioning for optimized queries"""
        for partition_values in df[partition_cols].drop_duplicates().itertuples(index=False):
            # Create partition filter
            partition_filter = dict(zip(partition_cols, partition_values))
            partition_df = df
            for col, val in partition_filter.items():
                partition_df = partition_df[partition_df[col] == val]

            # Create S3 path with partitions
            partition_path = "/".join([f"{col}={val}" for col, val in partition_filter.items()])
            s3_key = f"data/{table_name}/{partition_path}/data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.parquet"

            # Write to S3
            partition_df.to_parquet(f"s3://{self.bucket_name}/{s3_key}")
            print(f"Written partition to {s3_key}")

    def read_partitioned_data(self, table_name, partition_filter=None):
        """Read data with optional partition filtering"""
        prefix = f"data/{table_name}/"
        if partition_filter:
            for col, val in partition_filter.items():
                prefix += f"{col}={val}/"

        # List objects with prefix
        response = self.s3_client.list_objects_v2(Bucket=self.bucket_name, Prefix=prefix)
        dataframes = []
        for obj in response.get('Contents', []):
            if obj['Key'].endswith('.parquet'):
                df = pd.read_parquet(f"s3://{self.bucket_name}/{obj['Key']}")
                dataframes.append(df)

        return pd.concat(dataframes, ignore_index=True) if dataframes else pd.DataFrame()

# Usage
lake_manager = DataLakeManager("my-data-lake-bucket")

# Write customer transaction data
transactions_df = pd.DataFrame({
    'customer_id': range(1000),
    'amount': np.random.randint(1, 1000, 1000),
    'date': pd.date_range('2024-01-01', periods=1000, freq='D'),
    'region': np.random.choice(['US', 'EU', 'APAC'], 1000)
})
lake_manager.write_partitioned_data(transactions_df, 'transactions', ['date', 'region'])
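If a dataset comfortably fits in memory, pandas can also write Hive-style partitions directly through pyarrow, which is often simpler than managing partition paths by hand. A minimal sketch, assuming pyarrow and s3fs are installed and using the same illustrative bucket:

# Alternative sketch: let pyarrow handle Hive-style partitioning.
# Requires pyarrow and s3fs; the bucket name is an illustrative placeholder.
transactions_df.to_parquet(
    "s3://my-data-lake-bucket/data/transactions/",
    partition_cols=["region"],  # one directory level per region value
    engine="pyarrow",
)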

Data Warehouses: Structured storage for analytics

-- Snowflake data warehouse setup
CREATE WAREHOUSE compute_wh WITH
    WAREHOUSE_SIZE = 'LARGE'
    AUTO_SUSPEND = 300
    AUTO_RESUME = TRUE
    MIN_CLUSTER_COUNT = 1
    MAX_CLUSTER_COUNT = 5
    SCALING_POLICY = 'STANDARD';

-- Create database and schema
CREATE DATABASE analytics_db;
CREATE SCHEMA analytics_db.sales;

-- Create optimized tables
CREATE TABLE analytics_db.sales.fact_transactions (
    transaction_id STRING,
    customer_id STRING,
    product_id STRING,
    amount DECIMAL(10,2),
    transaction_date DATE,
    region STRING
) CLUSTER BY (transaction_date, region);

-- Create materialized view for common aggregations
CREATE MATERIALIZED VIEW analytics_db.sales.daily_sales AS
SELECT
    transaction_date,
    region,
    COUNT(*) AS transaction_count,
    SUM(amount) AS total_amount,
    AVG(amount) AS avg_amount
FROM analytics_db.sales.fact_transactions
GROUP BY transaction_date, region;
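Pipelines usually land processed DataFrames into these tables programmatically. A minimal sketch using the Snowflake Python connector's write_pandas helper; the connection parameters and the transformed_df DataFrame are placeholders rather than values from this article's pipeline.

# Minimal sketch: bulk-load a processed DataFrame into the warehouse.
# Connection parameters and transformed_df are illustrative placeholders.
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas

conn = snowflake.connector.connect(
    account="my_account",
    user="etl_user",
    password="...",  # prefer a secrets manager over hard-coded credentials
    warehouse="COMPUTE_WH",
    database="ANALYTICS_DB",
    schema="SALES",
)

# write_pandas stages the DataFrame and issues COPY INTO behind the scenes
success, num_chunks, num_rows, _ = write_pandas(conn, transformed_df, table_name="FACT_TRANSACTIONS")
print(f"Loaded {num_rows} rows in {num_chunks} chunks (success={success})")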

Orchestration with Apache Airflow

Apache Airflow has become the de facto standard for orchestrating complex data pipelines. It provides a rich set of operators, robust scheduling, and excellent monitoring capabilities.

Setting Up Airflow DAGs

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.providers.amazon.aws.sensors.s3_key import S3KeySensor
from airflow.models import Variable
from datetime import datetime, timedelta
import pandas as pd

# Default arguments for the DAG
default_args = {
    'owner': 'data-engineering-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

# Create DAG
dag = DAG(
    'customer_analytics_pipeline',
    default_args=default_args,
    description='Process customer data for analytics',
    schedule_interval='@daily',
    max_active_runs=1,
    catchup=False,
    tags=['analytics', 'customer-data']
)

# Python functions for data processing
def extract_customer_data(**context):
    """Extract customer data from various sources"""
    execution_date = context['execution_date']

    # Extract from database
    import psycopg2

    conn = psycopg2.connect(
        host='prod-db.company.com',
        database='customer_db',
        user='etl_user',
        password=Variable.get('db_password')  # Jinja templates are not rendered inside Python callables
    )
    query = """
        SELECT customer_id, email, registration_date,
               last_login, total_purchases, customer_segment
        FROM customers
        WHERE updated_at >= %s - INTERVAL '1 day'
          AND updated_at < %s
    """
    customer_df = pd.read_sql(query, conn, params=[execution_date, execution_date])

    # Save to S3 staging area
    s3_key = f"staging/customers/date={execution_date.strftime('%Y-%m-%d')}/customers.parquet"
    customer_df.to_parquet(f"s3://data-pipeline-staging/{s3_key}")
    return s3_key

def transform_customer_data(**context):
    """Transform and enrich customer data"""
    execution_date = context['execution_date']

    # Read from staging
    s3_key = f"staging/customers/date={execution_date.strftime('%Y-%m-%d')}/customers.parquet"
    df = pd.read_parquet(f"s3://data-pipeline-staging/{s3_key}")

    # Transformations
    df['days_since_registration'] = (execution_date - pd.to_datetime(df['registration_date'])).dt.days
    df['days_since_last_login'] = (execution_date - pd.to_datetime(df['last_login'])).dt.days

    # Customer lifecycle classification
    def classify_lifecycle(row):
        if row['days_since_registration'] <= 30:
            return 'new'
        elif row['days_since_last_login'] <= 7:
            return 'active'
        elif row['days_since_last_login'] <= 30:
            return 'at_risk'
        else:
            return 'churned'

    df['lifecycle_stage'] = df.apply(classify_lifecycle, axis=1)

    # Value segmentation
    df['value_segment'] = pd.cut(df['total_purchases'],
                                 bins=[0, 100, 500, 1000, float('inf')],
                                 labels=['low', 'medium', 'high', 'vip'])

    # Save transformed data
    output_key = f"processed/customers/date={execution_date.strftime('%Y-%m-%d')}/customers_enriched.parquet"
    df.to_parquet(f"s3://data-pipeline-processed/{output_key}")
    return output_key

def validate_data_quality(**context):
    """Validate data quality and generate quality report"""
    execution_date = context['execution_date']

    # Read processed data
    output_key = f"processed/customers/date={execution_date.strftime('%Y-%m-%d')}/customers_enriched.parquet"
    df = pd.read_parquet(f"s3://data-pipeline-processed/{output_key}")

    # Quality checks
    quality_report = {
        'total_records': len(df),
        'null_email_count': df['email'].isnull().sum(),
        'duplicate_customers': df['customer_id'].duplicated().sum(),
        'negative_purchases': (df['total_purchases'] < 0).sum(),
        'future_registration_dates': (pd.to_datetime(df['registration_date']) > execution_date).sum()
    }

    # Set quality thresholds
    thresholds = {
        'null_email_rate': 0.05,    # Max 5% null emails
        'duplicate_rate': 0.01,     # Max 1% duplicates
        'data_anomaly_rate': 0.02   # Max 2% anomalies
    }

    # Check thresholds
    null_email_rate = quality_report['null_email_count'] / quality_report['total_records']
    duplicate_rate = quality_report['duplicate_customers'] / quality_report['total_records']
    anomaly_rate = (quality_report['negative_purchases'] + quality_report['future_registration_dates']) / quality_report['total_records']

    quality_passed = (
        null_email_rate <= thresholds['null_email_rate'] and
        duplicate_rate <= thresholds['duplicate_rate'] and
        anomaly_rate <= thresholds['data_anomaly_rate']
    )

    if not quality_passed:
        raise ValueError(f"Data quality check failed: {quality_report}")

    print(f"Data quality check passed: {quality_report}")
    return quality_report

# Task definitions
wait_for_source_data = S3KeySensor(
    task_id='wait_for_source_data',
    bucket_key='raw-data/customers/{{ ds }}/marker.txt',
    bucket_name='source-data-bucket',
    timeout=300,
    poke_interval=30,
    dag=dag
)

extract_data = PythonOperator(
    task_id='extract_customer_data',
    python_callable=extract_customer_data,
    dag=dag
)

transform_data = PythonOperator(
    task_id='transform_customer_data',
    python_callable=transform_customer_data,
    dag=dag
)

validate_quality = PythonOperator(
    task_id='validate_data_quality',
    python_callable=validate_data_quality,
    dag=dag
)

load_to_warehouse = S3ToRedshiftOperator(
    task_id='load_to_redshift',
    schema='analytics',
    table='customer_daily_snapshot',
    s3_bucket='data-pipeline-processed',
    s3_key='processed/customers/date={{ ds }}/customers_enriched.parquet',
    redshift_conn_id='redshift_default',
    copy_options=['FORMAT AS PARQUET'],
    dag=dag
)

update_analytics_tables = PostgresOperator(
    task_id='update_analytics_tables',
    postgres_conn_id='analytics_db',
    sql="""
        -- Update customer lifecycle metrics
        INSERT INTO customer_lifecycle_history (
            customer_id, date, lifecycle_stage, value_segment,
            days_since_registration, days_since_last_login
        )
        SELECT customer_id, '{{ ds }}'::date, lifecycle_stage, value_segment,
               days_since_registration, days_since_last_login
        FROM customer_daily_snapshot
        WHERE date = '{{ ds }}'::date;

        -- Update aggregated metrics
        INSERT INTO daily_customer_metrics (
            date, total_customers, new_customers, churned_customers,
            active_customers, avg_customer_value
        )
        SELECT
            '{{ ds }}'::date,
            COUNT(*),
            COUNT(*) FILTER (WHERE lifecycle_stage = 'new'),
            COUNT(*) FILTER (WHERE lifecycle_stage = 'churned'),
            COUNT(*) FILTER (WHERE lifecycle_stage = 'active'),
            AVG(total_purchases)
        FROM customer_daily_snapshot
        WHERE date = '{{ ds }}'::date;
    """,
    dag=dag
)

send_completion_notification = BashOperator(
    task_id='send_completion_notification',
    bash_command="""
        curl -X POST -H 'Content-type: application/json' \
            --data '{"text":"Customer analytics pipeline completed successfully for {{ ds }}"}' \
            {{ var.value.slack_webhook_url }}
    """,
    dag=dag
)

# Define task dependencies
wait_for_source_data >> extract_data >> transform_data >> validate_quality
validate_quality >> load_to_warehouse >> update_analytics_tables >> send_completion_notification
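Before deploying a DAG like this, it helps to confirm that the file parses and the tasks wire up as expected. A minimal pytest-style sketch using Airflow's DagBag; the dag folder path is an assumption about your project layout.

# Minimal sketch: verify the DAG parses and has the expected structure.
from airflow.models import DagBag

def test_customer_analytics_pipeline_loads():
    dag_bag = DagBag(dag_folder="dags/", include_examples=False)
    assert dag_bag.import_errors == {}, f"DAG import failures: {dag_bag.import_errors}"

    dag = dag_bag.get_dag("customer_analytics_pipeline")
    assert dag is not None
    assert len(dag.tasks) == 7  # sensor + 3 Python tasks + load + SQL update + notification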

Advanced Airflow Patterns

Dynamic DAG Generation

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# Configuration for multiple data sources
data_sources_config = [
    {
        'name': 'sales_data',
        'source_table': 'sales.transactions',
        'destination_table': 'analytics.fact_sales',
        'partition_column': 'transaction_date',
        'schedule': '@hourly'
    },
    {
        'name': 'customer_data',
        'source_table': 'crm.customers',
        'destination_table': 'analytics.dim_customers',
        'partition_column': 'updated_at',
        'schedule': '@daily'
    },
    {
        'name': 'product_data',
        'source_table': 'inventory.products',
        'destination_table': 'analytics.dim_products',
        'partition_column': 'modified_date',
        'schedule': '@daily'
    }
]

def create_etl_dag(source_config):
    """Create a DAG for each data source"""
    dag_id = f"etl_{source_config['name']}"

    default_args = {
        'owner': 'data-team',
        'depends_on_past': False,
        'start_date': datetime(2024, 1, 1),
        'retries': 2,
        'retry_delay': timedelta(minutes=5)
    }

    dag = DAG(
        dag_id,
        default_args=default_args,
        description=f'ETL pipeline for {source_config["name"]}',
        schedule_interval=source_config['schedule'],
        catchup=False,
        tags=['etl', 'auto-generated']
    )

    def extract_data(**context):
        # Extract logic specific to this source
        print(f"Extracting from {source_config['source_table']}")
        # Implementation here

    def transform_data(**context):
        # Transform logic specific to this source
        print(f"Transforming {source_config['name']} data")
        # Implementation here

    def load_data(**context):
        # Load logic specific to this destination
        print(f"Loading to {source_config['destination_table']}")
        # Implementation here

    # Create tasks
    extract_task = PythonOperator(
        task_id=f'extract_{source_config["name"]}',
        python_callable=extract_data,
        dag=dag
    )

    transform_task = PythonOperator(
        task_id=f'transform_{source_config["name"]}',
        python_callable=transform_data,
        dag=dag
    )

    load_task = PythonOperator(
        task_id=f'load_{source_config["name"]}',
        python_callable=load_data,
        dag=dag
    )

    # Set dependencies
    extract_task >> transform_task >> load_task

    return dag

# Generate DAGs dynamically
for config in data_sources_config:
    dag_id = f"etl_{config['name']}"
    globals()[dag_id] = create_etl_dag(config)
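In practice the source list usually lives outside the DAG file so that adding a source is a configuration change rather than a code change. A minimal sketch, assuming a hypothetical etl_sources.yaml next to the DAG file that mirrors the dictionaries above:

# Minimal sketch: load the source configuration from YAML instead of hard-coding it.
# The file name etl_sources.yaml is a hypothetical example.
import os
import yaml

config_path = os.path.join(os.path.dirname(__file__), "etl_sources.yaml")
with open(config_path) as f:
    data_sources_config = yaml.safe_load(f)  # expects a list of source dictionaries

for config in data_sources_config:
    globals()[f"etl_{config['name']}"] = create_etl_dag(config)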

Dynamic DAG generation keeps pipeline definitions consistent as the number of sources grows, but batch orchestration is only part of the picture. Processing systems also need to be designed for scalability and reliability from the beginning, and increasingly that means handling data the moment it arrives.


Real-time Stream Processing

Modern applications increasingly require real-time data processing capabilities. Stream processing frameworks enable organizations to process data as it arrives, providing immediate insights and enabling real-time decision making.

Apache Kafka Integration

from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
import json
import threading
from datetime import datetime

class StreamDataProcessor:
    def __init__(self, bootstrap_servers=['localhost:9092']):
        self.bootstrap_servers = bootstrap_servers
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None
        )

    def produce_events(self, topic, events):
        """Produce events to Kafka topic"""
        for event in events:
            try:
                # Add metadata
                event['timestamp'] = datetime.utcnow().isoformat()
                event['producer_id'] = 'data-pipeline-v1'

                # Send to Kafka
                future = self.producer.send(
                    topic,
                    value=event,
                    key=str(event.get('id', ''))
                )

                # Wait for confirmation
                record_metadata = future.get(timeout=10)
                print(f"Sent event to {record_metadata.topic} partition {record_metadata.partition}")
            except KafkaError as e:
                print(f"Failed to send event: {e}")

        self.producer.flush()

    def consume_and_process(self, topic, processing_function):
        """Consume events and apply processing function"""
        consumer = KafkaConsumer(
            topic,
            bootstrap_servers=self.bootstrap_servers,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            key_deserializer=lambda k: k.decode('utf-8') if k else None,
            group_id='data-processing-group',
            auto_offset_reset='latest'
        )
        print(f"Started consuming from topic: {topic}")

        for message in consumer:
            try:
                # Process the event
                processed_event = processing_function(message.value)
                if processed_event:
                    # Send processed event to output topic
                    self.produce_events(f"{topic}_processed", [processed_event])
            except Exception as e:
                print(f"Error processing event: {e}")
                # Could send to dead letter queue here

# Placeholder lookups -- in a real pipeline these would hit a cache,
# feature store, or database rather than returning static values.
def get_user_profile(user_id):
    return {'segment': 'standard', 'ltv': 0}

def get_user_average_amount(user_id):
    return 100.0

# Example processing functions
def enrich_user_event(event):
    """Enrich user events with additional context"""
    # Simulate external API call for user profile
    user_profile = get_user_profile(event.get('user_id'))

    # Add enriched data
    enriched_event = event.copy()
    enriched_event.update({
        'user_segment': user_profile.get('segment', 'unknown'),
        'user_lifetime_value': user_profile.get('ltv', 0),
        'processing_timestamp': datetime.utcnow().isoformat(),
        'enrichment_version': '1.2'
    })
    return enriched_event

def detect_anomalies(event):
    """Detect anomalous patterns in events"""
    # Simple anomaly detection logic
    amount = event.get('amount', 0)
    user_id = event.get('user_id')

    # Get user's average transaction amount (simulate with cache/database lookup)
    user_avg = get_user_average_amount(user_id)

    if amount > user_avg * 5:  # 5x above average
        anomaly_event = event.copy()
        anomaly_event.update({
            'anomaly_type': 'high_amount',
            'anomaly_score': amount / user_avg,
            'detected_at': datetime.utcnow().isoformat()
        })
        return anomaly_event
    return None

# Usage
processor = StreamDataProcessor()

# Start processing in separate threads
def start_enrichment_processing():
    processor.consume_and_process('user_events', enrich_user_event)

def start_anomaly_detection():
    processor.consume_and_process('transaction_events', detect_anomalies)

# Start processors
enrichment_thread = threading.Thread(target=start_enrichment_processing)
anomaly_thread = threading.Thread(target=start_anomaly_detection)
enrichment_thread.start()
anomaly_thread.start()

Flink SQL: Windowed aggregations and pattern detection over Kafka topics

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

def setup_flink_streaming_job():
    # Create execution environment
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(4)

    # Create table environment
    table_env = StreamTableEnvironment.create(env)

    # Define Kafka source table
    table_env.execute_sql("""
        CREATE TABLE user_events (
            user_id STRING,
            event_type STRING,
            amount DECIMAL(10,2),
            timestamp_col TIMESTAMP(3),
            WATERMARK FOR timestamp_col AS timestamp_col - INTERVAL '5' SECOND
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'user_events',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'flink_analytics',
            'format' = 'json',
            'scan.startup.mode' = 'latest-offset'
        )
    """)

    # Define output sink table
    table_env.execute_sql("""
        CREATE TABLE user_metrics (
            user_id STRING,
            window_start TIMESTAMP(3),
            window_end TIMESTAMP(3),
            event_count BIGINT,
            total_amount DECIMAL(10,2),
            avg_amount DECIMAL(10,2)
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'user_metrics',
            'properties.bootstrap.servers' = 'localhost:9092',
            'format' = 'json'
        )
    """)

    # Define real-time aggregation query; executing the INSERT submits the
    # streaming job, so no separate env.execute() call is needed
    table_env.execute_sql("""
        INSERT INTO user_metrics
        SELECT
            user_id,
            TUMBLE_START(timestamp_col, INTERVAL '1' MINUTE) AS window_start,
            TUMBLE_END(timestamp_col, INTERVAL '1' MINUTE) AS window_end,
            COUNT(*) AS event_count,
            SUM(amount) AS total_amount,
            AVG(amount) AS avg_amount
        FROM user_events
        WHERE event_type = 'purchase'
        GROUP BY
            user_id,
            TUMBLE(timestamp_col, INTERVAL '1' MINUTE)
    """)

# Complex event processing
def setup_complex_event_processing():
    env = StreamExecutionEnvironment.get_execution_environment()
    table_env = StreamTableEnvironment.create(env)

    # Define pattern detection for fraud
    table_env.execute_sql("""
        CREATE TABLE transactions (
            transaction_id STRING,
            user_id STRING,
            amount DECIMAL(10,2),
            merchant_id STRING,
            location STRING,
            transaction_time TIMESTAMP(3),
            WATERMARK FOR transaction_time AS transaction_time - INTERVAL '10' SECOND
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'transactions',
            'properties.bootstrap.servers' = 'localhost:9092',
            'format' = 'json'
        )
    """)

    # Detect suspicious patterns
    table_env.execute_sql("""
        CREATE TABLE fraud_alerts (
            user_id STRING,
            alert_type STRING,
            transaction_count BIGINT,
            total_amount DECIMAL(10,2),
            locations STRING,
            alert_timestamp TIMESTAMP(3)
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'fraud_alerts',
            'properties.bootstrap.servers' = 'localhost:9092',
            'format' = 'json'
        )
    """)

    # Identify rapid transactions from different locations
    table_env.execute_sql("""
        INSERT INTO fraud_alerts
        SELECT
            user_id,
            'rapid_location_change' AS alert_type,
            COUNT(*) AS transaction_count,
            SUM(amount) AS total_amount,
            LISTAGG(location) AS locations,
            MAX(transaction_time) AS alert_timestamp
        FROM transactions
        WHERE transaction_time > CURRENT_TIMESTAMP - INTERVAL '5' MINUTE
        GROUP BY user_id
        HAVING COUNT(DISTINCT location) > 2 AND COUNT(*) > 5
    """)

# Start the streaming jobs
setup_flink_streaming_job()
setup_complex_event_processing()

Data Quality and Monitoring

Ensuring data quality is crucial for any data pipeline. Implementing comprehensive monitoring and alerting systems helps maintain data integrity and pipeline reliability.

Data Quality Framework

import pandas as pd
from datetime import datetime
import logging
from typing import Dict, List, Any
from dataclasses import dataclass
from enum import Enum

class QualityCheckType(Enum):
    NULL_CHECK = "null_check"
    RANGE_CHECK = "range_check"
    PATTERN_CHECK = "pattern_check"
    UNIQUENESS_CHECK = "uniqueness_check"
    REFERENTIAL_CHECK = "referential_check"
    FRESHNESS_CHECK = "freshness_check"
    VOLUME_CHECK = "volume_check"

@dataclass
class QualityCheck:
    name: str
    check_type: QualityCheckType
    column: str
    parameters: Dict[str, Any]
    severity: str = "ERROR"  # ERROR, WARNING, INFO
    description: str = ""

@dataclass
class QualityResult:
    check_name: str
    passed: bool
    value: Any
    threshold: Any
    severity: str
    message: str
    timestamp: datetime

class DataQualityEngine:
    def __init__(self):
        self.checks = []
        self.results = []
        self.logger = logging.getLogger(__name__)

    def add_check(self, check: QualityCheck):
        """Add a quality check to the engine"""
        self.checks.append(check)

    def run_null_check(self, df: pd.DataFrame, check: QualityCheck) -> QualityResult:
        """Check for null values in specified column"""
        null_count = df[check.column].isnull().sum()
        null_percentage = (null_count / len(df)) * 100
        threshold = check.parameters.get('max_null_percentage', 0)
        passed = null_percentage <= threshold

        return QualityResult(
            check_name=check.name,
            passed=passed,
            value=null_percentage,
            threshold=threshold,
            severity=check.severity,
            message=f"Null percentage: {null_percentage:.2f}% (threshold: {threshold}%)",
            timestamp=datetime.utcnow()
        )

    def run_range_check(self, df: pd.DataFrame, check: QualityCheck) -> QualityResult:
        """Check if values are within specified range"""
        column_data = df[check.column]
        min_val = check.parameters.get('min_value')
        max_val = check.parameters.get('max_value')

        violations = 0
        if min_val is not None:
            violations += (column_data < min_val).sum()
        if max_val is not None:
            violations += (column_data > max_val).sum()

        violation_percentage = (violations / len(df)) * 100
        threshold = check.parameters.get('max_violation_percentage', 0)
        passed = violation_percentage <= threshold

        return QualityResult(
            check_name=check.name,
            passed=passed,
            value=violation_percentage,
            threshold=threshold,
            severity=check.severity,
            message=f"Range violations: {violation_percentage:.2f}% (threshold: {threshold}%)",
            timestamp=datetime.utcnow()
        )

    def run_pattern_check(self, df: pd.DataFrame, check: QualityCheck) -> QualityResult:
        """Check if values match specified pattern"""
        pattern = check.parameters.get('pattern')
        column_data = df[check.column].astype(str)
        matches = column_data.str.match(pattern)
        match_percentage = (matches.sum() / len(df)) * 100
        threshold = check.parameters.get('min_match_percentage', 100)
        passed = match_percentage >= threshold

        return QualityResult(
            check_name=check.name,
            passed=passed,
            value=match_percentage,
            threshold=threshold,
            severity=check.severity,
            message=f"Pattern matches: {match_percentage:.2f}% (threshold: {threshold}%)",
            timestamp=datetime.utcnow()
        )

    def run_uniqueness_check(self, df: pd.DataFrame, check: QualityCheck) -> QualityResult:
        """Check for duplicate values"""
        duplicates = df[check.column].duplicated().sum()
        duplicate_percentage = (duplicates / len(df)) * 100
        threshold = check.parameters.get('max_duplicate_percentage', 0)
        passed = duplicate_percentage <= threshold

        return QualityResult(
            check_name=check.name,
            passed=passed,
            value=duplicate_percentage,
            threshold=threshold,
            severity=check.severity,
            message=f"Duplicate percentage: {duplicate_percentage:.2f}% (threshold: {threshold}%)",
            timestamp=datetime.utcnow()
        )

    def run_volume_check(self, df: pd.DataFrame, check: QualityCheck) -> QualityResult:
        """Check data volume against expected ranges"""
        row_count = len(df)
        min_rows = check.parameters.get('min_rows', 0)
        max_rows = check.parameters.get('max_rows', float('inf'))
        passed = min_rows <= row_count <= max_rows

        return QualityResult(
            check_name=check.name,
            passed=passed,
            value=row_count,
            threshold=f"{min_rows}-{max_rows}",
            severity=check.severity,
            message=f"Row count: {row_count} (expected: {min_rows}-{max_rows})",
            timestamp=datetime.utcnow()
        )

    def run_freshness_check(self, df: pd.DataFrame, check: QualityCheck) -> QualityResult:
        """Check data freshness"""
        timestamp_column = check.column
        max_age_hours = check.parameters.get('max_age_hours', 24)
        latest_timestamp = pd.to_datetime(df[timestamp_column]).max()
        age_hours = (datetime.utcnow() - latest_timestamp).total_seconds() / 3600
        passed = age_hours <= max_age_hours

        return QualityResult(
            check_name=check.name,
            passed=passed,
            value=age_hours,
            threshold=max_age_hours,
            severity=check.severity,
            message=f"Data age: {age_hours:.2f} hours (threshold: {max_age_hours} hours)",
            timestamp=datetime.utcnow()
        )

    def run_all_checks(self, df: pd.DataFrame) -> List[QualityResult]:
        """Run all configured quality checks"""
        self.results = []
        for check in self.checks:
            try:
                if check.check_type == QualityCheckType.NULL_CHECK:
                    result = self.run_null_check(df, check)
                elif check.check_type == QualityCheckType.RANGE_CHECK:
                    result = self.run_range_check(df, check)
                elif check.check_type == QualityCheckType.PATTERN_CHECK:
                    result = self.run_pattern_check(df, check)
                elif check.check_type == QualityCheckType.UNIQUENESS_CHECK:
                    result = self.run_uniqueness_check(df, check)
                elif check.check_type == QualityCheckType.VOLUME_CHECK:
                    result = self.run_volume_check(df, check)
                elif check.check_type == QualityCheckType.FRESHNESS_CHECK:
                    result = self.run_freshness_check(df, check)
                else:
                    continue

                self.results.append(result)

                # Log result
                log_level = logging.ERROR if not result.passed and result.severity == "ERROR" else logging.WARNING
                self.logger.log(log_level, f"Quality check '{result.check_name}': {result.message}")
            except Exception as e:
                self.logger.error(f"Error running check '{check.name}': {str(e)}")

        return self.results

    def get_quality_report(self) -> Dict[str, Any]:
        """Generate a comprehensive quality report"""
        if not self.results:
            return {"error": "No quality checks have been run"}

        total_checks = len(self.results)
        passed_checks = sum(1 for r in self.results if r.passed)
        failed_checks = total_checks - passed_checks

        failed_by_severity = {
            "ERROR": sum(1 for r in self.results if not r.passed and r.severity == "ERROR"),
            "WARNING": sum(1 for r in self.results if not r.passed and r.severity == "WARNING"),
            "INFO": sum(1 for r in self.results if not r.passed and r.severity == "INFO")
        }

        return {
            "timestamp": datetime.utcnow().isoformat(),
            "total_checks": total_checks,
            "passed_checks": passed_checks,
            "failed_checks": failed_checks,
            "pass_rate": (passed_checks / total_checks) * 100,
            "failed_by_severity": failed_by_severity,
            "check_results": [
                {
                    "name": r.check_name,
                    "passed": r.passed,
                    "value": r.value,
                    "threshold": r.threshold,
                    "severity": r.severity,
                    "message": r.message
                } for r in self.results
            ]
        }

# Usage example
def setup_quality_checks_for_customer_data():
    """Setup quality checks for customer data pipeline"""
    quality_engine = DataQualityEngine()

    # Add various quality checks
    quality_engine.add_check(QualityCheck(
        name="customer_id_not_null",
        check_type=QualityCheckType.NULL_CHECK,
        column="customer_id",
        parameters={"max_null_percentage": 0},
        severity="ERROR",
        description="Customer ID should never be null"
    ))

    quality_engine.add_check(QualityCheck(
        name="email_pattern_check",
        check_type=QualityCheckType.PATTERN_CHECK,
        column="email",
        parameters={
            "pattern": r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$',
            "min_match_percentage": 95
        },
        severity="WARNING",
        description="Email should follow valid email pattern"
    ))

    quality_engine.add_check(QualityCheck(
        name="age_range_check",
        check_type=QualityCheckType.RANGE_CHECK,
        column="age",
        parameters={
            "min_value": 13,
            "max_value": 120,
            "max_violation_percentage": 1
        },
        severity="ERROR",
        description="Age should be between 13 and 120"
    ))

    quality_engine.add_check(QualityCheck(
        name="customer_id_uniqueness",
        check_type=QualityCheckType.UNIQUENESS_CHECK,
        column="customer_id",
        parameters={"max_duplicate_percentage": 0},
        severity="ERROR",
        description="Customer ID should be unique"
    ))

    quality_engine.add_check(QualityCheck(
        name="data_volume_check",
        check_type=QualityCheckType.VOLUME_CHECK,
        column="",
        parameters={
            "min_rows": 1000,
            "max_rows": 100000
        },
        severity="WARNING",
        description="Expected data volume between 1K and 100K records"
    ))

    quality_engine.add_check(QualityCheck(
        name="data_freshness_check",
        check_type=QualityCheckType.FRESHNESS_CHECK,
        column="created_at",
        parameters={"max_age_hours": 2},
        severity="ERROR",
        description="Data should not be older than 2 hours"
    ))

    return quality_engine

# Integration with data pipeline
def validate_pipeline_data(df: pd.DataFrame) -> bool:
    """Validate data quality in pipeline"""
    quality_engine = setup_quality_checks_for_customer_data()
    results = quality_engine.run_all_checks(df)
    report = quality_engine.get_quality_report()

    # Check if any critical errors occurred
    critical_failures = [r for r in results if not r.passed and r.severity == "ERROR"]
    if critical_failures:
        print(f"Pipeline validation failed with {len(critical_failures)} critical errors")
        return False

    print(f"Pipeline validation passed with {report['pass_rate']:.1f}% success rate")
    return True
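To see the engine in action, here is a small demonstration on a synthetic DataFrame whose columns match the checks configured above; the values are illustrative only.

# Quick demonstration on synthetic data; values are illustrative only.
import pandas as pd
from datetime import datetime

sample_df = pd.DataFrame({
    "customer_id": range(1, 1501),
    "email": [f"user{i}@example.com" for i in range(1, 1501)],
    "age": [25 + (i % 40) for i in range(1500)],
    "created_at": [datetime.utcnow()] * 1500,
})

if validate_pipeline_data(sample_df):
    print("Synthetic batch passed all critical checks")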

Pipeline Monitoring and Alerting

import psutil
import time
import requests
from dataclasses import dataclass
from datetime import datetime
import threading
from typing import Dict, List, Callable

@dataclass
class MetricThreshold:
    metric_name: str
    warning_threshold: float
    critical_threshold: float
    comparison: str = "greater_than"  # greater_than, less_than, equals

@dataclass
class PipelineMetrics:
    timestamp: datetime
    cpu_usage: float
    memory_usage: float
    disk_usage: float
    network_io: Dict[str, int]
    processing_rate: float
    error_rate: float
    queue_depth: int
    latency_p95: float

class PipelineMonitor:
    def __init__(self, alert_handlers: List[Callable] = None):
        self.thresholds = []
        self.metrics_history = []
        self.alert_handlers = alert_handlers or []
        self.monitoring = False

    def add_threshold(self, threshold: MetricThreshold):
        """Add monitoring threshold"""
        self.thresholds.append(threshold)

    def add_alert_handler(self, handler: Callable):
        """Add alert handler function"""
        self.alert_handlers.append(handler)

    def collect_system_metrics(self) -> PipelineMetrics:
        """Collect system and pipeline metrics"""
        # System metrics
        cpu_percent = psutil.cpu_percent(interval=1)
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')
        network = psutil.net_io_counters()

        # Pipeline-specific metrics (these would come from your pipeline)
        processing_rate = self.get_processing_rate()  # records/second
        error_rate = self.get_error_rate()            # errors/minute
        queue_depth = self.get_queue_depth()          # pending items
        latency_p95 = self.get_latency_p95()          # milliseconds

        return PipelineMetrics(
            timestamp=datetime.utcnow(),
            cpu_usage=cpu_percent,
            memory_usage=memory.percent,
            disk_usage=(disk.used / disk.total) * 100,
            network_io={"bytes_sent": network.bytes_sent, "bytes_recv": network.bytes_recv},
            processing_rate=processing_rate,
            error_rate=error_rate,
            queue_depth=queue_depth,
            latency_p95=latency_p95
        )

    def check_thresholds(self, metrics: PipelineMetrics):
        """Check metrics against thresholds and trigger alerts"""
        metric_values = {
            "cpu_usage": metrics.cpu_usage,
            "memory_usage": metrics.memory_usage,
            "disk_usage": metrics.disk_usage,
            "processing_rate": metrics.processing_rate,
            "error_rate": metrics.error_rate,
            "queue_depth": metrics.queue_depth,
            "latency_p95": metrics.latency_p95
        }

        for threshold in self.thresholds:
            metric_value = metric_values.get(threshold.metric_name)
            if metric_value is None:
                continue

            alert_level = None
            if threshold.comparison == "greater_than":
                if metric_value >= threshold.critical_threshold:
                    alert_level = "CRITICAL"
                elif metric_value >= threshold.warning_threshold:
                    alert_level = "WARNING"
            elif threshold.comparison == "less_than":
                if metric_value <= threshold.critical_threshold:
                    alert_level = "CRITICAL"
                elif metric_value <= threshold.warning_threshold:
                    alert_level = "WARNING"

            if alert_level:
                self.send_alert(alert_level, threshold.metric_name, metric_value, threshold)

    def send_alert(self, level: str, metric_name: str, value: float, threshold: MetricThreshold):
        """Send alert through configured handlers"""
        alert_data = {
            "level": level,
            "metric": metric_name,
            "value": value,
            "threshold": threshold.warning_threshold if level == "WARNING" else threshold.critical_threshold,
            "timestamp": datetime.utcnow().isoformat(),
            "pipeline": "data_processing_pipeline"
        }

        for handler in self.alert_handlers:
            try:
                handler(alert_data)
            except Exception as e:
                print(f"Error sending alert: {e}")

    def start_monitoring(self, interval: int = 60):
        """Start continuous monitoring"""
        self.monitoring = True

        def monitor_loop():
            while self.monitoring:
                try:
                    metrics = self.collect_system_metrics()
                    self.metrics_history.append(metrics)

                    # Keep only last 1000 metrics
                    if len(self.metrics_history) > 1000:
                        self.metrics_history = self.metrics_history[-1000:]

                    self.check_thresholds(metrics)
                    time.sleep(interval)
                except Exception as e:
                    print(f"Error in monitoring loop: {e}")
                    time.sleep(interval)

        monitor_thread = threading.Thread(target=monitor_loop)
        monitor_thread.daemon = True
        monitor_thread.start()

    def stop_monitoring(self):
        """Stop monitoring"""
        self.monitoring = False

    def get_processing_rate(self) -> float:
        """Get current processing rate - implement based on your pipeline"""
        # This would integrate with your actual pipeline metrics
        return 100.0  # records per second

    def get_error_rate(self) -> float:
        """Get current error rate - implement based on your pipeline"""
        # This would integrate with your actual pipeline metrics
        return 0.5  # errors per minute

    def get_queue_depth(self) -> int:
        """Get current queue depth - implement based on your pipeline"""
        # This would integrate with your actual pipeline metrics
        return 50  # pending items

    def get_latency_p95(self) -> float:
        """Get 95th percentile latency - implement based on your pipeline"""
        # This would integrate with your actual pipeline metrics
        return 250.0  # milliseconds

# Alert handlers
def slack_alert_handler(alert_data: Dict):
    """Send alert to Slack"""
    webhook_url = "YOUR_SLACK_WEBHOOK_URL"
    color = "#ff0000" if alert_data["level"] == "CRITICAL" else "#ffaa00"

    message = {
        "attachments": [{
            "color": color,
            "title": f"{alert_data['level']} Alert: {alert_data['metric']}",
            "text": f"Value: {alert_data['value']:.2f} (Threshold: {alert_data['threshold']:.2f})",
            "fields": [
                {"title": "Pipeline", "value": alert_data['pipeline'], "short": True},
                {"title": "Timestamp", "value": alert_data['timestamp'], "short": True}
            ]
        }]
    }
    requests.post(webhook_url, json=message)

def email_alert_handler(alert_data: Dict):
    """Send alert via email"""
    import smtplib
    from email.mime.text import MIMEText
    from email.mime.multipart import MIMEMultipart

    # Email configuration
    smtp_server = "smtp.gmail.com"
    smtp_port = 587
    sender_email = "alerts@company.com"
    sender_password = "app_password"
    recipient_emails = ["team@company.com"]

    # Create message
    msg = MIMEMultipart()
    msg["From"] = sender_email
    msg["To"] = ", ".join(recipient_emails)
    msg["Subject"] = f"{alert_data['level']} Alert: {alert_data['metric']}"

    body = f"""
    Alert Details:
    - Level: {alert_data['level']}
    - Metric: {alert_data['metric']}
    - Current Value: {alert_data['value']:.2f}
    - Threshold: {alert_data['threshold']:.2f}
    - Pipeline: {alert_data['pipeline']}
    - Timestamp: {alert_data['timestamp']}

    Please investigate immediately.
    """
    msg.attach(MIMEText(body, "plain"))

    # Send email
    try:
        server = smtplib.SMTP(smtp_server, smtp_port)
        server.starttls()
        server.login(sender_email, sender_password)
        server.send_message(msg)
        server.quit()
    except Exception as e:
        print(f"Failed to send email alert: {e}")

def pagerduty_alert_handler(alert_data: Dict):
    """Send alert to PagerDuty"""
    integration_key = "YOUR_PAGERDUTY_INTEGRATION_KEY"

    payload = {
        "routing_key": integration_key,
        "event_action": "trigger",
        "payload": {
            "summary": f"{alert_data['level']} Alert: {alert_data['metric']}",
            "severity": "critical" if alert_data["level"] == "CRITICAL" else "warning",
            "source": alert_data['pipeline'],
            "component": alert_data['metric'],
            "custom_details": alert_data
        }
    }

    response = requests.post(
        "https://events.pagerduty.com/v2/enqueue",
        json=payload,
        headers={"Content-Type": "application/json"}
    )
    if response.status_code != 202:
        print(f"Failed to send PagerDuty alert: {response.text}")

# Setup monitoring for data pipeline
def setup_pipeline_monitoring():
    """Setup comprehensive pipeline monitoring"""
    monitor = PipelineMonitor([
        slack_alert_handler,
        email_alert_handler,
        pagerduty_alert_handler
    ])

    # Add monitoring thresholds
    monitor.add_threshold(MetricThreshold("cpu_usage", 70, 90))
    monitor.add_threshold(MetricThreshold("memory_usage", 80, 95))
    monitor.add_threshold(MetricThreshold("disk_usage", 85, 95))
    monitor.add_threshold(MetricThreshold("error_rate", 5, 10))
    monitor.add_threshold(MetricThreshold("queue_depth", 1000, 5000))
    monitor.add_threshold(MetricThreshold("latency_p95", 1000, 5000))
    monitor.add_threshold(MetricThreshold("processing_rate", 50, 10, "less_than"))

    # Start monitoring
    monitor.start_monitoring(interval=60)  # Check every minute

    return monitor

# Start monitoring
pipeline_monitor = setup_pipeline_monitoring()

Conclusion

Building scalable data pipelines requires careful consideration of architecture, technology choices, and operational practices. Modern ETL frameworks provide powerful capabilities for handling diverse data sources and processing requirements, but success depends on implementing robust monitoring, quality controls, and error handling.

Key takeaways for successful data pipeline implementation:

  1. Start with clear requirements - Understand your data sources, processing needs, and SLA requirements
  2. Design for scalability - Use distributed processing frameworks and cloud-native services
  3. Implement comprehensive monitoring - Monitor both technical metrics and business KPIs
  4. Ensure data quality - Build quality checks into every stage of your pipeline
  5. Plan for failure - Implement robust error handling and recovery mechanisms (a minimal retry sketch follows this list)
  6. Document everything - Maintain clear documentation for data lineage and pipeline logic
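As a small illustration of point 5, here is a minimal retry-with-backoff sketch; the attempt counts and delays are arbitrary examples, and in practice you would often lean on your orchestrator's built-in retry handling instead.

# Minimal retry-with-backoff sketch; attempt counts and delays are illustrative.
import time
import functools

def with_retries(max_attempts=3, base_delay=2.0):
    """Retry a flaky pipeline step with exponential backoff."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    if attempt == max_attempts:
                        raise
                    delay = base_delay * (2 ** (attempt - 1))
                    print(f"Attempt {attempt} failed ({exc}); retrying in {delay:.0f}s")
                    time.sleep(delay)
        return wrapper
    return decorator

@with_retries(max_attempts=3)
def load_batch_to_warehouse(batch_path: str):
    # Placeholder for the actual load step
    ...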

Remember that while the technical implementation is important, understanding the business context and requirements that drive these decisions is equally crucial for building effective, enterprise-grade data pipelines.

By following these practices and continuously iterating on your pipeline design, you’ll be able to build robust, scalable data processing systems that can evolve with your organization’s growing data needs.

https://antonio-roth.icanse.eu.org/posts/scalable-data-pipelines-etl/
Author: Antonio Roth
Published: 2025-08-29
License: CC BY-NC-SA 4.0