Quick Start Guide

This guide will get you up and running with Watcher.

Step 1: Create Your First Pipeline

  1. Create a pipeline (the pipeline type is created automatically if it does not exist yet)

    import httpx
    
    response = httpx.post("http://localhost:8000/pipeline", json={
        "name": "my data pipeline",
        "pipeline_type_name": "extraction"
    })
    print(response.json())
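The later steps hard-code pipeline_id 1, which only matches on a fresh database. The sketch below is a standard-library alternative to the httpx call that returns the new pipeline's id so later requests can reuse it. post_json and create_pipeline are hypothetical helpers, not part of Watcher, and the code assumes the response JSON exposes the id under an "id" key; check the actual response schema at http://localhost:8000/scalar before relying on it.

```python
import json
import urllib.request
from typing import Any, Callable, Optional

BASE_URL = "http://localhost:8000"  # the local address used throughout this guide

# Any callable that POSTs an optional JSON body to an API path and returns the decoded reply.
Poster = Callable[[str, Optional[dict]], Any]


def post_json(path: str, body: Optional[dict] = None) -> Any:
    """POST body as JSON (or send no body when it is None) and decode the JSON reply."""
    data = None if body is None else json.dumps(body).encode("utf-8")
    request = urllib.request.Request(
        BASE_URL + path,
        data=data,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    # urlopen raises urllib.error.HTTPError on 4xx/5xx responses, so failures are not silent.
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))


def create_pipeline(name: str, pipeline_type_name: str, post: Poster = post_json) -> int:
    """Create a pipeline and return its id.

    ASSUMPTION: the /pipeline response contains the new pipeline's id under "id".
    """
    reply = post("/pipeline", {"name": name, "pipeline_type_name": pipeline_type_name})
    return reply["id"]
```

For example, pipeline_id = create_pipeline("my data pipeline", "extraction") gives the value to pass as pipeline_id in the later steps. The post parameter lets you swap in an httpx-based function or a test double without changing the helper.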
    

Step 2: Start and End a Pipeline Execution

  1. Start an execution

    Note

    The start_date field is optional. If omitted, it defaults to the current time using pendulum.now().

    import httpx
    
    # With explicit start_date
    response = httpx.post("http://localhost:8000/start_pipeline_execution", json={
        "pipeline_id": 1,
        "start_date": "2024-01-01T10:00:00Z"
    })
    print(response.json())
    
    # Without start_date (defaults to current time)
    response = httpx.post("http://localhost:8000/start_pipeline_execution", json={
        "pipeline_id": 1
    })
    print(response.json())
    
  2. End the execution with metrics

    import httpx
    
    response = httpx.post("http://localhost:8000/end_pipeline_execution", json={
        "id": 1,  # id of the execution started above, not the pipeline_id
        "end_date": "2024-01-01T10:05:00Z",
        "completed_successfully": True,
        "total_rows": 1000,
        "inserts": 800,
        "updates": 200,
        "soft_deletes": 0
    })
    print(response.json())
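In a real job, the end call should run even when the load fails, otherwise the execution is left without an end record. The sketch below wraps a unit of work with both calls and sets completed_successfully from the outcome. run_tracked and utc_now_iso are hypothetical helpers; post is any function that POSTs a JSON body to a Watcher path and returns the decoded reply (for example a thin wrapper around httpx.post). It assumes the start response carries the execution id under an "id" key and that the row metrics may be omitted when a run fails; verify both against the API docs at /scalar.

```python
from datetime import datetime, timezone
from typing import Any, Callable, Dict, Optional

# Any callable that POSTs an optional JSON body to an API path and returns the decoded reply.
Poster = Callable[[str, Optional[dict]], Any]


def utc_now_iso() -> str:
    """Current UTC time in the same ISO-8601 "Z" form as the examples above."""
    return datetime.now(timezone.utc).replace(microsecond=0).strftime("%Y-%m-%dT%H:%M:%SZ")


def run_tracked(pipeline_id: int, work: Callable[[], Dict[str, int]], post: Poster) -> Dict[str, int]:
    """Start an execution, run work(), and always record how it ended.

    work() returns the row metrics (total_rows, inserts, updates, soft_deletes).
    ASSUMPTION: the start reply carries the new execution id under "id".
    """
    started = post("/start_pipeline_execution", {"pipeline_id": pipeline_id, "start_date": utc_now_iso()})
    execution_id = started["id"]
    metrics: Dict[str, int] = {}
    succeeded = False
    try:
        metrics = work()
        succeeded = True
        return metrics
    finally:
        # Runs on success and on failure, so the execution is always closed.
        # ASSUMPTION: on failure the metric fields may be omitted.
        post("/end_pipeline_execution", {
            "id": execution_id,
            "end_date": utc_now_iso(),
            "completed_successfully": succeeded,
            **metrics,
        })
```

Because the end call sits in a finally clause, an exception raised by work() is still re-raised to the caller after the execution has been marked as unsuccessful.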
    

Step 3: Create Address Lineage

  1. Create address lineage (addresses and address types are created automatically if they do not exist yet)

    import httpx
    
    response = httpx.post("http://localhost:8000/address_lineage", json={
        "pipeline_id": 1,
        "source_addresses": [
            {
                "name": "source_db.source_schema.source_table",
                "address_type_name": "postgres",
                "address_type_group_name": "database"
            }
        ],
        "target_addresses": [
            {
                "name": "target_db.target_schema.target_table",
                "address_type_name": "postgres",
                "address_type_group_name": "database"
            }
        ]
    })
    print(response.json())
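source_addresses and target_addresses are lists, so one call can record several inputs or outputs. The hypothetical helper below builds that request body from plain table names, applying one address type and group per side; the defaults mirror the example above. It only assembles the JSON body, so send the result with the same httpx.post call.

```python
from typing import Dict, Iterable, List


def make_addresses(names: Iterable[str], type_name: str, group_name: str) -> List[Dict[str, str]]:
    """Turn names such as "db.schema.table" into address objects of one type and group."""
    return [
        {"name": name, "address_type_name": type_name, "address_type_group_name": group_name}
        for name in names
    ]


def lineage_payload(
    pipeline_id: int,
    sources: Iterable[str],
    targets: Iterable[str],
    source_type: str = "postgres",
    source_group: str = "database",
    target_type: str = "postgres",
    target_group: str = "database",
) -> dict:
    """Build the JSON body for POST /address_lineage."""
    return {
        "pipeline_id": pipeline_id,
        "source_addresses": make_addresses(sources, source_type, source_group),
        "target_addresses": make_addresses(targets, target_type, target_group),
    }
```

For example, lineage_payload(1, ["source_db.source_schema.source_table"], ["target_db.target_schema.target_table"]) reproduces the body shown above.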
    

Step 4: Set Up Monitoring

  1. Run a freshness check

    import httpx
    
    response = httpx.post("http://localhost:8000/freshness")
    print(response.json())
    
  2. Run a timeliness check

    import httpx
    
    response = httpx.post("http://localhost:8000/timeliness", json={
        "lookback_minutes": 60
    })
    print(response.json())
    
  3. Run a Celery queue check

    import httpx
    
    response = httpx.post("http://localhost:8000/celery/monitor-queue")
    print(response.json())
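The Next Steps section suggests running these checks on a schedule. A hypothetical scheduler-friendly runner is sketched below: it calls all three endpoints with the bodies shown above, records one result per endpoint, and keeps going when a check fails so a single outage does not hide the others. post is any function that POSTs a JSON body (or none) to a Watcher path and raises on errors, for example a wrapper around httpx.post that calls raise_for_status().

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

# Any callable that POSTs an optional JSON body to an API path and returns the decoded reply.
Poster = Callable[[str, Optional[dict]], Any]

# The monitoring endpoints from this step, with the request bodies used above.
CHECKS: List[Tuple[str, Optional[dict]]] = [
    ("/freshness", None),
    ("/timeliness", {"lookback_minutes": 60}),
    ("/celery/monitor-queue", None),
]


def run_monitoring_checks(post: Poster) -> Dict[str, str]:
    """Call every check and map each path to "ok" or the error it raised."""
    results: Dict[str, str] = {}
    for path, body in CHECKS:
        try:
            post(path, body)
            results[path] = "ok"
        except Exception as exc:  # keep going so one failing check does not hide the others
            results[path] = f"error: {exc}"
    return results
```

A cron job can run a small script that calls run_monitoring_checks and exits non-zero when any value is not "ok", which makes failures visible in the scheduler's own logs.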
    

Step 5: Configure Anomaly Detection

Note

Set WATCHER_AUTO_CREATE_ANOMALY_DETECTION_RULES=true to automatically create anomaly detection rules for new pipelines.

  1. Create an anomaly detection rule

    import httpx
    
    response = httpx.post("http://localhost:8000/anomaly_detection_rule", json={
        "pipeline_id": 1,
        "metric_field": "total_rows",
        "z_threshold": 3.0,
        "minimum_executions": 30,
        "lookback_days": 30
    })
    print(response.json())
    
  2. Once a rule exists, anomaly detection runs automatically after each successful pipeline execution.
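The rule parameters map onto a standard z-score test. The sketch below illustrates that test using the rule's fields; it is not Watcher's own implementation. It assumes the baseline is the metric_field values (here total_rows) from executions inside the lookback_days window, that fewer than minimum_executions values means no verdict, and that the sample standard deviation is used. is_anomalous is a hypothetical name.

```python
from datetime import datetime, timedelta
from statistics import mean, stdev
from typing import List, Optional, Tuple


def is_anomalous(
    history: List[Tuple[datetime, float]],
    latest: float,
    now: datetime,
    z_threshold: float = 3.0,
    minimum_executions: int = 30,
    lookback_days: int = 30,
) -> Optional[bool]:
    """Return True/False for the latest metric value, or None if history is too short.

    ILLUSTRATION ONLY: a textbook z-score test, not Watcher's actual algorithm.
    """
    cutoff = now - timedelta(days=lookback_days)
    # Only executions inside the lookback window form the baseline.
    baseline = [value for when, value in history if when >= cutoff]
    if len(baseline) < max(minimum_executions, 2):  # stdev() needs at least two points
        return None
    mu, sigma = mean(baseline), stdev(baseline)
    if sigma == 0:
        return latest != mu  # flat history: any change at all is unusual
    z = (latest - mu) / sigma
    return abs(z) > z_threshold
```

With the values from the rule above (z_threshold 3.0, minimum_executions 30, lookback_days 30), a total_rows value more than three standard deviations away from the mean of the last 30 days would be flagged, and nothing is flagged until at least 30 executions fall inside that window.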

Step 6: Web Pages

  1. Check system health

    Visit: http://localhost:8000/diagnostics

  2. View API documentation

    Visit: http://localhost:8000/scalar

  3. View reporting dashboard

    Visit: http://localhost:8000/reporting

Next Steps

  • Set up scheduled monitoring: Configure cron jobs to ping the monitoring endpoints
  • Configure Slack alerts: Add your Slack webhook URL for notifications
  • Set up anomaly detection rules: Create rules for your specific metrics
  • Explore the web pages: Monitor system health and performance, and open the reporting dashboard
    • See the “Diagnostics Web Page” and “Reporting Dashboard Web Page” sections under Web Pages