Pydantic Data Models

This section documents the Pydantic data models that Watcher’s API uses to validate request and response bodies.

Pipeline Models

PipelinePostInput

class PipelinePostInput(ValidatorModel):
    """Input model for the pipeline POST request body.

    ``name`` and ``pipeline_type_name`` are required, non-empty strings of at
    most 150 characters. Everything else is optional. When a freshness or
    timeliness ``*_number`` is given it must be a positive integer; it is
    paired with a ``*_datepart`` drawn from ``DatePartEnum``.
    """

    name: str = Field(min_length=1, max_length=150)
    pipeline_type_name: str = Field(min_length=1, max_length=150)
    # Several watermark representations are accepted.
    next_watermark: Optional[Union[str, int, DateTime, Date]] = None
    pipeline_metadata: Optional[dict] = None
    freshness_number: Optional[int] = Field(None, gt=0)
    freshness_datepart: Optional[DatePartEnum] = None
    timeliness_number: Optional[int] = Field(None, gt=0)
    timeliness_datepart: Optional[DatePartEnum] = None

PipelinePostOutput

class PipelinePostOutput(ValidatorModel):
    """Output model returned from the pipeline POST request.

    ``watermark`` is optional and may be any of the accepted watermark types.
    """

    id: int
    active: bool
    load_lineage: bool
    watermark: Optional[Union[str, int, DateTime, Date]] = Field(default=None)

PipelinePatchInput

class PipelinePatchInput(ValidatorModel):
    """Input model for the pipeline PATCH request body.

    Only ``id`` is required. Every other field defaults to ``None``
    (presumably meaning "leave unchanged" — confirm against the handler).
    A supplied ``name`` must be 1-150 characters, and a supplied freshness or
    timeliness number must be a positive integer.
    """

    id: int
    name: Optional[str] = Field(default=None, min_length=1, max_length=150)
    pipeline_type_id: Optional[int] = None
    # Current and next watermark accept the same set of representations.
    watermark: Optional[Union[str, int, DateTime, Date]] = None
    next_watermark: Optional[Union[str, int, DateTime, Date]] = None
    pipeline_metadata: Optional[dict] = None
    # Freshness check configuration.
    freshness_number: Optional[int] = Field(default=None, gt=0)
    freshness_datepart: Optional[DatePartEnum] = None
    mute_freshness_check: Optional[bool] = None
    # Timeliness check configuration.
    timeliness_number: Optional[int] = Field(default=None, gt=0)
    timeliness_datepart: Optional[DatePartEnum] = None
    mute_timeliness_check: Optional[bool] = None
    load_lineage: Optional[bool] = None
    active: Optional[bool] = None

Pipeline Type Models

PipelineTypePostInput

class PipelineTypePostInput(ValidatorModel):
    """Input model for the pipeline-type POST request body.

    ``name`` is required (1-150 characters). Freshness and timeliness numbers,
    when supplied, must be positive integers. Both ``mute_*`` flags default to
    ``False``.
    """

    name: str = Field(min_length=1, max_length=150)
    freshness_number: Optional[int] = Field(default=None, gt=0)
    freshness_datepart: Optional[DatePartEnum] = None
    mute_freshness_check: Optional[bool] = False
    timeliness_number: Optional[int] = Field(default=None, gt=0)
    timeliness_datepart: Optional[DatePartEnum] = None
    mute_timeliness_check: Optional[bool] = False

PipelineTypePostOutput

class PipelineTypePostOutput(ValidatorModel):
    """Output model for the pipeline-type POST request: just the row ``id``."""

    id: int

PipelineTypePatchInput

class PipelineTypePatchInput(ValidatorModel):
    """Input model for the pipeline-type PATCH request body.

    Only ``id`` is required. All other fields default to ``None``. A supplied
    ``name`` must be 1-150 characters, and supplied freshness or timeliness
    numbers must be positive integers.
    """

    id: int
    name: Optional[str] = Field(default=None, min_length=1, max_length=150)
    freshness_number: Optional[int] = Field(default=None, gt=0)
    freshness_datepart: Optional[DatePartEnum] = None
    mute_freshness_check: Optional[bool] = None
    timeliness_number: Optional[int] = Field(default=None, gt=0)
    timeliness_datepart: Optional[DatePartEnum] = None
    mute_timeliness_check: Optional[bool] = None

