API Endpoints

This section documents all available API endpoints in Watcher.

Pipeline Management

Create or Get Pipeline

POST /pipeline

Create a new pipeline or return the existing one (upsert behavior). The pipeline type is created automatically if it doesn’t exist.

Request Body:

{
  "name": "my data pipeline",
  "pipeline_type_name": "extraction",
  "next_watermark": "2024-01-02T00:00:00Z",
  "pipeline_metadata": {
    "description": "Daily data extraction pipeline"
  },
  "freshness_number": 24,
  "freshness_datepart": "hour",
  "timeliness_number": 2,
  "timeliness_datepart": "hour"
}

Response:

{
  "id": 1,
  "active": true,
  "load_lineage": true,
  "watermark": "2024-01-01T00:00:00Z"
}

Request Body Fields:

  • name (string): Pipeline name (1-150 characters, required)

  • pipeline_type_name (string): Pipeline type name (1-150 characters, required)

  • next_watermark (string|int|datetime|date): Next watermark value (optional)

  • pipeline_metadata (object): Additional pipeline metadata (optional)

  • freshness_number (int): Freshness check interval number (optional, >0)

  • freshness_datepart (string): Freshness check date part (optional, hour, day, week, month, year)

  • timeliness_number (int): Timeliness check interval number (optional, >0)

  • timeliness_datepart (string): Timeliness check date part (optional, hour, day, week, month, year)

Response Fields:

  • id (int): Pipeline ID

  • active (bool): Whether pipeline is active

  • load_lineage (bool): Whether to load lineage

  • watermark (string|int|datetime|date): Current watermark value (returned when next_watermark is provided)

Status Codes:

  • 201 Created - New pipeline was created

  • 200 OK - Existing pipeline was found

  • 500 Internal Server Error - Unique constraint violation
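The field constraints and status codes above can be checked on the client before a request is sent. The following is a minimal sketch, not part of Watcher itself: build_pipeline_payload and pipeline_was_created are hypothetical helper names, and the validation only mirrors the limits listed in this section.

```python
from typing import Any, Dict, Optional

# Date parts accepted by freshness_datepart and timeliness_datepart.
VALID_DATEPARTS = {"hour", "day", "week", "month", "year"}


def build_pipeline_payload(
    name: str,
    pipeline_type_name: str,
    next_watermark: Optional[Any] = None,
    pipeline_metadata: Optional[Dict[str, Any]] = None,
    freshness_number: Optional[int] = None,
    freshness_datepart: Optional[str] = None,
    timeliness_number: Optional[int] = None,
    timeliness_datepart: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: check the documented constraints and build the body."""
    for field, value in (("name", name), ("pipeline_type_name", pipeline_type_name)):
        if not 1 <= len(value) <= 150:
            raise ValueError(f"{field} must be 1-150 characters")
    for field, number in (
        ("freshness_number", freshness_number),
        ("timeliness_number", timeliness_number),
    ):
        if number is not None and number <= 0:
            raise ValueError(f"{field} must be greater than 0")
    for field, part in (
        ("freshness_datepart", freshness_datepart),
        ("timeliness_datepart", timeliness_datepart),
    ):
        if part is not None and part not in VALID_DATEPARTS:
            raise ValueError(f"{field} must be one of {sorted(VALID_DATEPARTS)}")

    optional = {
        "next_watermark": next_watermark,
        "pipeline_metadata": pipeline_metadata,
        "freshness_number": freshness_number,
        "freshness_datepart": freshness_datepart,
        "timeliness_number": timeliness_number,
        "timeliness_datepart": timeliness_datepart,
    }
    payload: Dict[str, Any] = {"name": name, "pipeline_type_name": pipeline_type_name}
    # Leave out optional fields that were not supplied.
    payload.update({key: value for key, value in optional.items() if value is not None})
    return payload


def pipeline_was_created(status_code: int) -> bool:
    """Map the documented status codes: 201 means created, 200 means it already existed."""
    if status_code == 201:
        return True
    if status_code == 200:
        return False
    raise RuntimeError(f"unexpected status code {status_code}")
```

The payload can be serialized with json.dumps and sent with any HTTP client; the status check lets a caller tell a fresh pipeline from an existing one.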

List Pipelines

GET /pipeline

Get all pipelines.

Response:

[
  {
    "id": 1,
    "name": "my data pipeline",
    "pipeline_type_id": 1,
    "watermark": "2024-01-01T00:00:00Z",
    "next_watermark": "2024-01-01T23:59:59Z",
    "pipeline_metadata": {
      "environment": "production"
    },
    "last_target_insert": "2024-01-01T10:00:00Z",
    "last_target_update": "2024-01-01T10:00:00Z",
    "last_target_soft_delete": null,
    "freshness_number": 24,
    "freshness_datepart": "hour",
    "mute_freshness_check": false,
    "timeliness_number": 2,
    "timeliness_datepart": "hour",
    "mute_timeliness_check": false,
    "load_lineage": true,
    "active": true,
    "created_at": "2024-01-01T00:00:00Z",
    "updated_at": "2024-01-01T12:00:00Z"
  },
  {
    "id": 2,
    "name": "another pipeline",
    "pipeline_type_id": 2,
    "watermark": null,
    "next_watermark": null,
    "pipeline_metadata": null,
    "last_target_insert": null,
    "last_target_update": null,
    "last_target_soft_delete": null,
    "freshness_number": 48,
    "freshness_datepart": "hour",
    "mute_freshness_check": true,
    "timeliness_number": 4,
    "timeliness_datepart": "hour",
    "mute_timeliness_check": false,
    "load_lineage": false,
    "active": true,
    "created_at": "2024-01-01T00:00:00Z",
    "updated_at": null
  }
]

Response Fields:

  • id (int): Pipeline ID

  • name (string): Pipeline name (1-150 characters)

  • pipeline_type_id (int): Pipeline type ID

  • watermark (string): Watermark value (max 50 characters, nullable)

  • next_watermark (string): Next watermark value (max 50 characters, nullable)

  • pipeline_metadata (object): Pipeline metadata (JSONB, nullable)

  • last_target_insert (string): Last target insert timestamp (ISO 8601, nullable)

  • last_target_update (string): Last target update timestamp (ISO 8601, nullable)

  • last_target_soft_delete (string): Last target soft delete timestamp (ISO 8601, nullable)

  • freshness_number (int): Freshness check interval number (nullable)

  • freshness_datepart (string): Freshness check date part (hour, day, week, month, year, nullable)

  • mute_freshness_check (bool): Whether freshness check is muted

  • timeliness_number (int): Timeliness check interval number (nullable)

  • timeliness_datepart (string): Timeliness check date part (hour, day, week, month, year, nullable)

  • mute_timeliness_check (bool): Whether timeliness check is muted

  • load_lineage (bool): Whether to load lineage

  • active (bool): Whether pipeline is active

  • created_at (string): Creation timestamp (ISO 8601)

  • updated_at (string): Last update timestamp (ISO 8601, nullable)

Status Codes:

  • 200 OK - Pipelines retrieved successfully

Get Pipeline by ID

GET /pipeline/{pipeline_id}

Get a specific pipeline by ID.

Parameters:

  • pipeline_id (int): Pipeline ID

Response:

{
  "id": 1,
  "name": "my data pipeline",
  "pipeline_type_id": 1,
  "watermark": "2024-01-01T00:00:00Z",
  "next_watermark": "2024-01-01T23:59:59Z",
  "pipeline_metadata": {
    "environment": "production"
  },
  "last_target_insert": "2024-01-01T10:00:00Z",
  "last_target_update": "2024-01-01T10:00:00Z",
  "last_target_soft_delete": null,
  "freshness_number": 24,
  "freshness_datepart": "hour",
  "mute_freshness_check": false,
  "timeliness_number": 2,
  "timeliness_datepart": "hour",
  "mute_timeliness_check": false,
  "load_lineage": true,
  "active": true,
  "created_at": "2024-01-01T00:00:00Z",
  "updated_at": "2024-01-01T12:00:00Z"
}

Response Fields:

  • id (int): Pipeline ID

  • name (string): Pipeline name (1-150 characters)

  • pipeline_type_id (int): Pipeline type ID

  • watermark (string): Watermark value (max 50 characters)

  • next_watermark (string): Next watermark value (max 50 characters)

  • pipeline_metadata (object): Pipeline metadata (JSONB)

  • last_target_insert (string): Last target insert timestamp (ISO 8601, nullable)

  • last_target_update (string): Last target update timestamp (ISO 8601, nullable)

  • last_target_soft_delete (string): Last target soft delete timestamp (ISO 8601, nullable)

  • freshness_number (int): Freshness check interval number

  • freshness_datepart (string): Freshness check date part (hour, day, week, month, year)

  • mute_freshness_check (bool): Whether freshness check is muted

  • timeliness_number (int): Timeliness check interval number

  • timeliness_datepart (string): Timeliness check date part (hour, day, week, month, year)

  • mute_timeliness_check (bool): Whether timeliness check is muted

  • load_lineage (bool): Whether to load lineage

  • active (bool): Whether pipeline is active

  • created_at (string): Creation timestamp (ISO 8601)

  • updated_at (string): Last update timestamp (ISO 8601, nullable)

Status Codes:

  • 200 OK - Pipeline found

  • 404 Not Found - Pipeline not found

Update Pipeline

PATCH /pipeline

Update pipeline configuration.

Request Body:

{
  "id": 1,
  "name": "updated pipeline name",
  "pipeline_type_id": 2,
  "watermark": "2024-01-01T00:00:00Z",
  "next_watermark": "2024-01-02T00:00:00Z",
  "pipeline_metadata": {
    "environment": "production"
  },
  "freshness_number": 24,
  "freshness_datepart": "hour",
  "mute_freshness_check": false,
  "timeliness_number": 2,
  "timeliness_datepart": "hour",
  "mute_timeliness_check": false,
  "load_lineage": true
}

Response:

{
  "id": 1,
  "name": "updated pipeline name",
  "pipeline_type_id": 2,
  "watermark": "2024-01-01T00:00:00Z",
  "next_watermark": "2024-01-02T00:00:00Z",
  "pipeline_metadata": {
    "environment": "production"
  },
  "last_target_insert": "2024-01-01T10:00:00Z",
  "last_target_update": "2024-01-01T10:00:00Z",
  "last_target_soft_delete": null,
  "freshness_number": 24,
  "freshness_datepart": "hour",
  "mute_freshness_check": false,
  "timeliness_number": 2,
  "timeliness_datepart": "hour",
  "mute_timeliness_check": false,
  "load_lineage": true,
  "active": true,
  "created_at": "2024-01-01T00:00:00Z",
  "updated_at": "2024-01-01T12:00:00Z"
}

Request Body Fields:

  • id (int): Pipeline ID (required)

  • name (string): Pipeline name (1-150 characters, optional)

  • pipeline_type_id (int): Pipeline type ID (optional)

  • watermark (string|int|datetime|date): Watermark value (optional)

  • next_watermark (string|int|datetime|date): Next watermark value (optional)

  • pipeline_metadata (object): Additional pipeline metadata (optional)

  • freshness_number (int): Freshness check interval number (optional, >0)

  • freshness_datepart (string): Freshness check date part (optional, hour, day, week, month, year)

  • mute_freshness_check (bool): Whether freshness check is muted (optional)

  • timeliness_number (int): Timeliness check interval number (optional, >0)

  • timeliness_datepart (string): Timeliness check date part (optional, hour, day, week, month, year)

  • mute_timeliness_check (bool): Whether timeliness check is muted (optional)

  • load_lineage (bool): Whether to load lineage (optional)

Response Fields:

  • id (int): Pipeline ID

  • name (string): Pipeline name

  • pipeline_type_id (int): Pipeline type ID

  • watermark (string): Watermark value

  • next_watermark (string): Next watermark value

  • pipeline_metadata (object): Pipeline metadata

  • last_target_insert (string): Last target insert timestamp (ISO 8601, nullable)

  • last_target_update (string): Last target update timestamp (ISO 8601, nullable)

  • last_target_soft_delete (string): Last target soft delete timestamp (ISO 8601, nullable)

  • freshness_number (int): Freshness check interval number

  • freshness_datepart (string): Freshness check date part

  • mute_freshness_check (bool): Whether freshness check is muted

  • timeliness_number (int): Timeliness check interval number

  • timeliness_datepart (string): Timeliness check date part

  • mute_timeliness_check (bool): Whether timeliness check is muted

  • load_lineage (bool): Whether to load lineage

  • active (bool): Whether pipeline is active

  • created_at (string): Creation timestamp (ISO 8601)

  • updated_at (string): Last update timestamp (ISO 8601)

Status Codes:

  • 200 OK - Pipeline updated successfully

  • 404 Not Found - Pipeline not found
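Because every field except id is optional, a PATCH body can carry only the fields being changed. The sketch below builds such a request with the standard library without sending it. BASE_URL and build_patch_request are assumptions made for illustration, and the sketch assumes that fields left out of the body stay unchanged, which this section implies but does not state.

```python
import json
import urllib.request

# Assumption: the address where the Watcher API is served.
BASE_URL = "http://localhost:8000"


def build_patch_request(pipeline_id: int, **changes) -> urllib.request.Request:
    """Hypothetical helper: build a PATCH /pipeline request with only the changed fields."""
    body = {"id": pipeline_id, **changes}
    return urllib.request.Request(
        url=f"{BASE_URL}/pipeline",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PATCH",
    )


# Mute the freshness check on pipeline 1 and leave everything else out of the body.
request = build_patch_request(1, mute_freshness_check=True)
```

Passing the request to urllib.request.urlopen would send it; a 404 response is raised as urllib.error.HTTPError.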

Pipeline Execution

Start Pipeline Execution

POST /start_pipeline_execution

Start a new pipeline execution. Automatically calculates hour_recorded and date_recorded. If start_date is not provided, it defaults to the current time using pendulum.now().

Request Body:

{
  "pipeline_id": 1,
  "start_date": "2024-01-01T10:00:00Z",
  "watermark": "2024-01-01T00:00:00Z",
  "next_watermark": "2024-01-01T23:59:59Z",
  "parent_id": null
}

Minimal Request Body (start_date omitted, defaults to current time):

{
  "pipeline_id": 1
}

Response:

{
  "id": 1
}

Request Body Fields:

  • pipeline_id (int): Pipeline ID (required)

  • start_date (string): Start timestamp (ISO 8601, optional). If not provided, defaults to the current time using pendulum.now().

  • watermark (string|int|datetime|date): Watermark value (optional)

  • next_watermark (string|int|datetime|date): Next watermark value (optional)

  • parent_id (int): Parent execution ID for hierarchical executions (optional)

Response Fields:

  • id (int): Pipeline execution ID

Status Codes:

  • 201 Created - Pipeline execution started successfully
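This section does not define how hour_recorded and date_recorded are derived. One plausible reading, shown here purely as an assumption, is that they are the hour and the calendar date of start_date. The sketch uses the standard library in place of pendulum, and recorded_parts is a hypothetical name.

```python
from datetime import date, datetime, timezone
from typing import Optional, Tuple


def parse_iso(timestamp: str) -> datetime:
    """Parse an ISO 8601 timestamp, accepting a trailing 'Z' for UTC."""
    return datetime.fromisoformat(timestamp.replace("Z", "+00:00"))


def recorded_parts(start_date: Optional[str] = None) -> Tuple[date, int]:
    """Assumed derivation: (date_recorded, hour_recorded) taken from start_date.

    When start_date is omitted, the current UTC time stands in for pendulum.now().
    """
    start = parse_iso(start_date) if start_date else datetime.now(timezone.utc)
    return start.date(), start.hour


date_recorded, hour_recorded = recorded_parts("2024-01-01T10:00:00Z")
```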

End Pipeline Execution

POST /end_pipeline_execution

End a pipeline execution with metrics. Automatically calculates duration and throughput.

Request Body:

{
  "id": 1,
  "end_date": "2024-01-01T10:05:00Z",
  "completed_successfully": true,
  "total_rows": 1000,
  "inserts": 800,
  "updates": 200,
  "soft_deletes": 0,
  "execution_metadata": {
    "partition": "2025-01-05"
  }
}

Response: HTTP 204 No Content

Request Body Fields:

  • id (int): Pipeline execution ID (required)

  • end_date (string): End timestamp (ISO 8601, required)

  • completed_successfully (bool): Whether execution completed successfully (optional)

  • total_rows (int): Total rows processed (optional, ≥0)

  • inserts (int): Number of inserts (optional, ≥0)

  • updates (int): Number of updates (optional, ≥0)

  • soft_deletes (int): Number of soft deletes (optional, ≥0)

  • execution_metadata (object): Additional execution metadata (optional)

Status Codes:

  • 204 No Content - Pipeline execution ended successfully

  • 400 Bad Request - end_date must be greater than start_date

  • 404 Not Found - Pipeline execution not found

  • 500 Internal Server Error - Database integrity error
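The automatically calculated values can be reproduced by hand. In the sketch below, duration is the difference between end_date and start_date in seconds, and throughput is total_rows divided by that duration. Rounding to two decimals is inferred from the example responses in this document, and execution_metrics is a hypothetical name, not a Watcher function.

```python
from datetime import datetime
from typing import Optional, Tuple


def parse_iso(timestamp: str) -> datetime:
    """Parse an ISO 8601 timestamp, accepting a trailing 'Z' for UTC."""
    return datetime.fromisoformat(timestamp.replace("Z", "+00:00"))


def execution_metrics(
    start_date: str, end_date: str, total_rows: Optional[int]
) -> Tuple[int, Optional[float]]:
    """Return (duration_seconds, throughput in rows per second)."""
    start, end = parse_iso(start_date), parse_iso(end_date)
    # Mirrors the documented 400 response.
    if end <= start:
        raise ValueError("end_date must be greater than start_date")
    duration_seconds = int((end - start).total_seconds())
    throughput = None
    if total_rows is not None and duration_seconds > 0:
        throughput = round(total_rows / duration_seconds, 2)
    return duration_seconds, throughput


# 1000 rows over five minutes: 300 seconds, about 3.33 rows per second.
duration, throughput = execution_metrics(
    "2024-01-01T10:00:00Z", "2024-01-01T10:05:00Z", 1000
)
```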

Get Pipeline Execution

GET /pipeline_execution/{pipeline_execution_id}

Get a specific pipeline execution together with its child executions, which are resolved through the closure table.

Parameters:

  • pipeline_execution_id (int): Pipeline execution ID

Response:

{
  "id": 1,
  "parent_id": null,
  "pipeline_id": 1,
  "start_date": "2024-01-01T10:00:00Z",
  "end_date": "2024-01-01T10:05:00Z",
  "duration_seconds": 300,
  "completed_successfully": true,
  "inserts": 800,
  "updates": 200,
  "soft_deletes": 0,
  "total_rows": 1000,
  "watermark": "2024-01-01T00:00:00Z",
  "next_watermark": "2024-01-01T23:59:59Z",
  "execution_metadata": {
    "partition": "2025-01-05"
  },
  "anomaly_flags": {
    "total_rows": true,
    "duration_seconds": false
  },
  "throughput": 3.33,
  "child_executions": [
    {
      "id": 2,
      "parent_id": 1,
      "pipeline_id": 1,
      "start_date": "2024-01-01T10:01:00Z",
      "end_date": "2024-01-01T10:03:00Z",
      "duration_seconds": 120,
      "completed_successfully": true,
      "inserts": 400,
      "updates": 100,
      "soft_deletes": 0,
      "total_rows": 500,
      "watermark": "2024-01-01T00:00:00Z",
      "next_watermark": "2024-01-01T23:59:59Z",
      "execution_metadata": null,
      "anomaly_flags": null,
      "throughput": 4.17,
      "child_executions": []
    }
  ]
}

Response Fields:

  • id (int): Pipeline execution ID

  • parent_id (int): Parent execution ID (nullable)

  • pipeline_id (int): Pipeline ID

  • start_date (string): Start timestamp (ISO 8601)

  • end_date (string): End timestamp (ISO 8601, nullable)

  • duration_seconds (int): Execution duration in seconds (nullable)

  • completed_successfully (bool): Whether execution completed successfully (nullable)

  • inserts (int): Number of inserts (nullable)

  • updates (int): Number of updates (nullable)

  • soft_deletes (int): Number of soft deletes (nullable)

  • total_rows (int): Total rows processed (nullable)

  • watermark (string): Watermark value (nullable)

  • next_watermark (string): Next watermark value (nullable)

  • execution_metadata (object): Additional execution metadata (nullable)

  • anomaly_flags (object): Anomaly detection flags (nullable)

  • throughput (float): Rows per second throughput (nullable)

  • child_executions (array): Array of child execution objects (nullable)

Status Codes:

  • 200 OK - Pipeline execution found

  • 404 Not Found - Pipeline execution not found
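Since child_executions nests to arbitrary depth and may be null, a recursive walk is a convenient way to visit every execution in a response. The following is a minimal sketch over an already decoded response body; walk_executions is a hypothetical helper and the sample dict is trimmed to a few fields.

```python
from typing import Any, Dict, Iterator, Tuple


def walk_executions(
    execution: Dict[str, Any], depth: int = 0
) -> Iterator[Tuple[int, Dict[str, Any]]]:
    """Yield (depth, execution) for an execution and all of its descendants."""
    yield depth, execution
    # child_executions is documented as nullable, so treat null as an empty list.
    for child in execution.get("child_executions") or []:
        yield from walk_executions(child, depth + 1)


response = {
    "id": 1,
    "total_rows": 1000,
    "child_executions": [
        {"id": 2, "total_rows": 500, "child_executions": []},
    ],
}

for depth, item in walk_executions(response):
    print("  " * depth + f"execution {item['id']}: {item['total_rows']} rows")
```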

Pipeline Types

Create or Get Pipeline Type

POST /pipeline_type

Create a new pipeline type or return the existing one (upsert behavior).

Request Body:

{
  "name": "extraction",
  "freshness_number": 24,
  "freshness_datepart": "hour",
  "mute_freshness_check": false,
  "timeliness_number": 2,
  "timeliness_datepart": "hour",
  "mute_timeliness_check": false
}

Response:

{
  "id": 1
}

Request Body Fields:

  • name (string): Pipeline type name (1-150 characters, required)

  • freshness_number (int): Freshness check interval number (optional, >0)

  • freshness_datepart (string): Freshness check date part (optional, hour, day, week, month, year)

  • mute_freshness_check (bool): Whether freshness check is muted (optional, default: false)

  • timeliness_number (int): Timeliness check interval number (optional, >0)

  • timeliness_datepart (string): Timeliness check date part (optional, hour, day, week, month, year)

  • mute_timeliness_check (bool): Whether timeliness check is muted (optional, default: false)

Response Fields:

  • id (int): Pipeline type ID

Status Codes:

  • 201 Created - New pipeline type was created

  • 200 OK - Existing pipeline type was found

  • 500 Internal Server Error - Unique constraint violation

List Pipeline Types

GET /pipeline_type

Get all pipeline types.

Response:

[
  {
    "id": 1,
    "name": "extraction",
    "freshness_number": 24,
    "freshness_datepart": "hour",
    "mute_freshness_check": false,
    "timeliness_number": 2,
    "timeliness_datepart": "hour",
    "mute_timeliness_check": false,
    "created_at": "2024-01-01T00:00:00Z",
    "updated_at": "2024-01-01T12:00:00Z"
  },
  {
    "id": 2,
    "name": "transformation",
    "freshness_number": 48,
    "freshness_datepart": "hour",
    "mute_freshness_check": true,
    "timeliness_number": 4,
    "timeliness_datepart": "hour",
    "mute_timeliness_check": false,
    "created_at": "2024-01-01T00:00:00Z",
    "updated_at": null
  }
]

Response Fields:

  • id (int): Pipeline type ID

  • name (string): Pipeline type name (1-150 characters)

  • freshness_number (int): Freshness check interval number

  • freshness_datepart (string): Freshness check date part (hour, day, week, month, year)

  • mute_freshness_check (bool): Whether freshness check is muted

  • timeliness_number (int): Timeliness check interval number

  • timeliness_datepart (string): Timeliness check date part (hour, day, week, month, year)

  • mute_timeliness_check (bool): Whether timeliness check is muted

  • created_at (string): Creation timestamp (ISO 8601)

  • updated_at (string): Last update timestamp (ISO 8601, nullable)

Status Codes:

  • 200 OK - Pipeline types retrieved successfully

Get Pipeline Type by ID

GET /pipeline_type/{pipeline_type_id}

Get a specific pipeline type by ID.

Parameters:

  • pipeline_type_id (int): Pipeline type ID

Response:

{
  "id": 1,
  "name": "extraction",
  "freshness_number": 24,
  "freshness_datepart": "hour",
  "mute_freshness_check": false,
  "timeliness_number": 2,
  "timeliness_datepart": "hour",
  "mute_timeliness_check": false,
  "created_at": "2024-01-01T00:00:00Z",
  "updated_at": "2024-01-01T12:00:00Z"
}

Response Fields:

  • id (int): Pipeline type ID

  • name (string): Pipeline type name (1-150 characters)

  • freshness_number (int): Freshness check interval number

  • freshness_datepart (string): Freshness check date part (hour, day, week, month, year)

  • mute_freshness_check (bool): Whether freshness check is muted

  • timeliness_number (int): Timeliness check interval number

  • timeliness_datepart (string): Timeliness check date part (hour, day, week, month, year)

  • mute_timeliness_check (bool): Whether timeliness check is muted

  • created_at (string): Creation timestamp (ISO 8601)

  • updated_at (string): Last update timestamp (ISO 8601, nullable)

Status Codes:

  • 200 OK - Pipeline type found

  • 404 Not Found - Pipeline type not found

Address Management

Create or Get Address

POST /address

Create a new address or return the existing one (upsert behavior). The address type is created automatically if it doesn’t exist.

Request Body:

{
  "name": "source_db.source_schema.source_table",
  "address_type_name": "postgres",
  "address_type_group_name": "database",
  "database_name": "source_db",
  "schema_name": "source_schema",
  "table_name": "source_table",
  "primary_key": "id",
  "address_metadata": {
    "external_dependencies": [
      {
        "type": "looker_dashboard",
        "name": "Sales Dashboard"
      }
    ]
  }
}

Response:

{
  "id": 1
}

Request Body Fields:

  • name (string): Address name (1-150 characters, required)

  • address_type_name (string): Address type name (1-150 characters, required)

  • address_type_group_name (string): Address type group name (1-150 characters, required)

  • database_name (string): Database name (max 50 characters, optional)

  • schema_name (string): Schema name (max 50 characters, optional)

  • table_name (string): Table name (max 50 characters, optional)

  • primary_key (string): Primary key (max 50 characters, optional)

  • address_metadata (object): Arbitrary JSON metadata for external dependencies (optional)

Response Fields:

  • id (int): Address ID

Status Codes:

  • 201 Created - New address was created

  • 200 OK - Existing address was found
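In the example request, the address name is the dotted form of database_name, schema_name and table_name. The sketch below builds a request body on that convention. The convention itself is an assumption drawn from the example, not a documented rule, and build_address_payload is a hypothetical helper.

```python
from typing import Any, Dict, Optional


def build_address_payload(
    name: str,
    address_type_name: str,
    address_type_group_name: str,
    primary_key: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: build a POST /address body from a dotted name."""
    payload: Dict[str, Any] = {
        "name": name,
        "address_type_name": address_type_name,
        "address_type_group_name": address_type_group_name,
    }
    parts = name.split(".")
    # Assumption: a three-part dotted name follows database.schema.table.
    if len(parts) == 3:
        payload.update(zip(("database_name", "schema_name", "table_name"), parts))
    if primary_key is not None:
        payload["primary_key"] = primary_key
    # These four fields are documented as at most 50 characters.
    for field in ("database_name", "schema_name", "table_name", "primary_key"):
        if field in payload and len(payload[field]) > 50:
            raise ValueError(f"{field} must be at most 50 characters")
    return payload


payload = build_address_payload(
    "source_db.source_schema.source_table", "postgres", "database", primary_key="id"
)
```

Names that do not split into exactly three parts are passed through with only the required fields set.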

List Addresses

GET /address

Get all addresses.

Response:

[
  {
    "id": 1,
    "name": "source_db.source_schema.source_table",
    "address_type_id": 1,
    "database_name": "source_db",
    "schema_name": "source_schema",
    "table_name": "source_table",
    "primary_key": "id",
    "created_at": "2024-01-01T00:00:00Z",
    "updated_at": "2024-01-01T12:00:00Z"
  },
  {
    "id": 2,
    "name": "target_db.target_schema.target_table",
    "address_type_id": 1,
    "database_name": "target_db",
    "schema_name": "target_schema",
    "table_name": "target_table",
    "primary_key": "id",
    "created_at": "2024-01-01T00:00:00Z",
    "updated_at": null
  }
]

Response Fields:

  • id (int): Address ID

  • name (string): Address name (1-150 characters)

  • address_type_id (int): Address type ID

  • database_name (string): Database name (max 50 characters)

  • schema_name (string): Schema name (max 50 characters)

  • table_name (string): Table name (max 50 characters)

  • primary_key (string): Primary key (max 50 characters)

  • created_at (string): Creation timestamp (ISO 8601)

  • updated_at (string): Last update timestamp (ISO 8601, nullable)

Status Codes:

  • 200 OK - Addresses retrieved successfully

Get Address by ID

GET /address/{address_id}

Get a specific address by ID.

Parameters:

  • address_id (int): Address ID

Response:

{
  "id": 1,
  "name": "source_db.source_schema.source_table",
  "address_type_id": 1,
  "database_name": "source_db",
  "schema_name": "source_schema",
  "table_name": "source_table",
  "primary_key": "id",
  "created_at": "2024-01-01T00:00:00Z",
  "updated_at": "2024-01-01T12:00:00Z"
}

Response Fields:

  • id (int): Address ID

  • name (string): Address name (1-150 characters)

  • address_type_id (int): Address type ID

  • database_name (string): Database name (max 50 characters)

  • schema_name (string): Schema name (max 50 characters)

  • table_name (string): Table name (max 50 characters)

  • primary_key (string): Primary key (max 50 characters)

  • created_at (string): Creation timestamp (ISO 8601)

  • updated_at (string): Last update timestamp (ISO 8601, nullable)

Status Codes:

  • 200 OK - Address found

  • 404 Not Found - Address not found

Update Address

PATCH /address

Update address information.

Request Body:

{
  "id": 1,
  "name": "updated_table_name",
  "database_name": "updated_db",
  "schema_name": "updated_schema",
  "table_name": "updated_table",
  "primary_key": "id",
  "address_type_id": 2
}

Response:

{
  "id": 1,
  "name": "updated_table_name",
  "address_type_id": 2,
  "database_name": "updated_db",
  "schema_name": "updated_schema",
  "table_name": "updated_table",
  "primary_key": "id",
  "created_at": "2024-01-01T00:00:00Z",
  "updated_at": "2024-01-01T12:00:00Z"
}

Request Body Fields:

  • id (int): Address ID (required)

  • name (string): Address name (1-150 characters, optional)

  • address_type_id (int): Address type ID (optional)

  • database_name (string): Database name (max 50 characters, optional)

  • schema_name (string): Schema name (max 50 characters, optional)

  • table_name (string): Table name (max 50 characters, optional)

  • primary_key (string): Primary key (max 50 characters, optional)

Response Fields:

  • id (int): Address ID

  • name (string): Address name

  • address_type_id (int): Address type ID

  • database_name (string): Database name

  • schema_name (string): Schema name

  • table_name (string): Table name

  • primary_key (string): Primary key

  • created_at (string): Creation timestamp (ISO 8601)

  • updated_at (string): Last update timestamp (ISO 8601)

Status Codes:

  • 200 OK - Address updated successfully

  • 404 Not Found - Address not found

Address Types

Create or Get Address Type

POST /address_type

Create a new address type or return the existing one (upsert behavior).

Request Body:

{
  "name": "postgres",
  "group_name": "database"
}

Response:

{
  "id": 1
}

Request Body Fields:

  • name (string): Address type name (1-150 characters, required)

  • group_name (string): Address type group name (1-150 characters, required)

Response Fields:

  • id (int): Address type ID

Status Codes:

  • 201 Created - New address type was created

  • 200 OK - Existing address type was found

List Address Types

GET /address_type

Get all address types.

Response:

[
  {
    "id": 1,
    "name": "postgres",
    "group_name": "database",
    "created_at": "2024-01-01T00:00:00Z",
    "updated_at": "2024-01-01T00:00:00Z"
  },
  {
    "id": 2,
    "name": "mysql",
    "group_name": "database",
    "created_at": "2024-01-01T00:00:00Z",
    "updated_at": null
  }
]

Response Fields:

  • id (int): Address type ID

  • name (string): Address type name (1-150 characters)

  • group_name (string): Address type group name (1-150 characters)

  • created_at (string): Creation timestamp (ISO 8601)

  • updated_at (string): Last update timestamp (ISO 8601, nullable)

Status Codes:

  • 200 OK - Address types retrieved successfully

Get Address Type by ID

GET /address_type/{address_type_id}

Get a specific address type by ID.

Parameters:

  • address_type_id (int): Address type ID

Response:

{
  "id": 1,
  "name": "postgres",
  "group_name": "database",
  "created_at": "2024-01-01T00:00:00Z",
  "updated_at": "2024-01-01T00:00:00Z"
}

Response Fields:

  • id (int): Address type ID

  • name (string): Address type name (1-150 characters)

  • group_name (string): Address type group name (1-150 characters)

  • created_at (string): Creation timestamp (ISO 8601)

  • updated_at (string): Last update timestamp (ISO 8601, nullable)

Status Codes:

  • 200 OK - Address type found

  • 404 Not Found - Address type not found

Address Lineage

Create Address Lineage

POST /address_lineage

Create lineage relationships between addresses. Automatically creates addresses and address types if they don’t exist.

Request Body:

{
  "pipeline_id": 1,
  "source_addresses": [
    {
      "name": "source_db.source_schema.source_table",
      "address_type_name": "postgres",
      "address_type_group_name": "database"
    }
  ],
  "target_addresses": [
    {
      "name": "target_db.target_schema.target_table",
      "address_type_name": "postgres",
      "address_type_group_name": "database"
    }
  ]
}

Response:

{
  "pipeline_id": 1,
  "lineage_relationships_created": 1,
  "message": "Lineage relationships created for pipeline 1"
}

Request Body Fields:

  • pipeline_id (int): Pipeline ID (required)

  • source_addresses (array): List of source address objects

  • target_addresses (array): List of target address objects

Each address object in source_addresses and target_addresses contains:

  • name (string): Address name (1-150 characters)

  • address_type_name (string): Address type name (1-150 characters)

  • address_type_group_name (string): Address type group name (1-150 characters)

Response Fields:

  • pipeline_id (int): Pipeline ID

  • lineage_relationships_created (int): Number of relationships created

  • message (string): Status message

Status Codes:

  • 201 Created - Lineage relationships created successfully

  • 200 OK - Pipeline does not have load_lineage=True, so no relationships were created
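A lineage request body is a pipeline ID plus two lists of address objects with the same three fields. The sketch below assembles one from plain tuples. build_lineage_payload and the AddressRef alias are hypothetical, and requiring at least one source and one target is an assumption rather than a documented rule.

```python
from typing import Any, Dict, List, Tuple

# (name, address_type_name, address_type_group_name)
AddressRef = Tuple[str, str, str]


def to_address(ref: AddressRef) -> Dict[str, str]:
    """Turn a tuple into the address object used by POST /address_lineage."""
    name, type_name, group_name = ref
    if not 1 <= len(name) <= 150:
        raise ValueError("name must be 1-150 characters")
    return {
        "name": name,
        "address_type_name": type_name,
        "address_type_group_name": group_name,
    }


def build_lineage_payload(
    pipeline_id: int, sources: List[AddressRef], targets: List[AddressRef]
) -> Dict[str, Any]:
    """Hypothetical helper: build the request body for POST /address_lineage."""
    # Assumption: a lineage request without sources or targets is not useful.
    if not sources or not targets:
        raise ValueError("at least one source and one target address are required")
    return {
        "pipeline_id": pipeline_id,
        "source_addresses": [to_address(ref) for ref in sources],
        "target_addresses": [to_address(ref) for ref in targets],
    }


payload = build_lineage_payload(
    1,
    sources=[("source_db.source_schema.source_table", "postgres", "database")],
    targets=[("target_db.target_schema.target_table", "postgres", "database")],
)
```

A 200 response to this request means the pipeline has lineage loading turned off, so a caller may want to log that case rather than treat it as success.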

Anomaly Detection

Create or Get Anomaly Detection Rule

POST /anomaly_detection_rule

Create a new anomaly detection rule or return the existing one (upsert behavior).

Request Body:

{
  "pipeline_id": 1,
  "metric_field": "total_rows",
  "z_threshold": 2.0,
  "lookback_days": 30,
  "minimum_executions": 30,
  "active": true
}

Response:

{
  "id": 1
}

Request Body Fields:

  • pipeline_id (int): Pipeline ID (required)

  • metric_field (string): Metric field to monitor (required)

  • z_threshold (float): Z-score threshold, from 1.0 to 10.0 (default: 3.0)

  • lookback_days (int): Days of historical data to consider, from 1 to 365 (default: 30)

  • minimum_executions (int): Minimum number of executions, from 5 to 1000 (default: 30)

  • active (bool): Whether rule is active (default: true)

Status Codes:

  • 201 Created - New rule was created

  • 200 OK - Existing rule was found
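The rule fields map onto a standard z-score test: a metric value is compared with the mean and standard deviation of earlier executions. The sketch below illustrates that idea only. Watcher's exact computation is not described in this section, so the use of the sample standard deviation, the handling of zero variance, and the name is_anomalous are all assumptions. The history is assumed to be already limited to the lookback_days window.

```python
from statistics import mean, stdev
from typing import Sequence


def is_anomalous(
    history: Sequence[float],
    value: float,
    z_threshold: float = 3.0,
    minimum_executions: int = 30,
) -> bool:
    """Flag value when its z-score against history exceeds z_threshold."""
    # Too little history: skip the check instead of flagging on thin evidence.
    if len(history) < minimum_executions:
        return False
    mu = mean(history)
    sigma = stdev(history)
    # Assumption: a constant history cannot produce a meaningful z-score.
    if sigma == 0:
        return False
    return abs(value - mu) / sigma > z_threshold


# Thirty earlier total_rows values alternating between 100 and 110.
history = [100, 110] * 15
flagged = is_anomalous(history, 200, z_threshold=2.0)
```

A lower z_threshold flags more executions; a higher minimum_executions delays detection until more history exists.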

List Anomaly Detection Rules

GET /anomaly_detection_rule

Get all anomaly detection rules.

Response:

[
  {
    "id": 1,
    "pipeline_id": 1,
    "metric_field": "total_rows",
    "z_threshold": 2.0,
    "lookback_days": 30,
    "minimum_executions": 30,
    "active": true,
    "created_at": "2024-01-01T10:00:00Z",
    "updated_at": "2024-01-01T10:05:00Z"
  },
  {
    "id": 2,
    "pipeline_id": 1,
    "metric_field": "duration_seconds",
    "z_threshold": 3.0,
    "lookback_days": 30,
    "minimum_executions": 30,
    "active": true,
    "created_at": "2024-01-01T10:00:00Z",
    "updated_at": null
  }
]

Get Anomaly Detection Rule by ID

GET /anomaly_detection_rule/{anomaly_detection_rule_id}

Get a specific anomaly detection rule by ID.

Parameters:

  • anomaly_detection_rule_id (int): Rule ID

Response:

{
  "id": 1,
  "pipeline_id": 1,
  "metric_field": "total_rows",
  "z_threshold": 2.0,
  "lookback_days": 30,
  "minimum_executions": 30,
  "active": true,
  "created_at": "2024-01-01T10:00:00Z",
  "updated_at": "2024-01-01T10:05:00Z"
}

Update Anomaly Detection Rule

PATCH /anomaly_detection_rule

Update anomaly detection rule.

Request Body:

{
  "id": 1,
  "pipeline_id": 1,
  "metric_field": "updates",
  "z_threshold": 2.5,
  "lookback_days": 30,
  "minimum_executions": 20,
  "active": true
}

Response:

{
  "id": 1,
  "pipeline_id": 1,
  "metric_field": "updates",
  "z_threshold": 2.5,
  "lookback_days": 30,
  "minimum_executions": 20,
  "active": true,
  "created_at": "2024-01-01T10:00:00Z",
  "updated_at": "2024-01-01T10:05:00Z"
}

Request Body Fields:

  • id (int): Rule ID (required)

  • pipeline_id (int): Pipeline ID (optional)

  • metric_field (string): Metric field to monitor (optional)

  • z_threshold (float): Z-score threshold, from 1.0 to 10.0 (optional)

  • lookback_days (int): Days of historical data to consider, from 1 to 365 (optional)

  • minimum_executions (int): Minimum number of executions, from 5 to 1000 (optional)

  • active (bool): Whether rule is active (optional)

Unflag Anomalies

POST /unflag_anomaly

Unflag anomalies for a pipeline execution.

Request Body:

{
  "pipeline_id": 1,
  "pipeline_execution_id": 1,
  "metric_field": ["total_rows", "duration_seconds"]
}

Response: HTTP 204 No Content

Request Body Fields:

  • pipeline_id (int): Pipeline ID

  • pipeline_execution_id (int): Pipeline execution ID

  • metric_field (array): List of metric fields to unflag

Monitoring & Health

Check Timeliness

POST /timeliness

Check pipeline execution timeliness.

Request Body:

{
  "lookback_minutes": 60
}

Response:

{
  "status": "queued"
}

Check Freshness

POST /freshness

Check DML operation freshness.

Response:

{
  "status": "queued"
}

Log Cleanup

POST /log_cleanup

Clean up old log data based on retention period.

Request Body:

{
  "retention_days": 90,
  "batch_size": 10000
}

Request Body Fields:

  • retention_days (int): Number of days to retain data (minimum: 90)

  • batch_size (int): Number of records to delete per batch (default: 10000)

Response:

{
  "total_pipeline_executions_deleted": 1000,
  "total_timeliness_pipeline_execution_logs_deleted": 500,
  "total_anomaly_detection_results_deleted": 50,
  "total_pipeline_execution_closure_parent_deleted": 200,
  "total_pipeline_execution_closure_child_deleted": 200,
  "total_freshness_pipeline_logs_deleted": 300
}
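The request constraint and the response counters can be handled with two small helpers. This is a sketch under stated assumptions: build_cleanup_payload and total_deleted are hypothetical names, and summing every key that starts with total_ and ends with _deleted assumes the response keeps the shape shown in the example.

```python
from typing import Dict


def build_cleanup_payload(retention_days: int, batch_size: int = 10000) -> Dict[str, int]:
    """Hypothetical helper: build the POST /log_cleanup body."""
    # The documented minimum retention period is 90 days.
    if retention_days < 90:
        raise ValueError("retention_days must be at least 90")
    return {"retention_days": retention_days, "batch_size": batch_size}


def total_deleted(response: Dict[str, int]) -> int:
    """Sum all total_*_deleted counters from a cleanup response."""
    return sum(
        count
        for key, count in response.items()
        if key.startswith("total_") and key.endswith("_deleted")
    )


example_response = {
    "total_pipeline_executions_deleted": 1000,
    "total_timeliness_pipeline_execution_logs_deleted": 500,
    "total_anomaly_detection_results_deleted": 50,
    "total_pipeline_execution_closure_parent_deleted": 200,
    "total_pipeline_execution_closure_child_deleted": 200,
    "total_freshness_pipeline_logs_deleted": 300,
}
rows_removed = total_deleted(example_response)
```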

Celery Queue Monitoring

POST /celery/monitor-queue

Monitor Celery queue depths and alert when a queue grows too large.

Request Body: None

Response:

{
  "status": "success"
}

Error Response:

{
  "status": "error"
}

Alert Thresholds:

  • WARNING: 50+ pending tasks

  • CRITICAL: 100+ pending tasks
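The two thresholds translate into a simple classification of queue depth. The sketch below reads "50+" and "100+" as inclusive lower bounds, which is an assumption; alert_level is a hypothetical name and is not taken from Watcher's code.

```python
from typing import Optional

WARNING_THRESHOLD = 50
CRITICAL_THRESHOLD = 100


def alert_level(pending_tasks: int) -> Optional[str]:
    """Return the alert level for a queue depth, or None when no alert applies."""
    # Check the higher threshold first so 100+ is never reported as WARNING.
    if pending_tasks >= CRITICAL_THRESHOLD:
        return "CRITICAL"
    if pending_tasks >= WARNING_THRESHOLD:
        return "WARNING"
    return None
```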