Watermark Management
This guide covers how to effectively manage watermarks in Watcher.
Understanding Watermarks
Watermarks track the progress of data processing in incremental data pipelines:
watermark: Current position (how far processing has reached)
next_watermark: Target position (how far processing should advance)
Key Features:
Incremental Processing: Support for watermark-based incremental data pipelines
Flexible Watermarking: Use any identifier (IDs, timestamps, etc.) as watermarks
Automatic Updates: Watermarks are automatically updated after successful pipeline execution
Watermark Patterns
Watermarks are stored as strings in the database, so any format can be used: timestamps, numeric IDs, or other identifiers.
Daily Processing
{
"watermark": "2024-01-01T00:00:00Z",
"next_watermark": "2024-01-01T23:59:59Z"
}
Hourly Processing
{
"watermark": "2024-01-01T10:00:00Z",
"next_watermark": "2024-01-01T11:00:00Z"
}
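For fixed-size windows such as the hourly example above, next_watermark is usually the current watermark plus the window length. The sketch below assumes ISO 8601 UTC strings with a trailing Z, as in these examples; next_timestamp_watermark is a hypothetical helper, not part of Watcher.

```python
from datetime import datetime, timedelta, timezone

FMT = "%Y-%m-%dT%H:%M:%SZ"  # ISO 8601 UTC format used in the examples above


def next_timestamp_watermark(watermark: str, step: timedelta) -> str:
    """Hypothetical helper: advance a timestamp watermark by one window."""
    # The literal Z in FMT matches the suffix; attach UTC explicitly
    current = datetime.strptime(watermark, FMT).replace(tzinfo=timezone.utc)
    return (current + step).strftime(FMT)


print(next_timestamp_watermark("2024-01-01T10:00:00Z", timedelta(hours=1)))
# 2024-01-01T11:00:00Z
```

In practice you may also want to cap the result at the current time so a window never extends into the future.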
Numeric Watermarks
{
"watermark": "1000",
"next_watermark": "2000"
}
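Because watermarks come back as strings, compare them in their native type. String comparison is lexicographic, which gives the wrong order for numbers of different lengths. (Timestamps in one fixed ISO 8601 format and time zone, like the examples above, do happen to sort correctly as strings.) A short illustration, where watermark_key is a hypothetical helper name:

```python
def watermark_key(value: str) -> int:
    """Parse a numeric watermark string so comparisons are numeric."""
    return int(value)


stored = ["200", "1000", "30"]

print(sorted(stored))                     # ['1000', '200', '30'] (lexicographic)
print(sorted(stored, key=watermark_key))  # ['30', '200', '1000'] (numeric)
print(max(stored, key=watermark_key))     # 1000
```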
Watermark Increment Logic
After a successful execution (when end_pipeline_execution is called with completed_successfully set to True), the watermark is updated automatically:
Execution completes successfully
Pipeline watermark becomes next_watermark
Next execution starts from the new watermark
Example Flow:
Execution 1: watermark=0, next_watermark=1000 → Success → watermark=1000
Execution 2: watermark=1000, next_watermark=2000 → Success → watermark=2000
Execution 3: watermark=2000, next_watermark=3000 → Success → watermark=3000
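The increment logic can be modeled locally in a few lines. This is a simulation, not Watcher's implementation: end_execution and simulate are hypothetical names, a failed execution is assumed to leave the watermark unchanged, and a fixed step of 1000 mirrors the example flow (a real pipeline would take next_watermark from its source, e.g. MAX(id)).

```python
from typing import List


def end_execution(watermark: str, next_watermark: str, succeeded: bool) -> str:
    # Modeled behavior: only a successful execution moves the watermark forward
    return next_watermark if succeeded else watermark


def simulate(outcomes: List[bool], step: int = 1000) -> List[str]:
    """Return the stored watermark after each execution."""
    watermark = "0"
    history = []
    for succeeded in outcomes:
        next_watermark = str(int(watermark) + step)  # target for this run
        watermark = end_execution(watermark, next_watermark, succeeded)
        history.append(watermark)
    return history


print(simulate([True, True, True]))   # ['1000', '2000', '3000']
print(simulate([True, False, True]))  # ['1000', '1000', '2000']
```

The second run shows the retry behavior implied by this model: after a failure, the next execution targets the same window again.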
Note
It is critical that your pipeline logic handles boundary inclusivity/exclusivity correctly so that consecutive windows do not overlap. For example, with ID-based watermarks, use WHERE id > watermark AND id <= next_watermark so that no record is processed twice or missed between executions.
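To see why the boundaries matter, this self-contained sketch uses Python's built-in sqlite3 module with an in-memory table (the table name events and its rows are made up for illustration). It runs three consecutive windows with the recommended half-open condition and, for contrast, with inclusive bounds on both ends.

```python
import sqlite3

HALF_OPEN = "SELECT id FROM events WHERE id > ? AND id <= ?"   # recommended
INCLUSIVE = "SELECT id FROM events WHERE id >= ? AND id <= ?"  # overlaps at the edges


def collect_ids(conn: sqlite3.Connection, query: str, windows) -> list:
    """Run the query once per (watermark, next_watermark) window and collect ids."""
    ids = []
    for watermark, next_watermark in windows:
        ids.extend(row[0] for row in conn.execute(query, (watermark, next_watermark)))
    return ids


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY)")
conn.executemany("INSERT INTO events (id) VALUES (?)", [(i,) for i in range(1, 3001)])
windows = [(0, 1000), (1000, 2000), (2000, 3000)]

half_open = collect_ids(conn, HALF_OPEN, windows)
inclusive = collect_ids(conn, INCLUSIVE, windows)
print(len(half_open), len(set(half_open)))  # 3000 3000 (each id exactly once)
print(len(inclusive), len(set(inclusive)))  # 3002 3000 (ids 1000 and 2000 twice)
```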
How to Use Watermarks
To properly use watermarks in your pipeline, you need to:
Calculate the next watermark from your source data
Pass the next_watermark value in your pipeline call
Read the current watermark value from the pipeline response
Supply a default starting value for the first run, when the watermark is None
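The last step, handling the first run, can be wrapped in a small helper. This is a sketch assuming numeric (ID-based) watermarks and a starting value of 0; resolve_watermark is a hypothetical name, not part of the Watcher API.

```python
from typing import Optional


def resolve_watermark(raw: Optional[str], default: int = 0) -> int:
    """Turn the watermark from the pipeline response into a usable lower bound.

    On the first run the watermark is None, so fall back to the default.
    Afterwards it is stored as a string, so convert it back to int.
    """
    if raw is None:
        return default
    return int(raw)


print(resolve_watermark(None))    # 0 (first run)
print(resolve_watermark("1000"))  # 1000
```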
Practical Example
Here’s a complete example of using watermarks in a pipeline:
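import httpx
import pendulum

async def run_pipeline(client: httpx.AsyncClient) -> None:
    # Step 1: Determine next_watermark from the source system
    next_watermark = 150  # e.g. the result of SELECT MAX(id) FROM your_table

    # Step 2: Create the pipeline with next_watermark
    pipeline_data = {
        "name": "stock-price-worker",
        "pipeline_type_name": "api-integration",
        "next_watermark": next_watermark,  # Stored as a string by Watcher
    }
    pipeline_response = await client.post("/pipeline", json=pipeline_data)
    pipeline_response.raise_for_status()
    pipeline_result = pipeline_response.json()

    # The current watermark is None on the first run, so fall back to 0
    watermark = pipeline_result["watermark"]
    watermark = 0 if watermark is None else int(watermark)

    # Step 3: Start pipeline execution
    execution_response = await client.post("/start_pipeline_execution", json={
        "pipeline_id": pipeline_result["id"],
        "start_date": pendulum.now("UTC").isoformat(),
        "watermark": watermark,
        "next_watermark": next_watermark,
    })
    execution_response.raise_for_status()
    execution_id = execution_response.json()["id"]  # Assumed response field

    # Step 4: Use watermarks in your data processing
    # Process only rows in the window (watermark, next_watermark]
    query = """
        SELECT *
        FROM Table_A
        WHERE id > :watermark
          AND id <= :next_watermark
    """
    # Run `query` with your database driver, binding both parameters
    # (placeholder syntax varies by driver).
    # 180 is a placeholder for the number of rows actually processed.
    total_rows = 180

    # Step 5: End execution (watermark automatically updated)
    await client.post("/end_pipeline_execution", json={
        "id": execution_id,
        "end_date": pendulum.now("UTC").isoformat(),
        "completed_successfully": True,
        "total_rows": total_rows,
    })

# Usage, assuming Watcher is reachable at this (hypothetical) base URL:
# async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
#     await run_pipeline(client)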
Important Notes:
Watermarks are automatically updated when end_pipeline_execution is called
The system stores numeric watermarks as strings, so convert them back (for example with int()) before comparing them
Use id > watermark AND id <= next_watermark for proper boundary handling