Pipeline Execution Models

PipelineExecutionStartInput

class PipelineExecutionStartInput(ValidatorModel):
    """Input model for starting a pipeline execution.

    ``pipeline_id`` and ``start_date`` are required. The watermarks and
    ``parent_id`` are optional and default to ``None``.
    """

    pipeline_id: int
    start_date: DateTime
    watermark: Optional[Union[str, int, DateTime, Date]] = Field(default=None)
    next_watermark: Optional[Union[str, int, DateTime, Date]] = Field(default=None)
    parent_id: Optional[int] = Field(default=None)

PipelineExecutionStartOutput

class PipelineExecutionStartOutput(ValidatorModel):
    """Output model for a started pipeline execution: the execution ``id``."""

    id: int

PipelineExecutionEndInput

class PipelineExecutionEndInput(ValidatorModel):
    """Input model for ending a pipeline execution.

    ``id``, ``end_date`` and ``completed_successfully`` are required. The row
    counters are optional, but when present they must be non-negative.
    """

    id: int
    end_date: DateTime
    completed_successfully: bool
    # Row-level statistics; each must be >= 0 when supplied.
    total_rows: Optional[int] = Field(None, ge=0)
    inserts: Optional[int] = Field(None, ge=0)
    updates: Optional[int] = Field(None, ge=0)
    soft_deletes: Optional[int] = Field(None, ge=0)
    execution_metadata: Optional[dict] = None

Address Models

AddressPostInput

class AddressPostInput(ValidatorModel):
    """Input model describing an address to create.

    The name, type name and type-group name are required (1-150 characters
    each). The database/schema/table/primary-key parts are optional and
    capped at 50 characters.
    """

    name: str = Field(min_length=1, max_length=150)
    address_type_name: str = Field(min_length=1, max_length=150)
    address_type_group_name: str = Field(min_length=1, max_length=150)
    database_name: Optional[str] = Field(default=None, max_length=50)
    schema_name: Optional[str] = Field(default=None, max_length=50)
    table_name: Optional[str] = Field(default=None, max_length=50)
    primary_key: Optional[str] = Field(default=None, max_length=50)
    address_metadata: Optional[dict] = None

AddressPostOutput

class AddressPostOutput(ValidatorModel):
    """Output model for the address POST request: the address ``id``."""

    id: int

AddressPatchInput

class AddressPatchInput(ValidatorModel):
    """Input model for the address PATCH request body.

    Only ``id`` is required. All other fields default to ``None``. A supplied
    ``name`` must be 1-150 characters, and the location parts are capped at
    50 characters.
    """

    id: int
    name: Optional[str] = Field(default=None, min_length=1, max_length=150)
    address_type_id: Optional[int] = None
    database_name: Optional[str] = Field(default=None, max_length=50)
    schema_name: Optional[str] = Field(default=None, max_length=50)
    table_name: Optional[str] = Field(default=None, max_length=50)
    primary_key: Optional[str] = Field(default=None, max_length=50)
    address_metadata: Optional[dict] = None

Address Type Models

AddressTypePostInput

class AddressTypePostInput(ValidatorModel):
    """Input model for creating an address type; both names are 1-150 chars."""

    name: str = Field(min_length=1, max_length=150)
    group_name: str = Field(min_length=1, max_length=150)

AddressTypePostOutput

class AddressTypePostOutput(ValidatorModel):
    """Output model for the address-type POST request: the type ``id``."""

    id: int

AddressTypePatchInput

class AddressTypePatchInput(ValidatorModel):
    """Input model for the address-type PATCH request body.

    Only ``id`` is required. ``name`` and ``group_name`` default to ``None``
    and, when supplied, must be 1-150 characters.
    """

    id: int
    name: Optional[str] = Field(default=None, min_length=1, max_length=150)
    group_name: Optional[str] = Field(default=None, min_length=1, max_length=150)

