Pipeline Management

This guide covers how to effectively manage data pipelines in Watcher, including execution tracking, hierarchical workflows, and organizational best practices.

Creating Pipelines

Basic Pipeline Creation

Create a pipeline (the pipeline type named in pipeline_type_name is created automatically if it doesn’t exist):

import httpx

pipeline_data = {
    "name": "daily sales pipeline",
    "pipeline_type_name": "extraction",
    "pipeline_metadata": {
        "description": "Daily extraction of sales data",
        "owner": "data-team",
        "schedule": "0 2 * * *"
    }
}

response = httpx.post(
    "http://localhost:8000/pipeline",
    json=pipeline_data
)
print(response.json())

Pipeline Configuration

Configure monitoring settings during pipeline creation:

{
  "name": "Customer Data Pipeline",
  "pipeline_type_name": "extraction",
  "next_watermark": "2024-01-01T00:00:00Z",
  "freshness_number": 24,
  "freshness_datepart": "hour",
  "timeliness_number": 2,
  "timeliness_datepart": "hour"
}
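The number/datepart pairs in this configuration read naturally as durations. The following is a minimal sketch of that conversion, not Watcher's own logic. Only "hour" appears in this guide, so the "minute" and "day" entries, the `threshold` helper, and the `is_stale` check are illustrative assumptions.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical mapping: only "hour" is confirmed by the examples in this guide.
DATEPART_TO_KWARG = {"minute": "minutes", "hour": "hours", "day": "days"}


def threshold(number: int, datepart: str) -> timedelta:
    """Convert a number/datepart pair into a timedelta."""
    try:
        return timedelta(**{DATEPART_TO_KWARG[datepart]: number})
    except KeyError:
        raise ValueError(f"Unsupported datepart: {datepart!r}") from None


def is_stale(last_loaded: datetime, now: datetime, number: int, datepart: str) -> bool:
    """Return True when last_loaded is older than the configured threshold."""
    return now - last_loaded > threshold(number, datepart)


now = datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc)
last_loaded = datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc)

print(threshold(24, "hour"))                    # 1 day, 0:00:00
print(is_stale(last_loaded, now, 24, "hour"))   # True (data is 36 hours old)
```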

Best Practices:

  • Store in Source Control: Keep pipeline definitions in the same repository as your pipeline logic

  • Use Variables: Reference pipeline names and other dynamic values from environment variables

  • Documentation: Document pipeline purposes and dependencies in your code comments
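As a rough illustration of the "Use Variables" practice, the sketch below reads the pipeline name and Watcher URL from environment variables. The variable names (`WATCHER_URL`, `PIPELINE_NAME`, `PIPELINE_TYPE_NAME`) and the `load_settings` helper are hypothetical. Choose whatever names fit your deployment.

```python
import os


def load_settings(env=os.environ) -> dict:
    """Read pipeline settings from the environment, with local defaults."""
    # All three variable names are assumptions made for this example.
    return {
        "base_url": env.get("WATCHER_URL", "http://localhost:8000"),
        "pipeline_name": env.get("PIPELINE_NAME", "daily sales pipeline"),
        "pipeline_type_name": env.get("PIPELINE_TYPE_NAME", "extraction"),
    }


# A plain dict stands in for os.environ here so the example is repeatable.
settings = load_settings({"PIPELINE_NAME": "customer_data_pipeline"})

pipeline_data = {
    "name": settings["pipeline_name"],
    "pipeline_type_name": settings["pipeline_type_name"],
}
print(pipeline_data)
```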

Framework Design:

Watcher is designed to mirror the configuration stored in source control. Updates to a pipeline definition in your code are picked up by a hash-based change detection system and applied automatically, so the pipeline configuration held in Watcher stays synchronized with the configuration in your code.
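To make the idea of hash-based change detection concrete, here is a minimal sketch. It assumes the definition is serialized to canonical JSON and hashed with SHA-256. How Watcher actually computes and compares its hash is not described in this guide, so `config_hash` and `has_changed` are illustrative only.

```python
import hashlib
import json


def config_hash(config: dict) -> str:
    """Hash a pipeline definition so that key order does not matter."""
    canonical = json.dumps(config, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def has_changed(stored_hash: str, config: dict) -> bool:
    """Return True when the definition no longer matches the stored hash."""
    return config_hash(config) != stored_hash


original = {"name": "daily sales pipeline", "pipeline_type_name": "extraction"}
stored = config_hash(original)

# Same content, different key order: no change detected.
reordered = {"pipeline_type_name": "extraction", "name": "daily sales pipeline"}
print(has_changed(stored, reordered))  # False

# A new field changes the hash, which would trigger an update.
updated = {**original, "freshness_number": 24}
print(has_changed(stored, updated))    # True
```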

Managing Active Status

The active flag lets you switch a pipeline off without deleting its configuration. This is useful for:

  • Temporary Disabling: Turn off pipelines during maintenance windows

  • Emergency Response: Quickly disable failing pipelines to prevent cascading issues

Default Behavior:

  • New pipelines are active: true by default

  • The flag is purely informational; your code can check this field to implement custom logic

  • Pipeline metadata and configuration are preserved regardless of active status

  • The active flag is always returned in the /pipeline API response

Updating the Active Flag:

The active flag can be updated via the PATCH endpoint:

import httpx

# Disable a pipeline
update_data = {
    "id": 1,
    "active": False
}

response = httpx.patch(
    "http://localhost:8000/pipeline",
    json=update_data
)
print(response.json())

Practical Example:

Here’s a complete example showing how to create or retrieve a pipeline and check the active flag before running:

import httpx

def run_pipeline_if_active(pipeline_name: str, pipeline_type: str):
    """Create or get pipeline and run only if active."""

    # Create or get pipeline
    pipeline_data = {
        "name": pipeline_name,
        "pipeline_type_name": pipeline_type
    }

    response = httpx.post(
        "http://localhost:8000/pipeline",
        json=pipeline_data
    )
    pipeline = response.json()

    # Check active flag before proceeding
    if not pipeline["active"]:
        print(f"Pipeline '{pipeline_name}' is inactive, skipping execution")
        return

    print(f"Pipeline '{pipeline_name}' is active, proceeding with execution")

    # Your pipeline logic here
    # - Data extraction
    # - Data transformation
    # - Data loading
    # - etc.

    print("Pipeline execution completed successfully")

# Usage
run_pipeline_if_active("daily sales pipeline", "extraction")

Pipeline Execution

Starting and Ending Executions

import httpx

# Start execution
start_data = {
    "pipeline_id": 1,
    "start_date": "2024-01-01T10:00:00Z"
}

start_response = httpx.post(
    "http://localhost:8000/start_pipeline_execution",
    json=start_data
)
execution_id = start_response.json()["id"]
print(f"Execution started: {execution_id}")

# Your pipeline code executes here
# - Data extraction/transformation logic
# - Database operations
# - File processing
# - API calls
# - Any other business logic

# End execution
end_data = {
    "id": execution_id,
    "end_date": "2024-01-01T10:05:00Z",
    "completed_successfully": True,
    "total_rows": 10000,
    "inserts": 8000,
    "updates": 2000,
    "soft_deletes": 0
}

end_response = httpx.post(
    "http://localhost:8000/end_pipeline_execution",
    json=end_data
)
print(end_response.json())
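The start/end pair above can be wrapped so that the end call is always sent, even when the pipeline code raises. This is a sketch under assumptions rather than part of Watcher. `tracked_execution`, `utc_now`, and the injected `post` callable are hypothetical names, and recording a failure as completed_successfully = False is an assumed convention.

```python
from contextlib import contextmanager
from datetime import datetime, timezone


def utc_now() -> str:
    """Current UTC time in the timestamp format used throughout this guide."""
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


@contextmanager
def tracked_execution(pipeline_id: int, post, base_url: str = "http://localhost:8000"):
    """Start an execution, yield a result dict, and always send the end call.

    post(url, payload) is any callable that sends JSON and returns the
    decoded response body as a dict.
    """
    started = post(
        f"{base_url}/start_pipeline_execution",
        {"pipeline_id": pipeline_id, "start_date": utc_now()},
    )
    result = {"id": started["id"], "completed_successfully": True}
    try:
        yield result  # the caller adds total_rows, inserts, updates, ...
    except Exception:
        result["completed_successfully"] = False
        raise
    finally:
        result["end_date"] = utc_now()
        post(f"{base_url}/end_pipeline_execution", result)


# A fake transport stands in for the API so the sketch runs offline.
calls = []


def fake_post(url, payload):
    calls.append((url, dict(payload)))
    return {"id": 42}


with tracked_execution(1, fake_post) as execution:
    execution.update(total_rows=10000, inserts=8000, updates=2000, soft_deletes=0)

print(calls[-1][1]["completed_successfully"])  # True
```

Against a running instance, the transport could be as small as `lambda url, payload: httpx.post(url, json=payload).json()`.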

Execution Patterns

Full Load Pattern

{
  "pipeline_id": 1,
  "start_date": "2024-01-01T10:00:00Z"
}

Incremental Load Pattern

{
  "pipeline_id": 1,
  "start_date": "2024-01-02T10:00:00Z",
  "next_watermark": "2024-01-02T23:59:59Z"
}

Nested Execution Pattern

{
  "pipeline_id": 1,
  "start_date": "2024-01-01T10:00:00Z",
  "parent_id": 5
}
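The three patterns differ only in which optional fields accompany pipeline_id and start_date. One way to express that in code is the hypothetical `build_start_payload` helper below. Whether next_watermark and parent_id may be combined in a single request is an assumption here, not something this guide states.

```python
from typing import Optional


def build_start_payload(
    pipeline_id: int,
    start_date: str,
    next_watermark: Optional[str] = None,
    parent_id: Optional[int] = None,
) -> dict:
    """Build the JSON body for start_pipeline_execution."""
    payload = {"pipeline_id": pipeline_id, "start_date": start_date}
    if next_watermark is not None:
        payload["next_watermark"] = next_watermark  # incremental load
    if parent_id is not None:
        payload["parent_id"] = parent_id            # nested execution
    return payload


full_load = build_start_payload(1, "2024-01-01T10:00:00Z")
incremental = build_start_payload(
    1, "2024-01-02T10:00:00Z", next_watermark="2024-01-02T23:59:59Z"
)
nested = build_start_payload(1, "2024-01-01T10:00:00Z", parent_id=5)

print(full_load)
print(incremental)
print(nested)
```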

Pipeline Updates

Common Update Scenarios

Change Monitoring Thresholds

{
  "id": 1,
  "freshness_number": 48,
  "freshness_datepart": "hour",
  "timeliness_number": 4,
  "timeliness_datepart": "hour"
}

Mute Monitoring Checks

{
  "id": 1,
  "mute_freshness_check": true,
  "mute_timeliness_check": true
}

Update Watermark

{
  "id": 1,
  "next_watermark": "2024-01-02T00:00:00Z"
}

Nested Pipeline Executions

Watcher supports hierarchical pipeline execution tracking through the parent_id field, enabling you to model complex workflows with sub-pipelines and dependencies.

Use Cases:

  • Main Pipeline: A main orchestration pipeline that coordinates multiple sub-pipelines

  • Sub-Pipeline Tracking: Individual components or steps within a larger workflow

  • Dependency Management: Track which sub-pipelines depend on others

  • Performance Analysis: Analyze execution times at both main and sub-pipeline levels

  • Error Isolation: Identify which specific sub-pipeline failed within a complex workflow

Example Workflow:

Main Pipeline: data_processing_main
├── Sub-Pipeline: extract_sales_data (parent_id: main_execution_id)
├── Sub-Pipeline: extract_marketing_data (parent_id: main_execution_id)
├── Sub-Pipeline: transform_combined_data (parent_id: main_execution_id)
└── Sub-Pipeline: load_to_warehouse (parent_id: main_execution_id)

API Usage:

import httpx

# Start main pipeline execution
# Note: start_date is optional; if omitted, defaults to current time
main_response = httpx.post(
    "http://localhost:8000/start_pipeline_execution",
    json={
        "pipeline_id": 1,
        "start_date": "2024-01-01T10:00:00Z"
    }
)
main_execution_id = main_response.json()["id"]

# Start sub-pipeline with parent reference
sub_response = httpx.post(
    "http://localhost:8000/start_pipeline_execution",
    json={
        "pipeline_id": 2,
        "start_date": "2024-01-01T10:00:00Z",
        "parent_id": main_execution_id
    }
)

Querying Nested Executions

The system automatically maintains a closure table (pipeline_execution_closure) that enables efficient querying of hierarchical relationships without recursive queries.

Closure Table Structure:

  • parent_execution_id: The ancestor execution ID

  • child_execution_id: The descendant execution ID

  • depth: The relationship depth (0 = self-reference, 1 = direct child, 2 = grandchild, etc.)

Example Queries:

-- Get all direct children of an execution
SELECT pe.*
FROM pipeline_execution pe
JOIN pipeline_execution_closure pec
    ON pe.id = pec.child_execution_id
WHERE pec.parent_execution_id = 123
    AND pec.depth = 1;

-- Get all downstream dependencies of an execution
SELECT pe.*
FROM pipeline_execution pe
JOIN pipeline_execution_closure pec
    ON pe.id = pec.child_execution_id
WHERE pec.parent_execution_id = 123
    AND pec.depth > 0;

-- Get all upstream dependencies of an execution
SELECT pe.*
FROM pipeline_execution pe
JOIN pipeline_execution_closure pec
    ON pe.id = pec.parent_execution_id
WHERE pec.child_execution_id = 456
    AND pec.depth > 0;
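To see how closure rows support these queries, the sketch below rebuilds a small closure table in an in-memory SQLite database. The column names follow the structure described above. The `add_execution` logic is the standard closure-table insert and only approximates what Watcher maintains automatically.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE pipeline_execution_closure (
        parent_execution_id INTEGER NOT NULL,
        child_execution_id  INTEGER NOT NULL,
        depth               INTEGER NOT NULL
    )
    """
)


def add_execution(execution_id: int, parent_id=None) -> None:
    """Insert the self-reference row plus one row per ancestor."""
    conn.execute(
        "INSERT INTO pipeline_execution_closure VALUES (?, ?, 0)",
        (execution_id, execution_id),
    )
    if parent_id is not None:
        # Every ancestor of the parent (including the parent itself at
        # depth 0) becomes an ancestor of the new row, one level deeper.
        conn.execute(
            """
            INSERT INTO pipeline_execution_closure
            SELECT parent_execution_id, ?, depth + 1
            FROM pipeline_execution_closure
            WHERE child_execution_id = ?
            """,
            (execution_id, parent_id),
        )


def descendants(execution_id: int):
    """Return (child_execution_id, depth) for everything below an execution."""
    return conn.execute(
        "SELECT child_execution_id, depth FROM pipeline_execution_closure "
        "WHERE parent_execution_id = ? AND depth > 0 ORDER BY depth",
        (execution_id,),
    ).fetchall()


def ancestors(execution_id: int):
    """Return (parent_execution_id, depth) for everything above an execution."""
    return conn.execute(
        "SELECT parent_execution_id, depth FROM pipeline_execution_closure "
        "WHERE child_execution_id = ? AND depth > 0 ORDER BY depth",
        (execution_id,),
    ).fetchall()


add_execution(1)               # main execution
add_execution(2, parent_id=1)  # child
add_execution(3, parent_id=2)  # grandchild

print(descendants(1))  # [(2, 1), (3, 2)]
print(ancestors(3))    # [(2, 1), (1, 2)]
```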

Benefits:

  • Hierarchical Monitoring: Track both overall workflow progress and individual component performance

  • Dependency Tracking: Understand which sub-pipelines are blocking others

  • Root Cause Analysis: Quickly identify which specific component caused a failure

  • Resource Optimization: Analyze which sub-pipelines consume the most time/resources

  • Audit Trail: Complete visibility into complex multi-step data processes

Pipeline Organization

Effective organization of your Watcher metadata is crucial for maintainability, monitoring, and team collaboration.

Best Practices:

  1. Consistency: Use the same naming patterns across all teams and projects

  2. Descriptiveness: Names should clearly indicate purpose and scope

  3. Hierarchy: Use underscores to create logical hierarchies

  4. Future-Proofing: Choose names that will remain relevant as systems evolve

  5. Documentation: Document your naming conventions and share with all teams

  6. Validation: Implement naming validation in your CI/CD pipeline or code reviews

Pipeline Type Organization

Organize pipeline types by data processing pattern, by business domain, or by a combination of both:

Data Processing Pattern:

  • extraction - Data extraction pipelines

  • transformation - Data transformation and processing

  • loading - Data loading and materialization

  • audit - Data quality and validation

  • monitoring - System monitoring and health checks

Business Domain:

  • sales

  • marketing

  • finance

Combination:

  • sales_extraction

  • marketing_audit

  • finance_monitoring

Pipeline Naming Convention

Use a clear naming structure that maps directly back to the pipeline code (e.g., DAG name, job name, or workflow identifier).

Best Practices:

  • Match your DAG/job/workflow names exactly

  • Use consistent abbreviations across your organization

  • Keep names descriptive but concise

  • Use underscores for separation, avoid special characters
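Naming rules like these are easy to check in CI or code review. Below is a minimal sketch that assumes one possible convention: lowercase letters and digits separated by single underscores. The pattern and the `invalid_names` helper are hypothetical and should be adapted to whatever convention your teams agree on.

```python
import re

# Hypothetical rule: starts with a letter, then lowercase letters/digits,
# with single underscores as the only separator.
NAME_PATTERN = re.compile(r"[a-z][a-z0-9]*(_[a-z0-9]+)*")


def invalid_names(names):
    """Return the names that do not follow the naming convention."""
    return [name for name in names if not NAME_PATTERN.fullmatch(name)]


candidates = [
    "sales_extraction",
    "finance_monitoring",
    "Daily Sales",          # uppercase letters and a space
    "marketing-audit",      # hyphen instead of underscore
    "sales__extraction",    # doubled underscore
]

print(invalid_names(candidates))
# ['Daily Sales', 'marketing-audit', 'sales__extraction']
```

A CI step could fail the build whenever `invalid_names` returns a non-empty list.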