Watcher


Recommended Implementation - SDK

This guide covers the recommended implementation using the etl-watcher-sdk Python SDK to integrate your ETL code with Watcher.

Note

The etl-watcher-sdk simplifies integration by wrapping the Watcher HTTP API in a Python API. Full SDK documentation: https://etl-watcher-sdk.readthedocs.io/en/latest/

Installation

Install the SDK with your preferred package manager (run only one of these commands):

uv add etl-watcher-sdk
pip install etl-watcher-sdk
poetry add etl-watcher-sdk
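To confirm that the SDK is installed in the active environment, whichever tool you used, you can query the installed distribution metadata with the standard library. This is a small sketch; the helper name installed_version is ours and is not part of the SDK.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(distribution: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is missing."""
    try:
        return version(distribution)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    found = installed_version("etl-watcher-sdk")
    print(found or "etl-watcher-sdk is not installed in this environment")
```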

Key Processes

  1. Define Pipeline Configuration

  2. Initialize Watcher Client

  3. Sync Pipeline Configuration

  4. Track Pipeline Execution

Define Pipeline Configuration

Store your pipeline configuration in source control using the SDK’s data models:

import pendulum
from watcher import Address, AddressLineage, Pipeline, PipelineConfig

POLYGON_OPEN_CLOSE_PIPELINE_CONFIG = PipelineConfig(
    pipeline=Pipeline(
        name="polygon_open_close",
        pipeline_type_name="extraction",
        timeliness_number=20,
        timeliness_datepart="minute",
        freshness_number=1,
        freshness_datepart="day",
        pipeline_metadata={
            "description": "Daily stock price extraction from Polygon API",
            "owner": "data-team",
        },
    ),
    address_lineage=AddressLineage(
        source_addresses=[
            Address(
                name="https://api.polygon.io/v1/open-close/",
                address_type_name="polygon",
                address_type_group_name="api",
            )
        ],
        target_addresses=[
            Address(
                name="prod.polygon.open_close",
                address_type_name="postgres",
                address_type_group_name="database",
            )
        ],
    ),
    default_watermark=pendulum.date(2025, 10, 1).to_date_string(),
    next_watermark=pendulum.now().date().to_date_string(),
)
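The timeliness and freshness settings are each a number plus a datepart. Assuming the pair is read as a plain duration, the config above means 20 minutes for timeliness and 1 day for freshness. The sketch below makes that reading explicit with datetime.timedelta. The threshold helper and its unit mapping are hypothetical: only "minute" and "day" appear on this page, and the other units are assumptions.

```python
from datetime import timedelta

# Hypothetical datepart -> timedelta keyword mapping, for illustration only.
# Calendar units such as month or year are left out: they have no fixed length.
_DATEPART_TO_KWARG = {
    "second": "seconds",
    "minute": "minutes",
    "hour": "hours",
    "day": "days",
    "week": "weeks",
}


def threshold(number: int, datepart: str) -> timedelta:
    """Convert a (number, datepart) pair into a timedelta."""
    try:
        unit = _DATEPART_TO_KWARG[datepart]
    except KeyError:
        raise ValueError(f"unsupported datepart: {datepart!r}") from None
    return timedelta(**{unit: number})


# The two thresholds from POLYGON_OPEN_CLOSE_PIPELINE_CONFIG
timeliness = threshold(20, "minute")  # 0:20:00
freshness = threshold(1, "day")  # 1 day, 0:00:00
```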

Initialize Watcher Client

Create a Watcher client instance:

from watcher import Watcher

# Initialize the Watcher client
watcher = Watcher("http://localhost:8000")

Sync Pipeline Configuration

Use the SDK to sync your pipeline configuration and address lineage with Watcher:

# Sync pipeline configuration with Watcher
synced_config = watcher.sync_pipeline_config(POLYGON_OPEN_CLOSE_PIPELINE_CONFIG)

Note

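Synchronization also handles watermark management: the returned synced_config carries the watermark and next_watermark values that are passed to the execution decorator in the next step.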
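The complete example at the end of this guide loops while date < next_watermark, so each run covers the half-open window [watermark, next_watermark). A minimal standard-library sketch of that window follows. It assumes, for illustration, that the first run starts at default_watermark and each later run starts at the previous run's next_watermark, which gives contiguous, non-overlapping loads. dates_in_window is a hypothetical helper, not an SDK function.

```python
from datetime import date, timedelta
from typing import Iterator


def dates_in_window(watermark: str, next_watermark: str) -> Iterator[date]:
    """Yield every date in the half-open window [watermark, next_watermark)."""
    current = date.fromisoformat(watermark)
    end = date.fromisoformat(next_watermark)
    while current < end:
        yield current
        current += timedelta(days=1)


# Assumed first run: no stored watermark yet, so it starts at default_watermark
first_run = list(dates_in_window("2025-10-01", "2025-10-04"))
# Assumed next run: starts at the previous run's next_watermark
second_run = list(dates_in_window("2025-10-04", "2025-10-06"))

# first_run  -> 2025-10-01, 2025-10-02, 2025-10-03
# second_run -> 2025-10-04, 2025-10-05 (no gap, no overlap)
```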

Track Pipeline Execution

Use the SDK’s decorator to track your ETL pipeline execution:

import pendulum
from watcher import ETLResult, WatcherContext

# `watcher` and `synced_config` come from the previous steps
@watcher.track_pipeline_execution(
    pipeline_id=synced_config.pipeline.id,
    active=synced_config.pipeline.active,
    watermark=synced_config.watermark,
    next_watermark=synced_config.next_watermark,
)
def extract_data(watcher_context: WatcherContext, tickers: list[str]):
    # Access watermark and next_watermark from the context
    watermark = pendulum.parse(watcher_context.watermark).date()
    next_watermark = pendulum.parse(watcher_context.next_watermark).date()

    # Your ETL logic here
    total_rows = 0
    for ticker in tickers:
        # process_ticker is a placeholder for your own extract/load logic;
        # it should return the number of rows it processed
        total_rows += process_ticker(ticker, watermark, next_watermark)

    # Return ETL results
    return ETLResult(
        completed_successfully=True,
        total_rows=total_rows,
    )

The decorator automatically handles:

  • Starting and ending pipeline execution

  • Updating the pipeline execution with the ETLResult

  • Active/Inactive pipeline management

  • All HTTP communication with the Watcher API

It also passes a watcher_context argument to your function, which provides:

  • Access to the watermark and next_watermark

  • Access to the pipeline execution context (for hierarchical execution tracking)
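To make the decorator's job concrete, here is a minimal, self-contained sketch of the same pattern: start an execution, inject a context as the first argument, and record the returned result when the function finishes. It is not the SDK's implementation. track_execution, Context, Result, and the start/end hooks are hypothetical stand-ins, and the behavior for inactive pipelines (skip the call) and for exceptions (record a failed result, then re-raise) is assumed rather than documented.

```python
import functools
from dataclasses import dataclass
from typing import Any, Callable, List, Optional


@dataclass
class Context:
    """Hypothetical stand-in for WatcherContext."""

    execution_id: int
    watermark: str
    next_watermark: str


@dataclass
class Result:
    """Hypothetical stand-in for ETLResult."""

    completed_successfully: bool
    total_rows: int = 0


def track_execution(
    start: Callable[[], int],
    end: Callable[[int, Result], None],
    active: bool,
    watermark: str,
    next_watermark: str,
) -> Callable[[Callable[..., Result]], Callable[..., Optional[Result]]]:
    """Decorator factory: wrap an ETL function with start/end bookkeeping."""

    def decorator(func: Callable[..., Result]) -> Callable[..., Optional[Result]]:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Optional[Result]:
            if not active:
                return None  # assumed: inactive pipelines are skipped
            execution_id = start()  # e.g. an HTTP call that creates the execution
            context = Context(execution_id, watermark, next_watermark)
            try:
                result = func(context, *args, **kwargs)
            except Exception:
                # assumed: record the failure, then let the error propagate
                end(execution_id, Result(completed_successfully=False))
                raise
            end(execution_id, result)  # e.g. an HTTP call that closes the execution
            return result

        return wrapper

    return decorator


# In-memory hooks standing in for calls to the Watcher API
events: List[Any] = []


def fake_start() -> int:
    events.append("start")
    return 1


def fake_end(execution_id: int, result: Result) -> None:
    events.append(("end", execution_id, result.total_rows))


@track_execution(fake_start, fake_end, active=True,
                 watermark="2025-10-01", next_watermark="2025-10-02")
def extract(ctx: Context, tickers: List[str]) -> Result:
    return Result(completed_successfully=True, total_rows=len(tickers))


extract(tickers=["AAPL", "META"])
# events is now ["start", ("end", 1, 2)]
```

As in the SDK example, the caller never passes the context itself; the wrapper supplies it, which is why extract is called with only tickers.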

Complete Example

Here’s a complete example putting it all together:

import time

import httpx
import pendulum
from watcher import ETLResult, Watcher, WatcherContext

# Pipeline configuration from the first step, stored in a pipeline.py module
from pipeline import POLYGON_OPEN_CLOSE_PIPELINE_CONFIG

# Initialize Watcher client
watcher = Watcher("http://localhost:8000")

# Sync pipeline configuration
synced_config = watcher.sync_pipeline_config(POLYGON_OPEN_CLOSE_PIPELINE_CONFIG)

# Define your ETL function with tracking
@watcher.track_pipeline_execution(
    pipeline_id=synced_config.pipeline.id,
    active=synced_config.pipeline.active,
    watermark=synced_config.watermark,
    next_watermark=synced_config.next_watermark,
)
def extract_data(watcher_context: WatcherContext, tickers: list[str]):
    # Access the watcher_context to get the watermark and next_watermark
    watermark = pendulum.parse(watcher_context.watermark).date()
    next_watermark = pendulum.parse(watcher_context.next_watermark).date()

    all_records = []

    for ticker in tickers:
        # Load every date in the window [watermark, next_watermark)
        date = watermark
        while date < next_watermark:
            response = httpx.get(
                f"https://api.polygon.io/v1/open-close/{ticker}/{date}",
                params={"apiKey": "YOUR_API_KEY"},
            )

            if response.status_code == 429:
                time.sleep(30)  # Rate limited: wait, then retry the same date
                continue

            record = response.json()
            # Keep only successful responses
            if record.get("status") == "OK":
                all_records.append(record)

            date = date.add(days=1)

    # Count rows once, after all tickers are processed, so records
    # from earlier tickers are not counted again
    total_rows = len(all_records)

    # Make sure to return the ETLResult
    return ETLResult(
        completed_successfully=True,
        total_rows=total_rows,
    )

# Execute your ETL pipeline; watcher_context is supplied by the decorator
tickers = ["AAPL", "META"]
extract_data(tickers=tickers)
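The example above waits 30 seconds and retries the same date indefinitely whenever the API responds with HTTP 429. If you prefer a bounded retry, a sketch with exponential backoff follows. fetch_with_backoff is a hypothetical helper: it accepts any zero-argument callable returning an object with a status_code attribute (for instance lambda: httpx.get(url, params=...)), and sleep is injectable so the logic can be exercised without waiting.

```python
import time
from dataclasses import dataclass
from typing import Callable, List, Protocol, TypeVar


class HasStatus(Protocol):
    """Anything with an HTTP status code, e.g. an httpx.Response."""

    status_code: int


R = TypeVar("R", bound=HasStatus)


def fetch_with_backoff(
    fetch: Callable[[], R],
    max_retries: int = 5,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> R:
    """Call fetch(), retrying on HTTP 429 with exponential backoff.

    Returns the first non-429 response, or the last 429 response once
    max_retries retries are used up, so the caller decides what to do next.
    """
    response = fetch()
    for attempt in range(max_retries):
        if response.status_code != 429:
            break
        sleep(base_delay * 2**attempt)  # 1s, 2s, 4s, ... with the defaults
        response = fetch()
    return response


# Demo with fake responses instead of real HTTP calls
@dataclass
class FakeResponse:
    status_code: int


responses = iter([FakeResponse(429), FakeResponse(429), FakeResponse(200)])
delays: List[float] = []
result = fetch_with_backoff(lambda: next(responses), sleep=delays.append)
# result.status_code == 200; delays == [1.0, 2.0]
```

Passing base_delay=30 keeps the original 30-second first wait while capping the total number of requests per date at max_retries + 1.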

Benefits of Using the SDK

  • Simplified Integration: Abstracts away complex HTTP requests and API interactions

  • Synchronization: Automatically syncs pipeline configuration and address lineage with Watcher

  • Watermark Management: Automatically manages watermarks for each pipeline execution

  • Execution Tracking: Automatically starts and ends each pipeline execution and records the ETLResult on it

  • Hierarchical Execution Tracking: Provides access to the watcher_context for hierarchical execution tracking

This implementation provides a clean, maintainable way to integrate your ETL processes with the Watcher framework.


© Copyright 2025, Cortland Goffena.

Built with Sphinx using a theme provided by Read the Docs.