Address Lineage Models

AddressLineagePostInput

class AddressLineagePostInput(ValidatorModel):
    """Input model for recording address lineage for a pipeline.

    Sources and targets are each a list of ``AddressPostInput`` payloads.
    """

    pipeline_id: int
    source_addresses: List[AddressPostInput]
    target_addresses: List[AddressPostInput]

AddressLineagePostOutput

class AddressLineagePostOutput(ValidatorModel):
    """Output model for the lineage POST request.

    Carries the pipeline id, the number of lineage relationships created, and
    an optional message.
    """

    pipeline_id: int
    lineage_relationships_created: int
    message: Optional[str] = Field(default=None)

AddressLineageGetOutput

class AddressLineageGetOutput(ValidatorModel):
    """One lineage edge between two addresses, attributed to a pipeline."""

    id: int
    pipeline_id: int
    # Edge endpoints, by address id.
    source_address_id: int
    target_address_id: int

AddressLineageClosureGetOutput

class AddressLineageClosureGetOutput(ValidatorModel):
    """One row of the address lineage closure.

    Pairs a source and a target address (ids and names) with a ``depth``.
    Presumably ``depth`` is the number of lineage hops between them; confirm
    against the closure query.
    """

    source_address_id: int
    target_address_id: int
    depth: int
    # Human-readable names for the two endpoints.
    source_address_name: str
    target_address_name: str

Anomaly Detection Models

AnomalyDetectionRulePostInput

class AnomalyDetectionRulePostInput(ValidatorModel):
    """Input model for creating an anomaly detection rule.

    ``z_threshold`` must be strictly positive, and ``minimum_executions`` must
    be at least 2.
    """

    pipeline_id: int
    metric_field: AnomalyMetricFieldEnum
    z_threshold: float = Field(gt=0)
    minimum_executions: int = Field(ge=2)

AnomalyDetectionRulePostOutput

class AnomalyDetectionRulePostOutput(ValidatorModel):
    """Output model describing a stored anomaly detection rule."""

    id: int
    pipeline_id: int
    metric_field: AnomalyMetricFieldEnum
    # Rule parameters as stored.
    z_threshold: float
    minimum_executions: int
    active: bool
    created_at: DateTime

AnomalyDetectionRulePatchInput

class AnomalyDetectionRulePatchInput(ValidatorModel):
    """Input model for the anomaly detection rule PATCH request body.

    Only ``id`` is required. Every other field defaults to ``None``, matching
    the other ``*PatchInput`` models, so an omitted field carries no concrete
    value.
    """

    id: int
    pipeline_id: Optional[int] = None
    metric_field: Optional[AnomalyMetricFieldEnum] = None
    # NOTE(review): 1.0-10.0 is stricter than the ``gt=0`` bound on
    # AnomalyDetectionRulePostInput, so a rule can be created with a threshold
    # this model rejects on update. Confirm which range is intended.
    z_threshold: Optional[float] = Field(
        None,
        ge=1.0,
        le=10.0,
        description="How many standard deviations above mean to trigger anomaly",
    )
    # NOTE(review): ``lookback_days`` has no counterpart on
    # AnomalyDetectionRulePostInput or AnomalyDetectionRulePostOutput. Confirm
    # that the update handler still consumes it.
    lookback_days: Optional[int] = Field(
        None,
        ge=1,
        le=365,
        description="Number of days of historical data to compare against",
    )
    # NOTE(review): ``ge=5`` here vs ``ge=2`` on AnomalyDetectionRulePostInput.
    minimum_executions: Optional[int] = Field(
        None,
        ge=5,
        le=1000,
        description="Minimum executions needed for baseline calculation",
    )
    # Defaults to None rather than True. With a True default, a PATCH that
    # omits ``active`` would still carry active=True, and if the handler applies
    # every non-None field it would silently re-activate a deactivated rule.
    active: Optional[bool] = None

UnflagAnomalyInput

