Pipeline Management
This guide covers how to effectively manage data pipelines in Watcher, including execution tracking, hierarchical workflows, and organizational best practices.
Creating Pipelines
Basic Pipeline Creation
Create a pipeline (pipeline_type is automatically created if it doesn’t exist):
import httpx
pipeline_data = {
"name": "daily sales pipeline",
"pipeline_type_name": "extraction",
"pipeline_metadata": {
"description": "Daily extraction of sales data",
"owner": "data-team",
"schedule": "0 2 * * *"
}
}
response = httpx.post(
"http://localhost:8000/pipeline",
json=pipeline_data
)
print(response.json())
curl -X POST "http://localhost:8000/pipeline" \
-H "Content-Type: application/json" \
-d '{
"name": "daily sales pipeline",
"pipeline_type_name": "extraction",
"pipeline_metadata": {
"description": "Daily extraction of sales data",
"owner": "data-team",
"schedule": "0 2 * * *"
}
}'
package main
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
)
type PipelineRequest struct {
Name string `json:"name"`
PipelineTypeName string `json:"pipeline_type_name"`
PipelineMetadata map[string]interface{} `json:"pipeline_metadata"`
}
func main() {
data := PipelineRequest{
Name: "daily sales pipeline",
PipelineTypeName: "extraction",
PipelineMetadata: map[string]interface{}{
"description": "Daily extraction of sales data",
"owner": "data-team",
"schedule": "0 2 * * *",
},
}
jsonData, _ := json.Marshal(data)
resp, _ := http.Post("http://localhost:8000/pipeline",
"application/json", bytes.NewBuffer(jsonData))
defer resp.Body.Close()
var result map[string]interface{}
json.NewDecoder(resp.Body).Decode(&result)
fmt.Println(result)
}
import java.net.http.{HttpClient, HttpRequest, HttpResponse}
import java.net.URI
import play.api.libs.json.Json
object PipelineExample {
def main(args: Array[String]): Unit = {
val client = HttpClient.newHttpClient()
val json = Json.obj(
"name" -> "daily sales pipeline",
"pipeline_type_name" -> "extraction",
"pipeline_metadata" -> Json.obj(
"description" -> "Daily extraction of sales data",
"owner" -> "data-team",
"schedule" -> "0 2 * * *"
)
).toString()
val request = HttpRequest.newBuilder()
.uri(URI.create("http://localhost:8000/pipeline"))
.header("Content-Type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(json))
.build()
val response = client.send(request,
HttpResponse.BodyHandlers.ofString())
println(response.body())
}
}
Pipeline Configuration
Configure monitoring settings during pipeline creation:
{
"name": "Customer Data Pipeline",
"pipeline_type_name": "extraction",
"next_watermark": "2024-01-01T00:00:00Z",
"freshness_number": 24,
"freshness_datepart": "hour",
"timeliness_number": 2,
"timeliness_datepart": "hour",
}
Best Practices:
Store in Source Control: Keep pipeline definitions in the same repository as your pipeline logic
Use Variables: Reference pipeline names and other dynamic values from environment variables
Documentation: Document pipeline purposes and dependencies in your code comments
Framework Design:
The Watcher framework is designed to represent the configuration stored in source control. Any updates to your pipeline code will be automatically reflected in the Watcher framework through a hash-based change detection system. This ensures that the Watcher pipeline configuration stays synchronized with the configuration in your code.
Managing Active Status
The active flag allows you to control pipeline execution without deleting the pipeline configuration. This is useful for:
Temporary Disabling: Turn off pipelines during maintenance windows
Emergency Response: Quickly disable failing pipelines to prevent cascading issues
Default Behavior: - New pipelines are active: true by default - The flag is purely informational - your code can check this field to implement custom logic - Pipeline metadata and configuration are preserved regardless of active status - The active flag is always returned in the /pipeline API response
Managing Active Status:
The active flag can be updated via the PATCH endpoint:
import httpx
# Disable a pipeline
update_data = {
"id": 1,
"active": False
}
response = httpx.patch(
"http://localhost:8000/pipeline",
json=update_data
)
print(response.json())
# Disable pipeline
curl -X PATCH "http://localhost:8000/pipeline" \
-H "Content-Type: application/json" \
-d '{"id": 1, "active": false}'
package main
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
)
type PipelineUpdate struct {
ID int `json:"id"`
Active bool `json:"active"`
}
func main() {
data := PipelineUpdate{
ID: 1,
Active: true,
}
jsonData, _ := json.Marshal(data)
req, _ := http.NewRequest("PATCH", "http://localhost:8000/pipeline",
bytes.NewBuffer(jsonData))
req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
resp, _ := client.Do(req)
defer resp.Body.Close()
var result map[string]interface{}
json.NewDecoder(resp.Body).Decode(&result)
fmt.Println(result)
}
import java.net.http.{HttpClient, HttpRequest, HttpResponse}
import java.net.URI
import play.api.libs.json.Json
object PipelineUpdateExample {
def main(args: Array[String]): Unit = {
val client = HttpClient.newHttpClient()
val json = Json.obj(
"id" -> 1,
"active" -> true
).toString()
val request = HttpRequest.newBuilder()
.uri(URI.create("http://localhost:8000/pipeline"))
.header("Content-Type", "application/json")
.method("PATCH", HttpRequest.BodyPublishers.ofString(json))
.build()
val response = client.send(request,
HttpResponse.BodyHandlers.ofString())
println(response.body())
}
}
Practical Example:
Here’s a complete example showing how to create/get a pipeline and use the active flag:
import requests
def run_pipeline_if_active(pipeline_name: str, pipeline_type: str):
"""Create or get pipeline and run only if active."""
# Create or get pipeline
pipeline_data = {
"name": pipeline_name,
"pipeline_type_name": pipeline_type
}
response = requests.post(
"http://localhost:8000/pipeline",
json=pipeline_data
)
pipeline = response.json()
# Check active flag before proceeding
if not pipeline["active"]:
print(f"Pipeline '{pipeline_name}' is inactive, skipping execution")
return
print(f"Pipeline '{pipeline_name}' is active, proceeding with execution")
# Your pipeline logic here
# - Data extraction
# - Data transformation
# - Data loading
# - etc.
print("Pipeline execution completed successfully")
# Usage
run_pipeline_if_active("daily sales pipeline", "extraction")
Pipeline Execution
Starting and Ending Executions
import httpx
# Start execution
start_data = {
"pipeline_id": 1,
"start_date": "2024-01-01T10:00:00Z"
}
start_response = httpx.post(
"http://localhost:8000/start_pipeline_execution",
json=start_data
)
execution_id = start_response.json()["id"]
print(f"Execution started: {execution_id}")
# Your pipeline code executes here
# - Data extraction/transformation logic
# - Database operations
# - File processing
# - API calls
# - Any other business logic
# End execution
end_data = {
"id": execution_id,
"end_date": "2024-01-01T10:05:00Z",
"completed_successfully": True,
"total_rows": 10000,
"inserts": 8000,
"updates": 2000,
"soft_deletes": 0
}
end_response = httpx.post(
"http://localhost:8000/end_pipeline_execution",
json=end_data
)
print(end_response.json())
# Start execution
curl -X POST "http://localhost:8000/start_pipeline_execution" \
-H "Content-Type: application/json" \
-d '{
"pipeline_id": 1,
"start_date": "2024-01-01T10:00:00Z"
}'
# Your pipeline code executes here
# - Data extraction/transformation logic
# - Database operations
# - File processing
# - API calls
# - Any other business logic
# End execution
curl -X POST "http://localhost:8000/end_pipeline_execution" \
-H "Content-Type: application/json" \
-d '{
"id": 1,
"end_date": "2024-01-01T10:05:00Z",
"completed_successfully": true,
"total_rows": 10000,
"inserts": 8000,
"updates": 2000,
"soft_deletes": 0
}'
package main
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
)
type StartExecution struct {
PipelineID int `json:"pipeline_id"`
StartDate string `json:"start_date"`
}
type EndExecution struct {
ID int `json:"id"`
PipelineID int `json:"pipeline_id"`
EndDate string `json:"end_date"`
CompletedSuccessfully bool `json:"completed_successfully"`
TotalRows int `json:"total_rows"`
Inserts int `json:"inserts"`
Updates int `json:"updates"`
SoftDeletes int `json:"soft_deletes"`
}
func main() {
// Start execution
startData := StartExecution{
PipelineID: 1,
StartDate: "2024-01-01T10:00:00Z",
}
jsonData, _ := json.Marshal(startData)
resp, _ := http.Post("http://localhost:8000/start_pipeline_execution",
"application/json", bytes.NewBuffer(jsonData))
defer resp.Body.Close()
var startResult map[string]interface{}
json.NewDecoder(resp.Body).Decode(&startResult)
executionID := int(startResult["id"].(float64))
fmt.Printf("Execution started: %d\n", executionID)
// Your pipeline code executes here
// - Data extraction/transformation logic
// - Database operations
// - File processing
// - API calls
// - Any other business logic
// End execution
endData := EndExecution{
ID: executionID,
PipelineID: 1,
EndDate: "2024-01-01T10:05:00Z",
CompletedSuccessfully: true,
TotalRows: 10000,
Inserts: 8000,
Updates: 2000,
SoftDeletes: 0,
}
jsonData, _ = json.Marshal(endData)
resp, _ = http.Post("http://localhost:8000/end_pipeline_execution",
"application/json", bytes.NewBuffer(jsonData))
defer resp.Body.Close()
var endResult map[string]interface{}
json.NewDecoder(resp.Body).Decode(&endResult)
fmt.Println(endResult)
}
import java.net.http.{HttpClient, HttpRequest, HttpResponse}
import java.net.URI
import play.api.libs.json.Json
object PipelineExecutionExample {
def main(args: Array[String]): Unit = {
val client = HttpClient.newHttpClient()
// Start execution
val startJson = Json.obj(
"pipeline_id" -> 1,
"start_date" -> "2024-01-01T10:00:00Z"
).toString()
val startRequest = HttpRequest.newBuilder()
.uri(URI.create("http://localhost:8000/start_pipeline_execution"))
.header("Content-Type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(startJson))
.build()
val startResponse = client.send(startRequest,
HttpResponse.BodyHandlers.ofString())
val startResult = Json.parse(startResponse.body())
val executionId = (startResult \ "id").as[Int]
println(s"Execution started: $executionId")
// Your pipeline code executes here
// - Data extraction/transformation logic
// - Database operations
// - File processing
// - API calls
// - Any other business logic
// End execution
val endJson = Json.obj(
"id" -> executionId,
"end_date" -> "2024-01-01T10:05:00Z",
"completed_successfully" -> true,
"total_rows" -> 10000,
"inserts" -> 8000,
"updates" -> 2000,
"soft_deletes" -> 0
).toString()
val endRequest = HttpRequest.newBuilder()
.uri(URI.create("http://localhost:8000/end_pipeline_execution"))
.header("Content-Type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(endJson))
.build()
val endResponse = client.send(endRequest,
HttpResponse.BodyHandlers.ofString())
println(endResponse.body())
}
}
Execution Patterns
Full Load Pattern
{
"pipeline_id": 1,
"start_date": "2024-01-01T10:00:00Z"
}
Incremental Load Pattern
{
"pipeline_id": 1,
"start_date": "2024-01-02T10:00:00Z",
"next_watermark": "2024-01-02T23:59:59Z"
}
Nested Execution Pattern
{
"pipeline_id": 1,
"start_date": "2024-01-01T10:00:00Z",
"parent_id": 5
}
Pipeline Updates
Common Update Scenarios
Change Monitoring Thresholds
{
"id": 1,
"freshness_number": 48,
"freshness_datepart": "hour",
"timeliness_number": 4,
"timeliness_datepart": "hour"
}
Mute Monitoring Checks
{
"id": 1,
"mute_freshness_check": true,
"mute_timeliness_check": true
}
Update Watermark
{
"id": 1,
"next_watermark": "2024-01-02T00:00:00Z"
}
Nested Pipeline Executions
Watcher supports hierarchical pipeline execution tracking through the parent_id field, enabling you to model complex workflows with sub-pipelines and dependencies.
Use Cases:
Main Pipeline: A main orchestration pipeline that coordinates multiple sub-pipelines
Sub-Pipeline Tracking: Individual components or steps within a larger workflow
Dependency Management: Track which sub-pipelines depend on others
Performance Analysis: Analyze execution times at both main and sub-pipeline levels
Error Isolation: Identify which specific sub-pipeline failed within a complex workflow
Example Workflow:
Main Pipeline: data_processing_main
├── Sub-Pipeline: extract_sales_data (parent_id: main_execution_id)
├── Sub-Pipeline: extract_marketing_data (parent_id: main_execution_id)
├── Sub-Pipeline: transform_combined_data (parent_id: main_execution_id)
└── Sub-Pipeline: load_to_warehouse (parent_id: main_execution_id)
API Usage:
import httpx
# Start main pipeline execution
# Note: start_date is optional; if omitted, defaults to current time
main_response = httpx.post(
"http://localhost:8000/start_pipeline_execution",
json={
"pipeline_id": 1,
"start_date": "2024-01-01T10:00:00Z"
}
)
main_execution_id = main_response.json()["id"]
# Start sub-pipeline with parent reference
sub_response = httpx.post(
"http://localhost:8000/start_pipeline_execution",
json={
"pipeline_id": 2,
"start_date": "2024-01-01T10:00:00Z",
"parent_id": main_execution_id
}
)
# Start main pipeline execution
curl -X POST "http://localhost:8000/start_pipeline_execution" \
-H "Content-Type: application/json" \
-d '{
"pipeline_id": 1,
"start_date": "2024-01-01T10:00:00Z"
}'
# Start sub-pipeline with parent reference
curl -X POST "http://localhost:8000/start_pipeline_execution" \
-H "Content-Type: application/json" \
-d '{
"pipeline_id": 2,
"start_date": "2024-01-01T10:00:00Z",
"parent_id": 123
}'
package main
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
)
type StartExecution struct {
PipelineID int `json:"pipeline_id"`
StartDate string `json:"start_date"`
ParentID *int `json:"parent_id,omitempty"`
}
func main() {
// Start main pipeline execution
mainData := StartExecution{
PipelineID: 1,
StartDate: "2024-01-01T10:00:00Z",
}
jsonData, _ := json.Marshal(mainData)
resp, _ := http.Post("http://localhost:8000/start_pipeline_execution",
"application/json", bytes.NewBuffer(jsonData))
defer resp.Body.Close()
var mainResult map[string]interface{}
json.NewDecoder(resp.Body).Decode(&mainResult)
mainExecutionID := int(mainResult["id"].(float64))
// Start sub-pipeline with parent reference
subData := StartExecution{
PipelineID: 2,
StartDate: "2024-01-01T10:00:00Z",
ParentID: &mainExecutionID,
}
jsonData, _ = json.Marshal(subData)
resp, _ = http.Post("http://localhost:8000/start_pipeline_execution",
"application/json", bytes.NewBuffer(jsonData))
defer resp.Body.Close()
var subResult map[string]interface{}
json.NewDecoder(resp.Body).Decode(&subResult)
fmt.Println("Sub-pipeline started:", subResult)
}
import java.net.http.{HttpClient, HttpRequest, HttpResponse}
import java.net.URI
import play.api.libs.json.Json
object NestedPipelineExample {
def main(args: Array[String]): Unit = {
val client = HttpClient.newHttpClient()
// Start main pipeline execution
val mainJson = Json.obj(
"pipeline_id" -> 1,
"start_date" -> "2024-01-01T10:00:00Z"
).toString()
val mainRequest = HttpRequest.newBuilder()
.uri(URI.create("http://localhost:8000/start_pipeline_execution"))
.header("Content-Type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(mainJson))
.build()
val mainResponse = client.send(mainRequest,
HttpResponse.BodyHandlers.ofString())
val mainResult = Json.parse(mainResponse.body())
val mainExecutionId = (mainResult \ "id").as[Int]
// Start sub-pipeline with parent reference
val subJson = Json.obj(
"pipeline_id" -> 2,
"start_date" -> "2024-01-01T10:00:00Z",
"parent_id" -> mainExecutionId
).toString()
val subRequest = HttpRequest.newBuilder()
.uri(URI.create("http://localhost:8000/start_pipeline_execution"))
.header("Content-Type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(subJson))
.build()
val subResponse = client.send(subRequest,
HttpResponse.BodyHandlers.ofString())
println("Sub-pipeline started:", subResponse.body())
}
}
Querying Nested Executions
The system automatically maintains a closure table (pipeline_execution_closure) that enables efficient querying of hierarchical relationships without recursive queries.
Closure Table Structure:
parent_execution_id: The ancestor execution ID
child_execution_id: The descendant execution ID
depth: The relationship depth (0 = self-reference, 1 = direct child, 2 = grandchild, etc.)
Example Queries:
-- Get all direct children of an execution
SELECT pe.*
FROM pipeline_execution pe
JOIN pipeline_execution_closure pec
ON pe.id = pec.child_execution_id
WHERE pec.parent_execution_id = 123
AND pec.depth = 1;
-- Get all downstream dependencies of an execution
SELECT pe.*
FROM pipeline_execution pe
JOIN pipeline_execution_closure pec
ON pe.id = pec.child_execution_id
WHERE pec.parent_execution_id = 123
AND pec.depth > 0;
-- Get all upstream dependencies of an execution
SELECT pe.*
FROM pipeline_execution pe
JOIN pipeline_execution_closure pec
ON pe.id = pec.parent_execution_id
WHERE pec.child_execution_id = 456
AND pec.depth > 0;
Benefits:
Hierarchical Monitoring: Track both overall workflow progress and individual component performance
Dependency Tracking: Understand which sub-pipelines are blocking others
Root Cause Analysis: Quickly identify which specific component caused a failure
Resource Optimization: Analyze which sub-pipelines consume the most time/resources
Audit Trail: Complete visibility into complex multi-step data processes
Pipeline Organization
Effective organization of your Watcher metadata is crucial for maintainability, monitoring, and team collaboration.
Best Practices:
Consistency: Use the same naming patterns across all teams and projects
Descriptiveness: Names should clearly indicate purpose and scope
Hierarchy: Use underscores to create logical hierarchies
Future-Proofing: Choose names that will remain relevant as systems evolve
Documentation: Document your naming conventions and share with all teams
Validation: Implement naming validation in your CI/CD pipeline or code reviews
Pipeline Type Organization
Organize pipeline types by data processing patterns or business domains or a combination of both:
Data Processing Pattern:
extraction - Data extraction pipelines
transformation - Data transformation and processing
loading - Data loading and materialization
audit - Data quality and validation
monitoring - System monitoring and health checks
Business Domain:
sales
marketing
finance
Combination:
sales_extraction
marketing_audit
finance_monitoring
Pipeline Naming Convention
Use a clear naming structure that matches back to the pipeline code (e.g., DAG name, job name, or workflow identifier).
Best Practices:
Match your DAG/job/workflow names exactly
Use consistent abbreviations across your organization
Keep names descriptive but concise
Use underscores for separation, avoid special characters