Recommended Implementation - SDK
This guide covers the recommended implementation using the etl-watcher-sdk Python SDK to integrate your ETL code with Watcher.
Note
The etl-watcher-sdk simplifies the integration process by providing a clean Python API.
See the docs here: https://etl-watcher-sdk.readthedocs.io/en/latest/
Installation
Install the SDK using your preferred package manager:
uv add etl-watcher-sdk
pip install etl-watcher-sdk
poetry add etl-watcher-sdk
Key Processes
Define Pipeline Configuration
Initialize Watcher Client
Sync Pipeline Configuration
Track Pipeline Execution
Define Pipeline Configuration
Store your pipeline configuration in source control using the SDK’s data models:
import pendulum
from watcher import Address, AddressLineage, Pipeline, PipelineConfig
POLYGON_OPEN_CLOSE_PIPELINE_CONFIG = PipelineConfig(
pipeline=Pipeline(
name="polygon_open_close",
pipeline_type_name="extraction",
timeliness_number=20,
timeliness_datepart="minute",
freshness_number=1,
freshness_datepart="day",
pipeline_metadata={
"description": "Daily stock price extraction from Polygon API",
"owner": "data-team",
},
),
address_lineage=AddressLineage(
source_addresses=[
Address(
name="https://api.polygon.io/v1/open-close/",
address_type_name="polygon",
address_type_group_name="api",
)
],
target_addresses=[
Address(
name="prod.polygon.open_close",
address_type_name="postgres",
address_type_group_name="database",
)
],
),
default_watermark=pendulum.date(2025, 10, 1).to_date_string(),
next_watermark=pendulum.now().date().to_date_string(),
)
Initialize Watcher Client
Create a Watcher client instance:
from watcher import Watcher
# Initialize the Watcher client
watcher = Watcher("http://localhost:8000")
Sync Pipeline Configuration
Use the SDK to sync your pipeline configuration and address lineage with Watcher:
# Sync pipeline configuration with Watcher
synced_config = watcher.sync_pipeline_config(POLYGON_OPEN_CLOSE_PIPELINE_CONFIG)
Note
The synchronization also handles watermark management!
Track Pipeline Execution
Use the SDK’s decorator to track your ETL pipeline execution:
from watcher import ETLResult, WatcherContext
@watcher.track_pipeline_execution(
pipeline_id=synced_config.pipeline.id,
active=synced_config.pipeline.active,
watermark=synced_config.watermark,
next_watermark=synced_config.next_watermark,
)
def extract_data(watcher_context: WatcherContext, tickers: list[str]):
# Access watermark and next_watermark from the context
watermark = pendulum.parse(watcher_context.watermark).date()
next_watermark = pendulum.parse(watcher_context.next_watermark).date()
# Your ETL logic here
total_rows = 0
for ticker in tickers:
# Process each ticker...
total_rows += process_ticker(ticker, watermark, next_watermark)
# Return ETL results
return ETLResult(
completed_successfully=True,
total_rows=total_rows,
)
The decorator automatically handles:
Starting and ending pipeline execution
Updating the pipeline execution with the ETLResult
Active/Inactive pipeline management
All HTTP communication with the Watcher API
And provides access to the watcher_context, allowing for:
Access to the watermark and next_watermark
Access to the pipeline execution context (for hiearchical execution tracking)
Complete Example
Here’s a complete example putting it all together:
import time
import httpx
import pendulum
from watcher import ETLResult, Watcher, WatcherContext
from pipeline import POLYGON_OPEN_CLOSE_PIPELINE_CONFIG
# Initialize Watcher client
watcher = Watcher("http://localhost:8000")
# Sync pipeline configuration
synced_config = watcher.sync_pipeline_config(POLYGON_OPEN_CLOSE_PIPELINE_CONFIG)
# Define your ETL function with tracking
@watcher.track_pipeline_execution(
pipeline_id=synced_config.pipeline.id,
active=synced_config.pipeline.active,
watermark=synced_config.watermark,
next_watermark=synced_config.next_watermark,
)
def extract_data(watcher_context: WatcherContext, tickers: list[str]):
# Access the watcher_context to get the watermark and next_watermark
watermark = pendulum.parse(watcher_context.watermark).date()
next_watermark = pendulum.parse(watcher_context.next_watermark).date()
all_records = []
total_rows = 0
for ticker in tickers:
date = watermark
while date < next_watermark:
response = httpx.get(
f"https://api.polygon.io/v1/open-close/{ticker}/{date}",
params={"apiKey": "YOUR_API_KEY"},
)
if response.status_code == 429:
time.sleep(30) # Rate limit handling
continue
record = response.json()
if record["status"] == "OK":
all_records.append(record)
date = date.add(days=1)
total_rows += len(all_records)
# Make sure to return the ETLResult
return ETLResult(
completed_successfully=True,
total_rows=total_rows,
)
# Execute your ETL pipeline
tickers = ["AAPL", "META"]
extract_data(tickers=tickers)
Benefits of Using the SDK
Simplified Integration: Abstracts away complex HTTP requests and API interactions
Synchronization: Automatically syncs pipeline configuration and address lineage with Watcher
Watermark Management: Automatically manages watermarks for each pipeline execution
Execution Tracking: Automatically tracks pipeline execution and updates the pipeline execution with the ETLResult
Hierarchical Execution Tracking: Provides access to the
watcher_contextfor hierarchical execution tracking
This implementation provides a clean, maintainable way to integrate your ETL processes with the Watcher framework.