class UnflagAnomalyInput(ValidatorModel):
    """Input model for un-flagging anomalies on one pipeline execution.

    Despite its singular name, ``metric_field`` is a list of
    ``AnomalyMetricFieldEnum`` values.
    """

    pipeline_id: int
    pipeline_execution_id: int
    metric_field: List[AnomalyMetricFieldEnum]

Monitoring Models

FreshnessPostOutput

class FreshnessPostOutput(ValidatorModel):
    """Output model for the freshness check request: a status string."""

    status: str

TimelinessPostInput

class TimelinessPostInput(ValidatorModel):
    """Input model for the timeliness check request.

    ``lookback_minutes`` defaults to 60 and must be at least 5.
    """

    lookback_minutes: int = Field(default=60, ge=5)

TimelinessPostOutput

class TimelinessPostOutput(ValidatorModel):
    """Output model for the timeliness check request: a status string."""

    status: str

Log Cleanup Models

LogCleanupPostInput

class LogCleanupPostInput(ValidatorModel):
    """Input model for the log cleanup request.

    Attributes:
        retention_days: Retention window in days; must be at least 90
            (presumably rows older than this are removed; confirm against
            the cleanup handler).
        batch_size: Number of rows per batch; must be a positive integer.
            Defaults to 10000.
    """

    retention_days: int = Field(ge=90)
    # Previously unbounded. A zero or negative batch size makes no sense for
    # batched deletion and could stall or break a batching loop, so it is
    # rejected at validation time instead.
    batch_size: int = Field(default=10000, gt=0)

LogCleanupPostOutput

class LogCleanupPostOutput(ValidatorModel):
    """Output model for the log cleanup request.

    Reports, per table or relationship, how many rows were deleted. Every
    count is constrained to be non-negative.
    """

    total_pipeline_executions_deleted: int = Field(ge=0)
    total_timeliness_pipeline_execution_logs_deleted: int = Field(ge=0)
    total_anomaly_detection_results_deleted: int = Field(ge=0)
    # Closure rows are counted separately for the parent and child sides.
    total_pipeline_execution_closure_parent_deleted: int = Field(ge=0)
    total_pipeline_execution_closure_child_deleted: int = Field(ge=0)
    total_freshness_pipeline_logs_deleted: int = Field(ge=0)

Enums

AnomalyMetricFieldEnum

class AnomalyMetricFieldEnum(str, Enum):
    """Metrics that anomaly detection rules can monitor.

    Members are ``str`` subclasses, so each compares equal to its lowercase
    string value.
    """

    # Row-count metrics.
    TOTAL_ROWS = "total_rows"
    # Time-based metrics.
    DURATION_SECONDS = "duration_seconds"
    THROUGHPUT = "throughput"
    # Per-operation row counts.
    INSERTS = "inserts"
    UPDATES = "updates"
    SOFT_DELETES = "soft_deletes"

DatePartEnum

class DatePartEnum(str, Enum):
    """Calendar units used by the freshness and timeliness ``*_datepart`` fields.

    Members are ``str`` subclasses ordered from finest (hour) to coarsest
    (year).
    """

    HOUR = "hour"
    DAY = "day"
    WEEK = "week"
    MONTH = "month"
    QUARTER = "quarter"
    YEAR = "year"

ValidatorModel

All of the models above (but not the enums) inherit from ValidatorModel, which provides:

  • Pydantic validation: automatic data validation and type checking.

  • String coercion: automatic conversion of watermark values of various types (e.g. int, Date, DateTime) to strings for database storage.

  • Case normalization: automatic lowercase conversion of string fields.

  • Field validation: built-in enforcement of field constraints (length, numeric ranges, etc.).

Example:

from src.types import ValidatorModel
from pydantic import Field
from typing import Optional

class MyModel(ValidatorModel):
    """Minimal example of a custom model built on ValidatorModel."""

    name: str = Field(min_length=1, max_length=150)
    value: Optional[int] = Field(default=None, ge=0)
    # NOTE: ``DateTime`` is not among the imports shown above. Import it from
    # the same module the project's models use before running this snippet.
    created_at: Optional[DateTime] = None