Your pipeline failed at 3am. By 3:15am, the auto-retry kicked in and the run succeeded. You get a Slack notification in the morning: "Pipeline orders_daily retry succeeded." You check the row count in Snowflake. It's double what you expected.
This is the idempotency failure. The original run partially completed before it failed — some rows made it to the destination. The retry ran the full batch again from the start. Now you have two copies of everything that was written before the failure.
Idempotency is the property that running a pipeline multiple times with the same input produces the same output as running it once. An idempotent pipeline can be retried safely without accumulating duplicates or producing inconsistent state. This article is a systematic treatment of how to achieve it.
Why idempotency is harder than it looks
The naive fix is "just use UPSERT instead of INSERT." That's part of the answer, but it doesn't cover all failure modes. Consider:
- What if the source doesn't have a stable primary key?
- What if the failure occurred mid-batch, after some records were committed to the destination but before the cursor was updated in the orchestrator's state store?
- What if your "incremental" pipeline uses a
last_updated_atwatermark, and the watermark was advanced before the batch completed? - What if the destination is an append-only table (like event tables in a warehouse) and UPSERT semantics don't apply?
- What if the pipeline schema drifted mid-run and some rows landed with the old structure and some with the new?
Idempotency in practice requires thinking through each of these cases for each pipeline. There is no universal fix, but there are patterns that address each scenario reliably.
Pattern 1: Use a stable, source-derived idempotency key
The foundation of idempotent writes is a stable key that uniquely identifies each logical record. When the same record arrives twice (on retry, or due to at-least-once delivery semantics from the source), the destination recognizes it as a duplicate and either ignores it or updates in place.
The key must come from the source, not from your pipeline. Auto-generated keys (UUID generated at ingest time, auto-increment ID generated at the destination) are not idempotent — each run generates a different key for the same source record.
For relational sources: use the source table's primary key as the destination's merge key. For event streams: use the event's native event ID or a composite key of (event_type, producer_id, event_timestamp) if there's no native ID. For API sources: use the resource's API-level ID (Stripe charge ID, Salesforce opportunity ID, etc.).
In practice, this looks like a Snowflake MERGE INTO:
MERGE INTO raw.orders AS target
USING staging.orders_batch AS source
ON target.order_id = source.order_id
WHEN MATCHED THEN UPDATE SET
total_value = source.total_value,
status = source.status,
updated_at = source.updated_at
WHEN NOT MATCHED THEN INSERT (
order_id, total_value, status, created_at, updated_at
) VALUES (
source.order_id, source.total_value, source.status,
source.created_at, source.updated_at
);
Running this MERGE INTO twice with the same batch produces the same output. The second run finds the rows already present and updates them to the same values. Net change: zero.
A pitfall to know: MERGE INTO on Snowflake is not safe when your source batch contains duplicate rows for the same merge key. If staging.orders_batch has two rows with order_id = 'ORD-4821', Snowflake will throw a non-deterministic merge error. Add a deduplication step in the staging query before the MERGE:
MERGE INTO raw.orders AS target
USING (
SELECT * FROM staging.orders_batch
QUALIFY ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY updated_at DESC) = 1
) AS source
ON target.order_id = source.order_id
...
The QUALIFY ROW_NUMBER() pattern keeps the latest version of each key within the batch. This handles the case where a source emits two updates for the same record within the same batch window.
Pattern 2: Write to a staging table first, then merge
Writing directly to the destination table during a batch run creates a partial-write problem: if the run fails mid-batch, the destination table has an inconsistent mix of old and new data. The next run either re-inserts the same rows (duplicates) or over-writes them (depends on your merge key).
The safer pattern: write the entire batch to a staging table first. Once the staging table is complete and verified, merge it into the destination in a single transaction. If the staging write fails, no changes have been made to the destination table. The staging table is truncated and the run can be retried from scratch.
-- Step 1: Truncate and reload staging
TRUNCATE TABLE staging.orders_batch;
COPY INTO staging.orders_batch FROM @source_stage/batch_2025-06-30_003.csv;
-- Step 2: Verify row count matches source manifest
SELECT COUNT(*) FROM staging.orders_batch; -- assert equals expected_count
-- Step 3: Merge into destination
MERGE INTO raw.orders AS target
USING (
SELECT * FROM staging.orders_batch
QUALIFY ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY updated_at DESC) = 1
) AS source
ON target.order_id = source.order_id
...
The staging table is effectively a write buffer. Failures during step 1 don't affect the destination. The merge in step 3 is atomic in most warehouse systems (Snowflake, BigQuery, Redshift all support atomic MERGE or equivalent). The row count assertion in step 2 is the guard against partial staging loads — if the COPY INTO was truncated partway, the assertion fails before the merge runs.
Pattern 3: Partition isolation for incremental pipelines
Incremental pipelines are the hardest to make idempotent because they depend on state: "give me everything that changed since the last successful run." The failure mode: if you advance the watermark before writing all the rows from the window, and the write fails, the next run starts from the advanced watermark and misses the failed window entirely. The data gap is invisible — no errors, no alerts, just missing rows.
The fix: don't advance the watermark until the write is confirmed complete. The watermark update and the write confirmation must be ordered — the watermark update must happen only after the write. In practice, this means:
- The orchestrator's task that advances the watermark must not run if the write task failed (explicit
depends_on) - The watermark is stored in a durable state store (database table, not in-memory), so a process restart doesn't lose it
- On retry, the pipeline reads the current watermark from the state store, not from a variable set earlier in the same run
A clean pattern for this in YAML-based orchestrators like Queryvine:
tasks:
- id: read_window
type: extract
watermark: "{{ read_state('orders_pipeline.last_run_at') }}"
- id: write_to_staging
type: load
depends_on: [read_window]
destination: staging.orders_batch
- id: merge_to_destination
type: transform
depends_on: [write_to_staging]
sql: "MERGE INTO raw.orders ..."
- id: advance_watermark
type: state_update
depends_on: [merge_to_destination]
key: orders_pipeline.last_run_at
value: "{{ now() }}"
The advance_watermark task only runs if merge_to_destination succeeds. If the merge fails, the watermark stays at its previous value, and the next run picks up the same window again. The qv run start orders_pipeline --dry-run flag will show you the watermark that would be used without executing, useful for verifying state after a failed run.
A subtlety here: if your source system can backdate records (e.g., Salesforce opportunities with close dates modified retroactively, financial transactions with value dates), a strict "last N hours" watermark will miss those corrections on future runs. For sources with late-arriving updates, use an overlap window — refresh the last 7 days even in incremental mode — so corrections within that window are picked up without a full refresh.
Pattern 4: Partition-keyed full refresh for small tables
For small tables (under ~1M rows) where incremental complexity isn't worth the engineering cost, full refresh is often the simpler path to idempotency. Full refresh: truncate the destination table and reload it entirely from the source on every run.
Full refresh is naturally idempotent — running it twice produces the same destination table. The trade-off is that it's expensive at scale and creates a window during the truncate+reload where the table is empty or partially populated.
The standard mitigation is a blue/green table swap. Reload into a temporary table, then swap the table name atomically:
-- Load to temp
TRUNCATE TABLE raw.orders_temp;
INSERT INTO raw.orders_temp SELECT * FROM source;
-- Atomic swap
ALTER TABLE raw.orders RENAME TO raw.orders_old;
ALTER TABLE raw.orders_temp RENAME TO raw.orders;
DROP TABLE raw.orders_old;
The swap is atomic in most warehouse systems. Downstream queries that hit raw.orders never see a partially populated table — they see either the old version or the new version, never a mix. The raw.orders_old drop is safe to delay; if the swap succeeded, you can clean up on the next run.
One important limit: this pattern does not work cleanly in Redshift when downstream materialized views or late-binding views reference raw.orders by name with an explicit schema binding. Those views need to be rebuilt after the rename. BigQuery handles this more gracefully with its table swap semantics.
Pattern 5: Idempotency keys for event streams
Event streams (Kafka, Kinesis) deliver at-least-once semantics by default. A consumer failure followed by a restart will re-process messages from the last committed offset. If your consumer wrote some of those messages to the destination before the failure, you'll see duplicates on replay.
The standard approach: use a deduplication step at the destination. In Snowflake, this might be a QUALIFY ROW_NUMBER() OVER (PARTITION BY event_id ORDER BY processed_at DESC) = 1 in the destination model. In BigQuery, partitioned tables with deduplication logic in the scheduled query that materializes the final table.
Alternatively: if your event producer generates stable event IDs (UUIDs assigned at event creation, not at consumption), use those as the merge key in a MERGE-based write. Events with the same event ID that arrive twice on replay will merge to the same row.
The fragile alternative is using the Kafka partition offset as the idempotency key. This works if your consumer is the only writer to the destination and offsets are stable. It breaks if you ever have to reset consumer offsets or replay from a different offset — which is exactly what you need to do during disaster recovery, backfill operations, or environment testing. Offsets are not stable across topic recreations or partition rebalancing scenarios.
Pattern 6: Checkpoint recovery for long-running backfills
For pipelines that process large historical backfills (replaying years of data, migrating from one warehouse to another), the run may take hours. If a failure occurs in hour 3 of a 6-hour backfill, you don't want to restart from the beginning.
The solution is checkpoint-based recovery: periodically write progress markers to a durable store. On restart, resume from the last checkpoint rather than the beginning.
Checkpoints should be at logical boundaries, not time-based. For a backfill processing data by date partition, checkpoint after each date partition completes. The checkpoint record should include: the last completed partition, the row count written, and a count of the source rows for that partition. On resume, verify the count against the checkpoint before advancing.
This is conceptually similar to how write-ahead logging works in distributed databases — a durable record of committed state that allows the system to reconstruct progress from any failure point. The key requirement is that checkpoint writes are durable before the next partition begins. An in-memory checkpoint that's lost on process crash provides no recovery guarantee.
Where idempotency and schema drift intersect
There is a scenario that bridges both concerns: a pipeline where a schema change occurs mid-backfill. Imagine a 7-day backfill that starts with the source schema as of Monday, processes 4 days of data, then encounters a schema change (column rename) at the Thursday partition. The rows for Monday through Wednesday landed correctly. The rows for Thursday onward have the renamed column, which maps to NULLs at the destination.
If you checkpoint and resume after fixing the schema issue, your checkpointed partitions (Mon–Wed) won't be re-processed — they're marked complete. But if your schema fix required a schema migration at the destination (adding a new column, renaming a column in the destination table), the Mon–Wed rows that already landed may need to be backfilled for the new column as well.
We're not saying schema drift detection eliminates this problem — it doesn't. What drift detection at ingest time does is prevent the split-schema data from landing in the first place. If Queryvine's schema check fires at the Thursday partition before any rows from Thursday move, you get a clean pause point: Mon–Wed are intact, Thu–Sun haven't been touched yet. The resume is straightforward after the schema fix. Without pre-ingest detection, you get a corrupted mixed-state table that's harder to reason about.
The verification test
Before declaring a pipeline idempotent, run this test: take a pipeline that ran successfully yesterday. Trigger it again with the same inputs (same watermark window, same source data). Compare the destination row count before and after the second run. If the count changed, the pipeline is not idempotent.
For pipelines that use MERGE INTO, also check that the row count in key numeric columns (total revenue, event counts, etc.) doesn't change. A MERGE that produces duplicate rows for the same key with different values is idempotent on row count but not on aggregate values — which is often the more important guarantee for analytics pipelines.
Most teams discover idempotency failures in production, not in testing. The test above takes 5 minutes to run and will surface the failures before they cause Monday morning data incidents.