Ingesting Massive Data Loads: Patterns for High-Performance Batch Pipelines

· 6 min read
Metadata Morph
AI & Data Engineering Team

Moving data from source systems into your lake or warehouse sounds simple until you're doing it at scale. A pipeline that works fine at 10M rows starts breaking at 1B — queries time out, storage costs spike, and the pipeline window that should take 2 hours starts taking 14.

This post covers the patterns that separate pipelines that scale from pipelines that collapse under their own weight.

The Core Problem: Naive Ingestion Doesn't Scale

The default approach — SELECT * FROM source_table and write to destination — fails in three ways at scale:

  1. Memory pressure — loading an entire table into memory to move it is unnecessary and expensive
  2. No parallelism — a single sequential read/write uses a fraction of available compute
  3. No incrementalism — re-processing data that hasn't changed wastes both time and money

Fixing these three issues is the foundation of a high-performance ingestion pipeline.

Pattern 1: Incremental Ingestion

Never move data you've already moved. Incremental ingestion tracks a watermark — typically an updated_at timestamp or an auto-incrementing ID — and only extracts rows that changed since the last run.

import duckdb
from datetime import datetime

def incremental_extract(
    source_conn,
    table: str,
    watermark_col: str,
    last_watermark: datetime,
) -> str:
    """Extract only rows updated since last run. Returns path to parquet output."""
    output_path = f"/tmp/{table}_{last_watermark.date()}.parquet"
    query = f"""
        COPY (
            SELECT *
            FROM {table}
            WHERE {watermark_col} > '{last_watermark.isoformat()}'
            ORDER BY {watermark_col}
        )
        TO '{output_path}'
        (FORMAT PARQUET, COMPRESSION ZSTD)
    """
    source_conn.execute(query)
    return output_path

Caveats to handle:

  • Late-arriving data — rows that arrive after their event time require a lookback window (e.g., always re-process the last 3 days)
  • Soft deletes — if your source system deletes rows, a watermark approach misses them; use a CDC approach instead
  • Schema changes — detect and handle new or renamed columns before they silently corrupt downstream tables
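The late-arriving-data caveat above is usually handled by shifting the stored watermark back before each extract. A minimal sketch (the function name and default window are illustrative):

```python
from datetime import datetime, timedelta

def effective_watermark(last_watermark: datetime, lookback_days: int = 3) -> datetime:
    """Shift the stored watermark back so late-arriving rows are re-processed.

    Rows inside the lookback window are extracted again on every run, so the
    downstream load must be idempotent (MERGE/upsert, not blind append).
    """
    return last_watermark - timedelta(days=lookback_days)
```

The trade-off: a wider window catches more stragglers but re-processes more unchanged rows, so size it to your source's observed lateness.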

Pattern 2: Partition-Parallel Loading

Process multiple partitions simultaneously instead of one sequential pass.

from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable

def parallel_partition_load(
    partitions: list[str],  # e.g. ['2025-09-01', '2025-09-02', ...]
    extract_fn: Callable,
    load_fn: Callable,
    max_workers: int = 8,
) -> dict:
    results = {"success": [], "failed": []}

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(lambda p: load_fn(extract_fn(p)), partition): partition
            for partition in partitions
        }
        for future in as_completed(futures):
            partition = futures[future]
            try:
                future.result()
                results["success"].append(partition)
            except Exception as e:
                results["failed"].append({"partition": partition, "error": str(e)})

    return results

Tuning max_workers:

  • For I/O-bound workloads (network, disk): max_workers = 2 × CPU cores
  • For CPU-bound transforms: max_workers = CPU cores
  • For database source connections: limit by connection pool size, not CPU

Pattern 3: File Format and Compression

The choice of output format has an outsized impact on both ingestion speed and downstream query performance.

Format        Write speed   Query speed   Compression   Best for
CSV           Fast          Slow          None          Small files, debugging
JSON/NDJSON   Medium        Slow          Poor          Semi-structured
Parquet       Medium        Fast          Excellent     Analytics, warehouses
Avro          Fast          Medium        Good          Streaming, schema evolution
ORC           Medium        Fast          Excellent     Hive-heavy stacks

For batch ingestion into a data lake or warehouse, Parquet with ZSTD compression is the default choice. ZSTD compresses 20–40% better than Snappy with similar decompression speed.

import pyarrow as pa
import pyarrow.parquet as pq

def write_parquet_optimized(df, output_path: str, partition_cols: list[str] | None = None):
    table = pa.Table.from_pandas(df)
    pq.write_to_dataset(
        table,
        root_path=output_path,
        partition_cols=partition_cols,  # creates directory partitions
        compression='zstd',
        use_dictionary=True,            # column-level dictionary encoding
        write_statistics=True,          # enables partition pruning at query time
        row_group_size=500_000,         # tune for your query patterns
    )

write_statistics=True is critical — it stores min/max values per row group, allowing query engines to skip entire file sections without reading them.

Pattern 4: Dynamic Partitioning Strategy

How you partition data at rest determines how fast queries run later. Poor partitioning is the single most common cause of slow warehouse queries.

Rules of thumb:

  • Partition on the column most commonly used in WHERE clauses (usually a date)
  • Target 128MB–1GB per partition file — too small means too many files (metadata overhead), too large means no parallelism
  • For time-series data: daily partitions for recent data, monthly for historical

# Good partition layout for event data
s3://my-lake/events/
    year=2025/month=09/day=15/
        part-0001.parquet   (400MB)
        part-0002.parquet   (380MB)
    year=2025/month=09/day=14/
        part-0001.parquet   (420MB)

# Bad: too many tiny files (the "small file problem")
s3://my-lake/events/
    2025-09-15-00:00:01.parquet   (2MB)
    2025-09-15-00:00:02.parquet   (1.8MB)
    ... (10,000 more files)

The small file problem kills performance. Query engines spend more time opening files than reading data. Compaction jobs — merging small files into target-size files — should run as part of your pipeline.

Pattern 5: Staging → Swap for Zero-Downtime Loads

Never write directly to the production table during a load. Write to a staging location, validate, then swap atomically.

-- In Snowflake / most warehouses
BEGIN;

-- Load into staging table
COPY INTO my_table_staging
FROM @my_stage/data/
FILE_FORMAT = (TYPE = 'PARQUET');

-- Validate row counts before committing
SELECT COUNT(*) FROM my_table_staging; -- check in application code

-- Atomic swap
ALTER TABLE my_table SWAP WITH my_table_staging;

COMMIT;

If the load or validation fails, the production table is untouched. The swap is atomic — queries hitting my_table during the load see either the old or the new version, never a partial state.
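The "check in application code" step above might look like this — a hedged sketch using a generic DB-API cursor (the function name and threshold parameter are illustrative):

```python
def validate_staging(cursor, staging_table: str, expected_min_rows: int) -> None:
    """Sanity-check the staging table before issuing the atomic swap.

    `cursor` is any DB-API 2.0 cursor. Raising here aborts the load while
    the production table is still untouched.
    """
    cursor.execute(f"SELECT COUNT(*) FROM {staging_table}")
    (count,) = cursor.fetchone()
    if count < expected_min_rows:
        raise ValueError(
            f"{staging_table} has {count} rows, expected at least "
            f"{expected_min_rows}; aborting before swap"
        )
```

Row count is the cheapest check; real pipelines typically also validate null rates, key uniqueness, and that the new count is within a tolerance of the previous load.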

Airflow DAG: Putting It Together

from airflow.decorators import dag, task
from datetime import datetime, timedelta

@dag(
    schedule='0 2 * * *',
    start_date=datetime(2025, 9, 1),
    catchup=False,
    tags=['ingestion', 'batch'],
)
def daily_batch_ingestion():

    @task
    def get_partitions_to_process() -> list[str]:
        # Return yesterday + 3-day lookback for late arrivals
        today = datetime.utcnow().date()
        return [(today - timedelta(days=i)).isoformat() for i in range(1, 5)]

    @task
    def extract_and_stage(partitions: list[str]) -> dict:
        return parallel_partition_load(
            partitions=partitions,
            extract_fn=incremental_extract,
            load_fn=write_parquet_optimized,
            max_workers=8,
        )

    @task
    def validate_and_swap(staging_result: dict) -> None:
        if staging_result["failed"]:
            raise ValueError(f"Failed partitions: {staging_result['failed']}")
        atomic_swap_to_production()

    @task
    def compact_small_files() -> None:
        run_compaction_job()

    partitions = get_partitions_to_process()
    staged = extract_and_stage(partitions)
    # Compaction must only run after a successful swap
    validate_and_swap(staged) >> compact_small_files()

daily_batch_ingestion()

Monitoring What Matters

Track these metrics per pipeline run:

  • Rows processed / second — your throughput baseline
  • Bytes written — catch compression regressions
  • File count — alert if small file count exceeds threshold
  • Partition skew — some partitions 10× larger than others indicates a bad partition key
  • Watermark lag — how far behind is your latest ingested data vs. source?
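The last two metrics are easy to compute in the pipeline itself. A small sketch (both helper names and the 10× threshold are ours, matching the rule of thumb above):

```python
import statistics
from datetime import datetime, timedelta

def watermark_lag(source_max: datetime, ingested_max: datetime) -> timedelta:
    """How far the latest ingested row trails the latest row in the source."""
    return source_max - ingested_max

def skewed_partitions(sizes: dict[str, int], factor: float = 10.0) -> list[str]:
    """Flag partitions more than `factor` x the median size.

    Persistent skew usually means the partition key is a bad fit for the
    data's distribution (e.g. one tenant or one day dominates).
    """
    median = statistics.median(sizes.values())
    return [p for p, size in sizes.items() if size > factor * median]
```

Emit both as gauges to your metrics backend on every run; alert on watermark lag exceeding the pipeline's schedule interval.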

Book a strategy session to design your ingestion pipeline.