ETL Script Example
This section documents a complete ETL script example in Python using Watcher and the Polygon API to extract stock market data. For Production code implementation, see the Recommended Implementation guide.
Note
This example is intentionally verbose for demonstrative purposes. In production implementations, consider proper coding practices such as:
Configuration management (environment variables, config files)
Error handling and logging frameworks
Connection pooling and retry mechanisms
Data validation and schema enforcement
Code organization and modularity (configurable code rather than one-off ETL scripts)
Overview
This example demonstrates a real-world-esque ETL pipeline that:
Extracts daily stock open/close data from the Polygon API
Uses Watcher for pipeline execution tracking and metadata management
Handles rate limiting and error scenarios
Implements proper watermark management for incremental processing
Creates address lineage relationships for data flow tracking
I created an article series that slowly builds up to this script to explain the features of the Watcher framework.
ETL Example Script
Here’s a complete ETL example script:
import time
import httpx
import pendulum
pipeline = {
"name": "polygon_open_close",
"pipeline_type_name": "extraction",
"next_watermark": pendulum.now().date().to_date_string(),
}
response = httpx.post("http://localhost:8000/pipeline", json=pipeline)
pipeline_id = response.json()["id"]
watermark = response.json()["watermark"]
print("Watermark:", watermark)
load_lineage = response.json()["load_lineage"]
default_watermark = pendulum.date(2025, 1, 1)
if watermark is None:
watermark = default_watermark
else:
watermark = pendulum.parse(watermark).date()
if load_lineage:
lineage = {
"pipeline_id": pipeline_id,
"source_addresses": [
{
"name": "https://api.polygon.io/v1/open-close/",
"address_type_name": "polygon",
"address_type_group_name": "api",
}
],
"target_addresses": [
{
"name": "prod.polygon.open_close",
"address_type_name": "postgres",
"address_type_group_name": "database",
}
],
}
httpx.post("http://localhost:8000/address_lineage", json=lineage)
start_execution = {
"pipeline_id": pipeline_id,
"start_time": pendulum.now().isoformat(),
}
response = httpx.post("http://localhost:8000/start_pipeline_execution", json=start_execution)
pipeline_execution_id = response.json()["id"]
params = {
"apiKey": "XXXX",
}
all_records = []
try:
while watermark < current_date:
response = httpx.get(
f"https://api.polygon.io/v1/open-close/AAPL/{watermark}", params=params
)
record = response.json()
if response.status_code == 429:
wait_time = 30
print(f"Rate limit exceeded. Waiting {wait_time} seconds...")
time.sleep(wait_time)
continue
if record["status"] == "OK":
all_records.append(record)
watermark = watermark.add(days=1)
print(all_records) # Save records somewhere
end_execution = {
"id": pipeline_execution_id,
"pipeline_id": pipeline_id,
"end_date": pendulum.now().isoformat(),
"completed_successfully": True,
"total_rows": len(all_records),
}
httpx.post("http://localhost:8000/end_pipeline_execution", json=end_execution)
except Exception as e:
end_execution = {
"id": pipeline_execution_id,
"pipeline_id": pipeline_id,
"end_date": pendulum.now().isoformat(),
"completed_successfully": False,
}
httpx.post("http://localhost:8000/end_pipeline_execution", json=end_execution)
raise e
Key Features Demonstrated
Pipeline Management:
Get-or-Create Pattern: No separate creation calls needed; pipeline and pipeline_type are automatically created if they don’t exist
Watermark Processing: Uses watermark-based incremental processing for efficient data extraction
Easy Deployment: Same code works for first run and subsequent runs without modification
Address Lineage:
Creates source-to-target data lineage relationships
Uses proper address naming conventions (API endpoint and database table)
Automatically creates address types (polygon API, postgres database)
Execution Tracking:
Proper start/end execution pattern with error handling
Tracks execution metrics (total_rows processed)
Handles both successful and failed execution scenarios
Watermark Management:
Uses date-based watermarks for incremental processing
Handles initial watermark setup for new pipelines
Advances watermark through date range processing