# Ingesting Massive Data Loads: Patterns for High-Performance Batch Pipelines
Moving data from source systems into your lake or warehouse sounds simple until you're doing it at scale. A pipeline that works fine at 10M rows starts breaking at 1B — queries time out, storage costs spike, and the pipeline window that should take 2 hours starts taking 14.
This post covers the patterns that separate pipelines that scale from pipelines that collapse under their own weight.
## The Core Problem: Naive Ingestion Doesn't Scale
The default approach — `SELECT * FROM source_table` and write to destination — fails in three ways at scale:
- Memory pressure — loading an entire table into memory to move it is unnecessary and expensive
- No parallelism — a single sequential read/write uses a fraction of available compute
- No incrementalism — re-processing data that hasn't changed wastes both time and money
Fixing these three issues is the foundation of a high-performance ingestion pipeline.
## Pattern 1: Incremental Ingestion

Never move data you've already moved. Incremental ingestion tracks a watermark — typically an `updated_at` timestamp or an auto-incrementing ID — and only extracts rows that changed since the last run.
```python
import duckdb
from datetime import datetime

def incremental_extract(
    source_conn,
    table: str,
    watermark_col: str,
    last_watermark: datetime,
) -> str:
    """Extract only rows updated since last run. Returns path to parquet output."""
    output_path = f"/tmp/{table}_{last_watermark.date()}.parquet"
    query = f"""
        COPY (
            SELECT *
            FROM {table}
            WHERE {watermark_col} > '{last_watermark.isoformat()}'
            ORDER BY {watermark_col}
        )
        TO '{output_path}'
        (FORMAT PARQUET, COMPRESSION ZSTD)
    """
    source_conn.execute(query)
    return output_path
```
Caveats to handle:
- Late-arriving data — rows that arrive after their event time require a lookback window (e.g., always re-process the last 3 days)
- Soft deletes — if your source system deletes rows, a watermark approach misses them; use a CDC approach instead
- Schema changes — detect and handle new or renamed columns before they silently corrupt downstream tables
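The lookback caveat can be implemented by shifting the stored watermark back before each extract. A minimal sketch, assuming downstream loads are idempotent (e.g. a MERGE/upsert on a primary key), since rows inside the window are re-extracted on every run:

```python
from datetime import datetime, timedelta

LOOKBACK_DAYS = 3  # re-process this many days to catch late-arriving rows

def effective_watermark(last_watermark: datetime,
                        lookback_days: int = LOOKBACK_DAYS) -> datetime:
    """Shift the stored watermark back so late-arriving rows are re-extracted."""
    return last_watermark - timedelta(days=lookback_days)

# A run whose stored watermark is midnight Sep 15 re-extracts from Sep 12 onward
wm = effective_watermark(datetime(2025, 9, 15))
```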
## Pattern 2: Partition-Parallel Loading
Process multiple partitions simultaneously instead of one sequential pass.
```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable

def parallel_partition_load(
    partitions: list[str],  # e.g. ['2025-09-01', '2025-09-02', ...]
    extract_fn: Callable,
    load_fn: Callable,
    max_workers: int = 8,
) -> dict:
    results = {"success": [], "failed": []}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(lambda p: load_fn(extract_fn(p)), partition): partition
            for partition in partitions
        }
        for future in as_completed(futures):
            partition = futures[future]
            try:
                future.result()
                results["success"].append(partition)
            except Exception as e:
                results["failed"].append({"partition": partition, "error": str(e)})
    return results
```
Tuning `max_workers`:
- For I/O-bound workloads (network, disk): `max_workers = 2 × CPU cores`
- For CPU-bound transforms: `max_workers = CPU cores`
- For database source connections: limit by connection pool size, not CPU
## Pattern 3: File Format and Compression
The choice of output format has an outsized impact on both ingestion speed and downstream query performance.
| Format | Write speed | Query speed | Compression | Best for |
|---|---|---|---|---|
| CSV | Fast | Slow | None | Small files, debugging |
| JSON/NDJSON | Medium | Slow | Poor | Semi-structured |
| Parquet | Medium | Fast | Excellent | Analytics, warehouses |
| Avro | Fast | Medium | Good | Streaming, schema evolution |
| ORC | Medium | Fast | Excellent | Hive-heavy stacks |
For batch ingestion into a data lake or warehouse, Parquet with ZSTD compression is the default choice. ZSTD compresses 20–40% better than Snappy with similar decompression speed.
```python
import pyarrow as pa
import pyarrow.parquet as pq

def write_parquet_optimized(df, output_path: str,
                            partition_cols: list[str] | None = None):
    table = pa.Table.from_pandas(df)
    pq.write_to_dataset(
        table,
        root_path=output_path,
        partition_cols=partition_cols,  # creates directory partitions
        compression='zstd',
        use_dictionary=True,            # column-level dictionary encoding
        write_statistics=True,          # enables row-group pruning at query time
        row_group_size=500_000,         # tune for your query patterns
    )
```
`write_statistics=True` is critical — it stores min/max values per row group, allowing query engines to skip entire file sections without reading them.
## Pattern 4: Dynamic Partitioning Strategy
How you partition data at rest determines how fast queries run later. Poor partitioning is the single most common cause of slow warehouse queries.
Rules of thumb:
- Partition on the column most commonly used in `WHERE` clauses (usually a date)
- Target 128MB–1GB per partition file — too small means too many files (metadata overhead), too large means no parallelism
- For time-series data: daily partitions for recent data, monthly for historical
```
# Good partition layout for event data
s3://my-lake/events/
  year=2025/month=09/day=15/
    part-0001.parquet  (400MB)
    part-0002.parquet  (380MB)
  year=2025/month=09/day=14/
    part-0001.parquet  (420MB)

# Bad: too many tiny files (the "small file problem")
s3://my-lake/events/
  2025-09-15-00:00:01.parquet (2MB)
  2025-09-15-00:00:02.parquet (1.8MB)
  ... (10,000 more files)
```
The small file problem kills performance. Query engines spend more time opening files than reading data. Compaction jobs — merging small files into target-size files — should run as part of your pipeline.
## Pattern 5: Staging → Swap for Zero-Downtime Loads
Never write directly to the production table during a load. Write to a staging location, validate, then swap atomically.
```sql
-- In Snowflake / most warehouses

-- 1. Load into the staging table
COPY INTO my_table_staging
FROM @my_stage/data/
FILE_FORMAT = (TYPE = 'PARQUET');

-- 2. Validate row counts (check the result in application code)
SELECT COUNT(*) FROM my_table_staging;

-- 3. Atomic swap. No explicit transaction is needed: in Snowflake,
--    DDL statements auto-commit, and the swap itself is atomic.
ALTER TABLE my_table SWAP WITH my_table_staging;
```
If the load or validation fails, the production table is untouched. The swap is atomic — queries hitting my_table during the load see either the old or the new version, never a partial state.
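The validation step lives in application code between the load and the swap. A minimal sketch (the `validate_before_swap` helper and its thresholds are illustrative, not a library API; it takes a row-counting callable so it works with any connection type):

```python
from typing import Callable

def validate_before_swap(count_rows: Callable[[str], int],
                         staging_table: str, prod_table: str,
                         min_rows: int = 1, max_shrink: float = 0.5) -> None:
    """Sanity-check the staging table before the atomic swap.

    Raises on empty loads and on suspiciously large row-count drops versus
    production. On failure the production table is left untouched.
    """
    staged = count_rows(staging_table)
    if staged < min_rows:
        raise ValueError(f"staging has {staged} rows, expected >= {min_rows}")
    prod = count_rows(prod_table)
    if prod and staged < prod * max_shrink:
        raise ValueError(f"staging shrank from {prod} to {staged} rows")
```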
## Airflow DAG: Putting It Together
```python
from airflow.decorators import dag, task
from datetime import datetime, timedelta

@dag(
    schedule='0 2 * * *',
    start_date=datetime(2025, 9, 1),
    catchup=False,
    tags=['ingestion', 'batch'],
)
def daily_batch_ingestion():

    @task
    def get_partitions_to_process() -> list[str]:
        # Yesterday plus a 3-day lookback for late arrivals
        today = datetime.utcnow().date()
        return [(today - timedelta(days=i)).isoformat() for i in range(1, 5)]

    @task
    def extract_and_stage(partitions: list[str]) -> dict:
        return parallel_partition_load(
            partitions=partitions,
            extract_fn=incremental_extract,
            load_fn=write_parquet_optimized,
            max_workers=8,
        )

    @task
    def validate_and_swap(staging_result: dict) -> None:
        if staging_result["failed"]:
            raise ValueError(f"Failed partitions: {staging_result['failed']}")
        atomic_swap_to_production()

    @task
    def compact_small_files() -> None:
        run_compaction_job()

    partitions = get_partitions_to_process()
    staged = extract_and_stage(partitions)
    swapped = validate_and_swap(staged)
    swapped >> compact_small_files()  # compaction runs only after a successful swap

daily_batch_ingestion()
```
## Monitoring What Matters
Track these metrics per pipeline run:
- Rows processed / second — your throughput baseline
- Bytes written — catch compression regressions
- File count — alert if small file count exceeds threshold
- Partition skew — some partitions 10× larger than others indicates a bad partition key
- Watermark lag — how far behind is your latest ingested data vs. source?
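Most of these metrics are derivable from values the pipeline already has at the end of a run. A sketch (the `run_metrics` helper and the skew threshold are illustrative):

```python
def run_metrics(rows: int, bytes_written: int, seconds: float,
                partition_sizes: dict[str, int]) -> dict:
    """Derive per-run monitoring metrics from raw pipeline counters.

    partition_sizes maps partition key -> bytes written for that partition.
    """
    sizes = list(partition_sizes.values())
    avg = sum(sizes) / len(sizes)
    return {
        "rows_per_sec": rows / seconds,       # throughput baseline
        "bytes_written": bytes_written,       # catch compression regressions
        "file_count": len(sizes),             # alert on small-file blowups
        "partition_skew": max(sizes) / avg,   # ~10x suggests a bad partition key
    }
```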