API Endpoints
This section documents all available API endpoints in Watcher.
Pipeline Management
Create or Get Pipeline
- POST /pipeline
Create a new pipeline or get existing one (upsert behavior). Automatically creates pipeline type if it doesn’t exist.
Request Body:
{ "name": "my data pipeline", "pipeline_type_name": "extraction", "next_watermark": "2024-01-02T00:00:00Z", "pipeline_metadata": { "description": "Daily data extraction pipeline" }, "freshness_number": 24, "freshness_datepart": "hour", "timeliness_number": 2, "timeliness_datepart": "hour" }
Response:
{ "id": 1, "active": true, "load_lineage": true, "watermark": "2024-01-01T00:00:00Z" }
Request Body Fields:
- name (string): Pipeline name (1-150 characters, required)
- pipeline_type_name (string): Pipeline type name (1-150 characters, required)
- next_watermark (string|int|datetime|date): Next watermark value (optional)
- pipeline_metadata (object): Additional pipeline metadata (optional)
- freshness_number (int): Freshness check interval number (optional, >0)
- freshness_datepart (string): Freshness check date part (optional, hour, day, week, month, year)
- timeliness_number (int): Timeliness check interval number (optional, >0)
- timeliness_datepart (string): Timeliness check date part (optional, hour, day, week, month, year)
Response Fields:
- id (int): Pipeline ID
- active (bool): Whether pipeline is active
- load_lineage (bool): Whether to load lineage
- watermark (string|int|datetime|date): Current watermark value (returned when next_watermark is provided)
Status Codes:
- 201 Created - New pipeline was created
- 200 OK - Existing pipeline was found
- 500 Internal Server Error - Unique constraint violation
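The request constraints above can be checked on the client before the call is made. The following is a minimal sketch, not part of Watcher itself: `BASE_URL`, `build_create_pipeline_request`, and `describe_status` are hypothetical names, and the request object is only built, not sent.

```python
import json
from urllib import request

BASE_URL = "http://localhost:8000"  # assumption: replace with your Watcher host
DATEPARTS = {"hour", "day", "week", "month", "year"}


def build_create_pipeline_request(name, pipeline_type_name, **optional):
    """Build (but do not send) a POST /pipeline request."""
    for label, value in (("name", name), ("pipeline_type_name", pipeline_type_name)):
        if not 1 <= len(value) <= 150:
            raise ValueError(f"{label} must be 1-150 characters")
    for prefix in ("freshness", "timeliness"):
        number = optional.get(f"{prefix}_number")
        datepart = optional.get(f"{prefix}_datepart")
        if number is not None and number <= 0:
            raise ValueError(f"{prefix}_number must be > 0")
        if datepart is not None and datepart not in DATEPARTS:
            raise ValueError(f"{prefix}_datepart must be one of {sorted(DATEPARTS)}")
    payload = {"name": name, "pipeline_type_name": pipeline_type_name, **optional}
    return request.Request(
        f"{BASE_URL}/pipeline",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def describe_status(status_code):
    """Map the documented status codes to a short outcome label."""
    return {201: "created", 200: "existing"}.get(status_code, "error")
```

Passing the returned object to `urllib.request.urlopen` would send it; because the endpoint is an upsert, callers can treat both 201 and 200 as success and read `id` from either response.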
List Pipelines
- GET /pipeline
Get all pipelines.
Response:
[ { "id": 1, "name": "my data pipeline", "pipeline_type_id": 1, "watermark": "2024-01-01T00:00:00Z", "next_watermark": "2024-01-01T23:59:59Z", "pipeline_metadata": { "environment": "production" }, "last_target_insert": "2024-01-01T10:00:00Z", "last_target_update": "2024-01-01T10:00:00Z", "last_target_soft_delete": null, "freshness_number": 24, "freshness_datepart": "hour", "mute_freshness_check": false, "timeliness_number": 2, "timeliness_datepart": "hour", "mute_timeliness_check": false, "load_lineage": true, "active": true, "created_at": "2024-01-01T00:00:00Z", "updated_at": "2024-01-01T12:00:00Z" }, { "id": 2, "name": "another pipeline", "pipeline_type_id": 2, "watermark": null, "next_watermark": null, "pipeline_metadata": null, "last_target_insert": null, "last_target_update": null, "last_target_soft_delete": null, "freshness_number": 48, "freshness_datepart": "hour", "mute_freshness_check": true, "timeliness_number": 4, "timeliness_datepart": "hour", "mute_timeliness_check": false, "load_lineage": false, "active": true, "created_at": "2024-01-01T00:00:00Z", "updated_at": null } ]
Response Fields:
- id (int): Pipeline ID
- name (string): Pipeline name (1-150 characters)
- pipeline_type_id (int): Pipeline type ID
- watermark (string): Watermark value (max 50 characters, nullable)
- next_watermark (string): Next watermark value (max 50 characters, nullable)
- pipeline_metadata (object): Pipeline metadata (JSONB, nullable)
- last_target_insert (string): Last target insert timestamp (ISO 8601, nullable)
- last_target_update (string): Last target update timestamp (ISO 8601, nullable)
- last_target_soft_delete (string): Last target soft delete timestamp (ISO 8601, nullable)
- freshness_number (int): Freshness check interval number (nullable)
- freshness_datepart (string): Freshness check date part (hour, day, week, month, year, nullable)
- mute_freshness_check (bool): Whether freshness check is muted
- timeliness_number (int): Timeliness check interval number (nullable)
- timeliness_datepart (string): Timeliness check date part (hour, day, week, month, year, nullable)
- mute_timeliness_check (bool): Whether timeliness check is muted
- load_lineage (bool): Whether to load lineage
- active (bool): Whether pipeline is active
- created_at (string): Creation timestamp (ISO 8601)
- updated_at (string): Last update timestamp (ISO 8601, nullable)
Status Codes:
- 200 OK - Pipelines retrieved successfully
Get Pipeline by ID
- GET /pipeline/{pipeline_id}
Get a specific pipeline by ID.
Parameters:
- pipeline_id (int): Pipeline ID
Response:
{ "id": 1, "name": "my data pipeline", "pipeline_type_id": 1, "watermark": "2024-01-01T00:00:00Z", "next_watermark": "2024-01-01T23:59:59Z", "pipeline_metadata": { "environment": "production" }, "last_target_insert": "2024-01-01T10:00:00Z", "last_target_update": "2024-01-01T10:00:00Z", "last_target_soft_delete": null, "freshness_number": 24, "freshness_datepart": "hour", "mute_freshness_check": false, "timeliness_number": 2, "timeliness_datepart": "hour", "mute_timeliness_check": false, "load_lineage": true, "active": true, "created_at": "2024-01-01T00:00:00Z", "updated_at": "2024-01-01T12:00:00Z" }
Response Fields:
- id (int): Pipeline ID
- name (string): Pipeline name (1-150 characters)
- pipeline_type_id (int): Pipeline type ID
- watermark (string): Watermark value (max 50 characters)
- next_watermark (string): Next watermark value (max 50 characters)
- pipeline_metadata (object): Pipeline metadata (JSONB)
- last_target_insert (string): Last target insert timestamp (ISO 8601, nullable)
- last_target_update (string): Last target update timestamp (ISO 8601, nullable)
- last_target_soft_delete (string): Last target soft delete timestamp (ISO 8601, nullable)
- freshness_number (int): Freshness check interval number
- freshness_datepart (string): Freshness check date part (hour, day, week, month, year)
- mute_freshness_check (bool): Whether freshness check is muted
- timeliness_number (int): Timeliness check interval number
- timeliness_datepart (string): Timeliness check date part (hour, day, week, month, year)
- mute_timeliness_check (bool): Whether timeliness check is muted
- load_lineage (bool): Whether to load lineage
- active (bool): Whether pipeline is active
- created_at (string): Creation timestamp (ISO 8601)
- updated_at (string): Last update timestamp (ISO 8601, nullable)
Status Codes:
- 200 OK - Pipeline found
- 404 Not Found - Pipeline not found
Update Pipeline
- PATCH /pipeline
Update pipeline configuration.
Request Body:
{ "id": 1, "name": "updated pipeline name", "pipeline_type_id": 2, "watermark": "2024-01-01T00:00:00Z", "next_watermark": "2024-01-02T00:00:00Z", "pipeline_metadata": { "environment": "production" }, "freshness_number": 24, "freshness_datepart": "hour", "mute_freshness_check": false, "timeliness_number": 2, "timeliness_datepart": "hour", "mute_timeliness_check": false, "load_lineage": true }
Response:
{ "id": 1, "name": "updated pipeline name", "pipeline_type_id": 2, "watermark": "2024-01-01T00:00:00Z", "next_watermark": "2024-01-02T00:00:00Z", "pipeline_metadata": { "environment": "production" }, "last_target_insert": "2024-01-01T10:00:00Z", "last_target_update": "2024-01-01T10:00:00Z", "last_target_soft_delete": null, "freshness_number": 24, "freshness_datepart": "hour", "mute_freshness_check": false, "timeliness_number": 2, "timeliness_datepart": "hour", "mute_timeliness_check": false, "load_lineage": true, "active": true, "created_at": "2024-01-01T00:00:00Z", "updated_at": "2024-01-01T12:00:00Z" }
Request Body Fields:
- id (int): Pipeline ID (required)
- name (string): Pipeline name (1-150 characters, optional)
- pipeline_type_id (int): Pipeline type ID (optional)
- watermark (string|int|datetime|date): Watermark value (optional)
- next_watermark (string|int|datetime|date): Next watermark value (optional)
- pipeline_metadata (object): Additional pipeline metadata (optional)
- freshness_number (int): Freshness check interval number (optional, >0)
- freshness_datepart (string): Freshness check date part (optional, hour, day, week, month, year)
- mute_freshness_check (bool): Whether freshness check is muted (optional)
- timeliness_number (int): Timeliness check interval number (optional, >0)
- timeliness_datepart (string): Timeliness check date part (optional, hour, day, week, month, year)
- mute_timeliness_check (bool): Whether timeliness check is muted (optional)
- load_lineage (bool): Whether to load lineage (optional)
Response Fields:
- id (int): Pipeline ID
- name (string): Pipeline name
- pipeline_type_id (int): Pipeline type ID
- watermark (string): Watermark value
- next_watermark (string): Next watermark value
- pipeline_metadata (object): Pipeline metadata
- last_target_insert (string): Last target insert timestamp (ISO 8601, nullable)
- last_target_update (string): Last target update timestamp (ISO 8601, nullable)
- last_target_soft_delete (string): Last target soft delete timestamp (ISO 8601, nullable)
- freshness_number (int): Freshness check interval number
- freshness_datepart (string): Freshness check date part
- mute_freshness_check (bool): Whether freshness check is muted
- timeliness_number (int): Timeliness check interval number
- timeliness_datepart (string): Timeliness check date part
- mute_timeliness_check (bool): Whether timeliness check is muted
- load_lineage (bool): Whether to load lineage
- active (bool): Whether pipeline is active
- created_at (string): Creation timestamp (ISO 8601)
- updated_at (string): Last update timestamp (ISO 8601)
Status Codes:
- 200 OK - Pipeline updated successfully
- 404 Not Found - Pipeline not found
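Since every field except `id` is optional, a client can send only the fields it wants to change. The sketch below assumes that omitted fields are left unchanged, which is the usual PATCH convention but is not stated in this section; `PATCHABLE_FIELDS` and `build_pipeline_patch` are hypothetical helper names.

```python
# Field names taken from the documented PATCH /pipeline request body.
PATCHABLE_FIELDS = {
    "name", "pipeline_type_id", "watermark", "next_watermark",
    "pipeline_metadata", "freshness_number", "freshness_datepart",
    "mute_freshness_check", "timeliness_number", "timeliness_datepart",
    "mute_timeliness_check", "load_lineage",
}


def build_pipeline_patch(pipeline_id, **changes):
    """Return a PATCH /pipeline body containing only the fields to change."""
    unknown = set(changes) - PATCHABLE_FIELDS
    if unknown:
        raise ValueError(f"unsupported fields: {sorted(unknown)}")
    return {"id": pipeline_id, **changes}
```

For example, `build_pipeline_patch(1, mute_freshness_check=True)` produces a two-key body rather than resending the whole pipeline.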
Pipeline Execution
Start Pipeline Execution
- POST /start_pipeline_execution
Start a new pipeline execution. Automatically calculates hour_recorded and date_recorded. If start_date is not provided, it defaults to the current time using pendulum.now().
Request Body:
{ "pipeline_id": 1, "start_date": "2024-01-01T10:00:00Z", "watermark": "2024-01-01T00:00:00Z", "next_watermark": "2024-01-01T23:59:59Z", "parent_id": null }
Minimal Request Body (start_date omitted, defaults to current time):
{ "pipeline_id": 1 }
Response:
{ "id": 1 }
Request Body Fields:
- pipeline_id (int): Pipeline ID (required)
- start_date (string): Start timestamp (ISO 8601, optional). If not provided, defaults to the current time using pendulum.now().
- watermark (string|int|datetime|date): Watermark value (optional)
- next_watermark (string|int|datetime|date): Next watermark value (optional)
- parent_id (int): Parent execution ID for hierarchical executions (optional)
Response Fields:
- id (int): Pipeline execution ID
Status Codes:
- 201 Created - Pipeline execution started successfully
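A start call is normally paired with a later call to POST /end_pipeline_execution. One way to keep the two together is a context manager, sketched here under assumptions: `post(path, body)` is a caller-supplied transport function (hypothetical, not a Watcher API) that returns the parsed JSON response, or None for an empty body, and `tracked_execution` is an illustrative name.

```python
from contextlib import contextmanager
from datetime import datetime, timezone


@contextmanager
def tracked_execution(post, pipeline_id):
    """Start an execution, yield a dict for metrics, then always end it."""
    # start_date is omitted, so the server defaults it to the current time.
    started = post("/start_pipeline_execution", {"pipeline_id": pipeline_id})
    result = {"id": started["id"], "completed_successfully": True}
    try:
        # The caller may add total_rows, inserts, updates, and so on.
        yield result
    except Exception:
        result["completed_successfully"] = False
        raise
    finally:
        result["end_date"] = datetime.now(timezone.utc).isoformat()
        post("/end_pipeline_execution", result)
```

Inside the `with` block the pipeline code sets keys such as `total_rows` on the yielded dict; the end call is sent even when the block raises, with `completed_successfully` set to false.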
End Pipeline Execution
- POST /end_pipeline_execution
End a pipeline execution with metrics. Automatically calculates duration and throughput.
Request Body:
{ "id": 1, "end_date": "2024-01-01T10:05:00Z", "completed_successfully": true, "total_rows": 1000, "inserts": 800, "updates": 200, "soft_deletes": 0, "execution_metadata": { "partition": "2025-01-05" } }
Response: HTTP 204 No Content
Request Body Fields:
- id (int): Pipeline execution ID (required)
- end_date (string): End timestamp (ISO 8601, required)
- completed_successfully (bool): Whether execution completed successfully (optional)
- total_rows (int): Total rows processed (optional, ≥0)
- inserts (int): Number of inserts (optional, ≥0)
- updates (int): Number of updates (optional, ≥0)
- soft_deletes (int): Number of soft deletes (optional, ≥0)
- execution_metadata (object): Additional execution metadata (optional)
Status Codes:
- 204 No Content - Pipeline execution ended successfully
- 400 Bad Request - end_date must be greater than start_date
- 404 Not Found - Pipeline execution not found
- 500 Internal Server Error - Database integrity error
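The duration and throughput that the server derives can be reproduced locally for sanity checks. This sketch assumes throughput is total_rows divided by duration_seconds, rounded to two decimals; that formula is inferred from the sample execution values in this document (1000 rows over 300 seconds giving 3.33), not from the server code. `parse_iso` and `execution_metrics` are hypothetical names.

```python
from datetime import datetime


def parse_iso(ts):
    # datetime.fromisoformat only accepts a trailing "Z" from Python 3.11 on,
    # so normalize it to an explicit UTC offset first.
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def execution_metrics(start_date, end_date, total_rows):
    """Return (duration_seconds, throughput) for one execution."""
    start, end = parse_iso(start_date), parse_iso(end_date)
    if end <= start:
        # Mirrors the documented 400 response.
        raise ValueError("end_date must be greater than start_date")
    duration_seconds = int((end - start).total_seconds())
    # Sub-second runs truncate to 0 seconds; report no throughput for them.
    throughput = round(total_rows / duration_seconds, 2) if duration_seconds else None
    return duration_seconds, throughput
```

Validating `end_date` on the client in this way avoids a round trip that would end in a 400 response.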
Get Pipeline Execution
- GET /pipeline_execution/{pipeline_execution_id}
Get a specific pipeline execution with hierarchical child executions using the closure table.
Parameters:
- pipeline_execution_id (int): Pipeline execution ID
Response:
{ "id": 1, "parent_id": null, "pipeline_id": 1, "start_date": "2024-01-01T10:00:00Z", "end_date": "2024-01-01T10:05:00Z", "duration_seconds": 300, "completed_successfully": true, "inserts": 800, "updates": 200, "soft_deletes": 0, "total_rows": 1000, "watermark": "2024-01-01T00:00:00Z", "next_watermark": "2024-01-01T23:59:59Z", "execution_metadata": { "partition": "2025-01-05" }, "anomaly_flags": { "total_rows": true, "duration_seconds": false }, "throughput": 3.33, "child_executions": [ { "id": 2, "parent_id": 1, "pipeline_id": 1, "start_date": "2024-01-01T10:01:00Z", "end_date": "2024-01-01T10:03:00Z", "duration_seconds": 120, "completed_successfully": true, "inserts": 400, "updates": 100, "soft_deletes": 0, "total_rows": 500, "watermark": "2024-01-01T00:00:00Z", "next_watermark": "2024-01-01T23:59:59Z", "execution_metadata": null, "anomaly_flags": null, "throughput": 4.17, "child_executions": [] } ] }
Response Fields:
- id (int): Pipeline execution ID
- parent_id (int): Parent execution ID (nullable)
- pipeline_id (int): Pipeline ID
- start_date (string): Start timestamp (ISO 8601)
- end_date (string): End timestamp (ISO 8601, nullable)
- duration_seconds (int): Execution duration in seconds (nullable)
- completed_successfully (bool): Whether execution completed successfully (nullable)
- inserts (int): Number of inserts (nullable)
- updates (int): Number of updates (nullable)
- soft_deletes (int): Number of soft deletes (nullable)
- total_rows (int): Total rows processed (nullable)
- watermark (string): Watermark value (nullable)
- next_watermark (string): Next watermark value (nullable)
- execution_metadata (object): Additional execution metadata (nullable)
- anomaly_flags (object): Anomaly detection flags (nullable)
- throughput (float): Rows per second throughput (nullable)
- child_executions (array): Array of child execution objects (nullable)
Status Codes:
- 200 OK - Pipeline execution found
- 404 Not Found - Pipeline execution not found
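Because child_executions nests to arbitrary depth and may be null, clients usually need a small recursive walk over the response. A minimal sketch, with `iter_executions` as a hypothetical helper name:

```python
def iter_executions(execution, depth=0):
    """Yield (depth, execution) for an execution and all of its descendants."""
    yield depth, execution
    # child_executions is documented as nullable, so treat None as empty.
    for child in execution.get("child_executions") or []:
        yield from iter_executions(child, depth + 1)
```

Iterating over the result gives the parent first at depth 0, then each child in response order, which is enough to print an indented tree or total a metric across the hierarchy.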
Pipeline Types
Create or Get Pipeline Type
- POST /pipeline_type
Create a new pipeline type or get existing one (upsert behavior).
Request Body:
{ "name": "extraction", "freshness_number": 24, "freshness_datepart": "hour", "mute_freshness_check": false, "timeliness_number": 2, "timeliness_datepart": "hour", "mute_timeliness_check": false }
Response:
{ "id": 1 }
Request Body Fields:
- name (string): Pipeline type name (1-150 characters, required)
- freshness_number (int): Freshness check interval number (optional, >0)
- freshness_datepart (string): Freshness check date part (optional, hour, day, week, month, year)
- mute_freshness_check (bool): Whether freshness check is muted (optional, default: false)
- timeliness_number (int): Timeliness check interval number (optional, >0)
- timeliness_datepart (string): Timeliness check date part (optional, hour, day, week, month, year)
- mute_timeliness_check (bool): Whether timeliness check is muted (optional, default: false)
Response Fields:
- id (int): Pipeline type ID
Status Codes:
- 201 Created - New pipeline type was created
- 200 OK - Existing pipeline type was found
- 500 Internal Server Error - Unique constraint violation
List Pipeline Types
- GET /pipeline_type
Get all pipeline types.
Response:
[ { "id": 1, "name": "extraction", "freshness_number": 24, "freshness_datepart": "hour", "mute_freshness_check": false, "timeliness_number": 2, "timeliness_datepart": "hour", "mute_timeliness_check": false, "created_at": "2024-01-01T00:00:00Z", "updated_at": "2024-01-01T12:00:00Z" }, { "id": 2, "name": "transformation", "freshness_number": 48, "freshness_datepart": "hour", "mute_freshness_check": true, "timeliness_number": 4, "timeliness_datepart": "hour", "mute_timeliness_check": false, "created_at": "2024-01-01T00:00:00Z", "updated_at": null } ]
Response Fields:
- id (int): Pipeline type ID
- name (string): Pipeline type name (1-150 characters)
- freshness_number (int): Freshness check interval number
- freshness_datepart (string): Freshness check date part (hour, day, week, month, year)
- mute_freshness_check (bool): Whether freshness check is muted
- timeliness_number (int): Timeliness check interval number
- timeliness_datepart (string): Timeliness check date part (hour, day, week, month, year)
- mute_timeliness_check (bool): Whether timeliness check is muted
- created_at (string): Creation timestamp (ISO 8601)
- updated_at (string): Last update timestamp (ISO 8601, nullable)
Status Codes:
- 200 OK - Pipeline types retrieved successfully
Get Pipeline Type by ID
- GET /pipeline_type/{pipeline_type_id}
Get a specific pipeline type by ID.
Parameters:
- pipeline_type_id (int): Pipeline type ID
Response:
{ "id": 1, "name": "extraction", "freshness_number": 24, "freshness_datepart": "hour", "mute_freshness_check": false, "timeliness_number": 2, "timeliness_datepart": "hour", "mute_timeliness_check": false, "created_at": "2024-01-01T00:00:00Z", "updated_at": "2024-01-01T12:00:00Z" }
Response Fields:
- id (int): Pipeline type ID
- name (string): Pipeline type name (1-150 characters)
- freshness_number (int): Freshness check interval number
- freshness_datepart (string): Freshness check date part (hour, day, week, month, year)
- mute_freshness_check (bool): Whether freshness check is muted
- timeliness_number (int): Timeliness check interval number
- timeliness_datepart (string): Timeliness check date part (hour, day, week, month, year)
- mute_timeliness_check (bool): Whether timeliness check is muted
- created_at (string): Creation timestamp (ISO 8601)
- updated_at (string): Last update timestamp (ISO 8601, nullable)
Status Codes:
- 200 OK - Pipeline type found
- 404 Not Found - Pipeline type not found
Address Management
Create or Get Address
- POST /address
Create a new address or get existing one (upsert behavior). Automatically creates address type if it doesn’t exist.
Request Body:
{ "name": "source_db.source_schema.source_table", "address_type_name": "postgres", "address_type_group_name": "database", "database_name": "source_db", "schema_name": "source_schema", "table_name": "source_table", "primary_key": "id", "address_metadata": { "external_dependencies": [ { "type": "looker_dashboard", "name": "Sales Dashboard" } ] } }
Response:
{ "id": 1 }
Request Body Fields:
- name (string): Address name (1-150 characters, required)
- address_type_name (string): Address type name (1-150 characters, required)
- address_type_group_name (string): Address type group name (1-150 characters, required)
- database_name (string): Database name (max 50 characters, optional)
- schema_name (string): Schema name (max 50 characters, optional)
- table_name (string): Table name (max 50 characters, optional)
- primary_key (string): Primary key (max 50 characters, optional)
- address_metadata (object): Arbitrary JSON metadata for external dependencies (optional)
Response Fields:
- id (int): Address ID
Status Codes:
- 201 Created - New address was created
- 200 OK - Existing address was found
List Addresses
- GET /address
Get all addresses.
Response:
[ { "id": 1, "name": "source_db.source_schema.source_table", "address_type_id": 1, "database_name": "source_db", "schema_name": "source_schema", "table_name": "source_table", "primary_key": "id", "created_at": "2024-01-01T00:00:00Z", "updated_at": "2024-01-01T12:00:00Z" }, { "id": 2, "name": "target_db.target_schema.target_table", "address_type_id": 1, "database_name": "target_db", "schema_name": "target_schema", "table_name": "target_table", "primary_key": "id", "created_at": "2024-01-01T00:00:00Z", "updated_at": null } ]
Response Fields:
- id (int): Address ID
- name (string): Address name (1-150 characters)
- address_type_id (int): Address type ID
- database_name (string): Database name (max 50 characters)
- schema_name (string): Schema name (max 50 characters)
- table_name (string): Table name (max 50 characters)
- primary_key (string): Primary key (max 50 characters)
- created_at (string): Creation timestamp (ISO 8601)
- updated_at (string): Last update timestamp (ISO 8601, nullable)
Status Codes:
- 200 OK - Addresses retrieved successfully
Get Address by ID
- GET /address/{address_id}
Get a specific address by ID.
Parameters:
- address_id (int): Address ID
Response:
{ "id": 1, "name": "source_db.source_schema.source_table", "address_type_id": 1, "database_name": "source_db", "schema_name": "source_schema", "table_name": "source_table", "primary_key": "id", "created_at": "2024-01-01T00:00:00Z", "updated_at": "2024-01-01T12:00:00Z" }
Response Fields:
- id (int): Address ID
- name (string): Address name (1-150 characters)
- address_type_id (int): Address type ID
- database_name (string): Database name (max 50 characters)
- schema_name (string): Schema name (max 50 characters)
- table_name (string): Table name (max 50 characters)
- primary_key (string): Primary key (max 50 characters)
- created_at (string): Creation timestamp (ISO 8601)
- updated_at (string): Last update timestamp (ISO 8601, nullable)
Status Codes:
- 200 OK - Address found
- 404 Not Found - Address not found
Update Address
- PATCH /address
Update address information.
Request Body:
{ "id": 1, "name": "updated_table_name", "database_name": "updated_db", "schema_name": "updated_schema", "table_name": "updated_table", "primary_key": "id", "address_type_id": 2 }
Response:
{ "id": 1, "name": "updated_table_name", "address_type_id": 2, "database_name": "updated_db", "schema_name": "updated_schema", "table_name": "updated_table", "primary_key": "id", "created_at": "2024-01-01T00:00:00Z", "updated_at": "2024-01-01T12:00:00Z" }
Request Body Fields:
- id (int): Address ID (required)
- name (string): Address name (1-150 characters, optional)
- address_type_id (int): Address type ID (optional)
- database_name (string): Database name (max 50 characters, optional)
- schema_name (string): Schema name (max 50 characters, optional)
- table_name (string): Table name (max 50 characters, optional)
- primary_key (string): Primary key (max 50 characters, optional)
Response Fields:
- id (int): Address ID
- name (string): Address name
- address_type_id (int): Address type ID
- database_name (string): Database name
- schema_name (string): Schema name
- table_name (string): Table name
- primary_key (string): Primary key
- created_at (string): Creation timestamp (ISO 8601)
- updated_at (string): Last update timestamp (ISO 8601)
Status Codes:
- 200 OK - Address updated successfully
- 404 Not Found - Address not found
Address Types
Create or Get Address Type
- POST /address_type
Create a new address type or get existing one (upsert behavior).
Request Body:
{ "name": "postgres", "group_name": "database" }
Response:
{ "id": 1 }
Request Body Fields:
- name (string): Address type name (1-150 characters, required)
- group_name (string): Address type group name (1-150 characters, required)
Response Fields:
- id (int): Address type ID
Status Codes:
- 201 Created - New address type was created
- 200 OK - Existing address type was found
List Address Types
- GET /address_type
Get all address types.
Response:
[ { "id": 1, "name": "postgres", "group_name": "database", "created_at": "2024-01-01T00:00:00Z", "updated_at": "2024-01-01T00:00:00Z" }, { "id": 2, "name": "mysql", "group_name": "database", "created_at": "2024-01-01T00:00:00Z", "updated_at": null } ]
Response Fields:
- id (int): Address type ID
- name (string): Address type name (1-150 characters)
- group_name (string): Address type group name (1-150 characters)
- created_at (string): Creation timestamp (ISO 8601)
- updated_at (string): Last update timestamp (ISO 8601, nullable)
Status Codes:
- 200 OK - Address types retrieved successfully
Get Address Type by ID
- GET /address_type/{address_type_id}
Get a specific address type by ID.
Parameters:
- address_type_id (int): Address type ID
Response:
{ "id": 1, "name": "postgres", "group_name": "database", "created_at": "2024-01-01T00:00:00Z", "updated_at": "2024-01-01T00:00:00Z" }
Response Fields:
- id (int): Address type ID
- name (string): Address type name (1-150 characters)
- group_name (string): Address type group name (1-150 characters)
- created_at (string): Creation timestamp (ISO 8601)
- updated_at (string): Last update timestamp (ISO 8601, nullable)
Status Codes:
- 200 OK - Address type found
- 404 Not Found - Address type not found
Address Lineage
Create Address Lineage
- POST /address_lineage
Create lineage relationships between addresses. Automatically creates addresses and address types if they don’t exist.
Request Body:
{ "pipeline_id": 1, "source_addresses": [ { "name": "source_db.source_schema.source_table", "address_type_name": "postgres", "address_type_group_name": "database" } ], "target_addresses": [ { "name": "target_db.target_schema.target_table", "address_type_name": "postgres", "address_type_group_name": "database" } ] }
Response:
{ "pipeline_id": 1, "lineage_relationships_created": 1, "message": "Lineage relationships created for pipeline 1" }
Request Body Fields:
- pipeline_id (int): Pipeline ID (required)
- source_addresses (array): List of source addresses
- target_addresses (array): List of target addresses
Each entry in source_addresses and target_addresses is an object with these fields:
- name (string): Address name (1-150 characters)
- address_type_name (string): Address type name (1-150 characters)
- address_type_group_name (string): Address type group name (1-150 characters)
Response Fields:
- pipeline_id (int): Pipeline ID
- lineage_relationships_created (int): Number of relationships created
- message (string): Status message
Status Codes:
- 201 Created - Lineage relationships created successfully
- 200 OK - Pipeline does not have load_lineage=True, no relationships created
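The lineage request body has a nested shape that is easy to get wrong by hand. The following sketch builds it from plain values; `address_ref` and `build_lineage_body` are hypothetical helper names, and the length check simply mirrors the 1-150 character limits listed above.

```python
def address_ref(name, address_type_name, address_type_group_name):
    """Build one entry for source_addresses or target_addresses."""
    fields = {
        "name": name,
        "address_type_name": address_type_name,
        "address_type_group_name": address_type_group_name,
    }
    for label, value in fields.items():
        if not 1 <= len(value) <= 150:
            raise ValueError(f"{label} must be 1-150 characters")
    return fields


def build_lineage_body(pipeline_id, sources, targets):
    """Assemble the POST /address_lineage request body."""
    return {
        "pipeline_id": pipeline_id,
        "source_addresses": list(sources),
        "target_addresses": list(targets),
    }
```

A client would check the response status afterwards: 200 rather than 201 means the pipeline has load_lineage disabled and nothing was recorded.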
Anomaly Detection
Create or Get Anomaly Detection Rule
- POST /anomaly_detection_rule
Create a new anomaly detection rule or get existing one (upsert behavior).
Request Body:
{ "pipeline_id": 1, "metric_field": "total_rows", "z_threshold": 2.0, "lookback_days": 30, "minimum_executions": 30, "active": true }
Response:
{ "id": 1 }
Parameters:
- pipeline_id (int): Pipeline ID (required)
- metric_field (string): Metric field to monitor (required)
- z_threshold (float): Z-score threshold 1.0-10.0 (default: 3.0)
- lookback_days (int): Days of historical data 1-365 (default: 30)
- minimum_executions (int): Minimum executions 5-1000 (default: 30)
- active (bool): Whether rule is active (default: true)
Status Codes:
- 201 Created - New rule was created
- 200 OK - Existing rule was found
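To build intuition for how z_threshold and minimum_executions interact, here is a sketch of a standard z-score check. It is an assumption about the general technique, not Watcher's actual detection code: the server may use a different deviation estimate or windowing, and `is_anomalous` is a hypothetical name. The history list stands in for the metric values collected over lookback_days.

```python
from statistics import mean, stdev


def is_anomalous(history, value, z_threshold=3.0, minimum_executions=30):
    """Flag value when it lies more than z_threshold deviations from the mean."""
    # Too little history gives an unreliable baseline; stdev needs >= 2 points.
    if len(history) < max(minimum_executions, 2):
        return False
    sigma = stdev(history)  # sample standard deviation (assumed)
    if sigma == 0:
        return False  # no spread to compare against
    return abs(value - mean(history)) / sigma > z_threshold
```

Lowering z_threshold (for example to 2.0, as in the request above) flags smaller deviations, while raising minimum_executions delays detection until more history exists.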
List Anomaly Detection Rules
- GET /anomaly_detection_rule
Get all anomaly detection rules.
Response:
[ { "id": 1, "pipeline_id": 1, "metric_field": "total_rows", "z_threshold": 2.0, "lookback_days": 30, "minimum_executions": 30, "active": true, "created_at": "2024-01-01T10:00:00Z", "updated_at": "2024-01-01T10:05:00Z" }, { "id": 2, "pipeline_id": 1, "metric_field": "duration_seconds", "z_threshold": 3.0, "lookback_days": 30, "minimum_executions": 30, "active": true, "created_at": "2024-01-01T10:00:00Z", "updated_at": null } ]
Get Anomaly Detection Rule by ID
- GET /anomaly_detection_rule/{anomaly_detection_rule_id}
Get a specific anomaly detection rule by ID.
Parameters:
- anomaly_detection_rule_id (int): Rule ID
Response:
{ "id": 1, "pipeline_id": 1, "metric_field": "total_rows", "z_threshold": 2.0, "lookback_days": 30, "minimum_executions": 30, "active": true, "created_at": "2024-01-01T10:00:00Z", "updated_at": "2024-01-01T10:05:00Z" }
Update Anomaly Detection Rule
- PATCH /anomaly_detection_rule
Update anomaly detection rule.
Request Body:
{ "id": 1, "pipeline_id": 1, "metric_field": "updates", "z_threshold": 2.5, "lookback_days": 30, "minimum_executions": 20, "active": true }
Response:
{ "id": 1, "pipeline_id": 1, "metric_field": "updates", "z_threshold": 2.5, "lookback_days": 30, "minimum_executions": 20, "active": true, "created_at": "2024-01-01T10:00:00Z", "updated_at": "2024-01-01T10:05:00Z" }
Parameters:
- id (int): Rule ID (required)
- pipeline_id (int): Pipeline ID (optional)
- metric_field (string): Metric field to monitor (optional)
- z_threshold (float): Z-score threshold 1.0-10.0 (optional)
- lookback_days (int): Days of historical data 1-365 (optional)
- minimum_executions (int): Minimum executions 5-1000 (optional)
- active (bool): Whether rule is active (optional)
Unflag Anomalies
- POST /unflag_anomaly
Unflag anomalies for a pipeline execution.
Request Body:
{ "pipeline_id": 1, "pipeline_execution_id": 1, "metric_field": ["total_rows", "duration_seconds"] }
Response: HTTP 204 No Content
Parameters:
- pipeline_id (int): Pipeline ID
- pipeline_execution_id (int): Pipeline execution ID
- metric_field (array): List of metric fields to unflag
Monitoring & Health
Check Timeliness
- POST /timeliness
Check pipeline execution timeliness.
Request Body:
{ "lookback_minutes": 60 }
Response:
{ "status": "queued" }
Check Freshness
- POST /freshness
Check DML operation freshness.
Response:
{ "status": "queued" }
Log Cleanup
- POST /log_cleanup
Clean up old log data based on retention period.
Request Body:
{ "retention_days": 90, "batch_size": 10000 }
Parameters:
- retention_days (int): Number of days to retain data (minimum: 90)
- batch_size (int): Number of records to delete per batch (default: 10000)
Response:
{ "total_pipeline_executions_deleted": 1000, "total_timeliness_pipeline_execution_logs_deleted": 500, "total_anomaly_detection_results_deleted": 50, "total_pipeline_execution_closure_parent_deleted": 200, "total_pipeline_execution_closure_child_deleted": 200, "total_freshness_pipeline_logs_deleted": 300 }
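A scheduled cleanup job might validate its settings before calling the endpoint and then log how much was removed. This is a minimal sketch with hypothetical helper names (`build_log_cleanup_body`, `total_deleted`); the positive batch_size check is an added assumption, since only the default is documented.

```python
def build_log_cleanup_body(retention_days, batch_size=10000):
    """Validate and assemble the POST /log_cleanup request body."""
    if retention_days < 90:
        raise ValueError("retention_days must be at least 90")
    if batch_size <= 0:  # assumption: a batch must delete at least one record
        raise ValueError("batch_size must be positive")
    return {"retention_days": retention_days, "batch_size": batch_size}


def total_deleted(response):
    """Sum every total_*_deleted counter in the cleanup response."""
    return sum(
        count
        for key, count in response.items()
        if key.startswith("total_") and key.endswith("_deleted")
    )
```

Summing by key pattern rather than by a fixed list keeps the helper working if further counters are added to the response.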
Celery Queue Monitoring
- POST /celery/monitor-queue
Monitor Celery queue depths and alert if queue gets too big.
Request Body: None
Response:
{ "status": "success" }
Error Response:
{ "status": "error" }
Alert Thresholds:
WARNING: 50+ pending tasks
CRITICAL: 100+ pending tasks
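The thresholds can be read as a simple classification of queue depth. The sketch below assumes that "50+" and "100+" are inclusive lower bounds; `queue_alert_level` is a hypothetical name and does not reflect how Watcher implements the check internally.

```python
def queue_alert_level(pending_tasks):
    """Return the alert level for a queue depth, or None when no alert applies."""
    if pending_tasks >= 100:
        return "CRITICAL"
    if pending_tasks >= 50:
        return "WARNING"
    return None
```

Checking the higher threshold first ensures that a depth of 100 or more is reported as CRITICAL rather than WARNING.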