Database Schema
This section documents the complete database schema for Watcher.
Core Tables
Pipeline
Primary pipeline configuration table.
CREATE TABLE pipeline (
id SERIAL PRIMARY KEY,
name VARCHAR(150) NOT NULL,
pipeline_type_id INTEGER NOT NULL REFERENCES pipeline_type(id),
watermark VARCHAR(50) NULL,
next_watermark VARCHAR(50) NULL,
pipeline_metadata JSONB NULL,
last_target_insert TIMESTAMP WITH TIME ZONE NULL,
last_target_update TIMESTAMP WITH TIME ZONE NULL,
last_target_soft_delete TIMESTAMP WITH TIME ZONE NULL,
freshness_number INTEGER NULL,
freshness_datepart VARCHAR(20) NULL,
mute_freshness_check BOOLEAN DEFAULT FALSE NOT NULL,
timeliness_number INTEGER NULL,
timeliness_datepart VARCHAR(20) NULL,
mute_timeliness_check BOOLEAN DEFAULT FALSE NOT NULL,
load_lineage BOOLEAN DEFAULT TRUE NOT NULL,
active BOOLEAN DEFAULT TRUE NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP NOT NULL,
updated_at TIMESTAMP WITH TIME ZONE NULL
);
-- Indexes
CREATE UNIQUE INDEX ux_pipeline_name_include ON pipeline (name) INCLUDE (load_lineage, active, id);
CREATE INDEX ix_pipeline_pipeline_type_id_include ON pipeline (pipeline_type_id) INCLUDE (id);
Pipeline Type
Pipeline type classification.
CREATE TABLE pipeline_type (
id SERIAL PRIMARY KEY,
name VARCHAR(150) NOT NULL,
freshness_number INTEGER NULL,
freshness_datepart VARCHAR(20) NULL,
mute_freshness_check BOOLEAN DEFAULT FALSE NOT NULL,
timeliness_number INTEGER NULL,
timeliness_datepart VARCHAR(20) NULL,
mute_timeliness_check BOOLEAN DEFAULT FALSE NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP NOT NULL,
updated_at TIMESTAMP WITH TIME ZONE NULL
);
-- Indexes
CREATE UNIQUE INDEX ux_pipeline_type_name_include ON pipeline_type (name) INCLUDE (id);
Pipeline Execution
Pipeline execution tracking.
CREATE TABLE pipeline_execution (
id BIGINT PRIMARY KEY,
parent_id BIGINT NULL REFERENCES pipeline_execution(id),
pipeline_id INTEGER NOT NULL REFERENCES pipeline(id),
start_date TIMESTAMP WITH TIME ZONE NOT NULL,
date_recorded DATE NOT NULL,
hour_recorded INTEGER NOT NULL,
end_date TIMESTAMP WITH TIME ZONE NULL,
duration_seconds INTEGER NULL,
completed_successfully BOOLEAN NULL,
inserts INTEGER NULL CHECK (inserts >= 0),
updates INTEGER NULL CHECK (updates >= 0),
soft_deletes INTEGER NULL CHECK (soft_deletes >= 0),
total_rows INTEGER NULL CHECK (total_rows >= 0),
watermark VARCHAR(50) NULL,
next_watermark VARCHAR(50) NULL,
execution_metadata JSONB NULL,
anomaly_flags JSONB NULL,
throughput DECIMAL(12,4) NULL,
-- Constraints
CONSTRAINT check_end_after_start CHECK (end_date IS NULL OR end_date > start_date),
CONSTRAINT check_parent_not_self CHECK (parent_id IS NULL OR parent_id != id)
);
-- Indexes
CREATE INDEX ix_pipeline_execution_start_date ON pipeline_execution (start_date) INCLUDE (id);
CREATE INDEX ix_pipeline_execution_hour_recorded ON pipeline_execution (pipeline_id, hour_recorded, end_date)
INCLUDE (completed_successfully, id) WHERE end_date IS NOT NULL;
CREATE INDEX ix_pipeline_execution_date_recorded_seek ON pipeline_execution (date_recorded, pipeline_id) INCLUDE (id);
Pipeline Execution Closure
Hierarchical pipeline execution relationships.
CREATE TABLE pipeline_execution_closure (
parent_execution_id BIGINT NOT NULL,
child_execution_id BIGINT NOT NULL,
depth INTEGER NOT NULL,
PRIMARY KEY (parent_execution_id, child_execution_id),
FOREIGN KEY (parent_execution_id) REFERENCES pipeline_execution(id),
FOREIGN KEY (child_execution_id) REFERENCES pipeline_execution(id)
);
-- Indexes
CREATE INDEX ix_pipeline_execution_closure_depth_parent_include ON pipeline_execution_closure (parent_execution_id, depth) INCLUDE (child_execution_id);
CREATE INDEX ix_pipeline_execution_closure_depth_child_include ON pipeline_execution_closure (child_execution_id, depth) INCLUDE (parent_execution_id);
Address
Data address tracking.
CREATE TABLE address (
id SERIAL PRIMARY KEY,
name VARCHAR(150) NOT NULL,
address_type_id INTEGER NOT NULL REFERENCES address_type(id),
database_name VARCHAR(50) NULL,
schema_name VARCHAR(50) NULL,
table_name VARCHAR(50) NULL,
primary_key VARCHAR(50) NULL,
address_metadata JSONB NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP NOT NULL,
updated_at TIMESTAMP WITH TIME ZONE NULL
);
-- Indexes
CREATE UNIQUE INDEX ux_address_name_include ON address (name) INCLUDE (id);
Address Type
Address type classification.
CREATE TABLE address_type (
id SERIAL PRIMARY KEY,
name VARCHAR(150) NOT NULL,
group_name VARCHAR(150) NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP NOT NULL,
updated_at TIMESTAMP WITH TIME ZONE NULL
);
-- Indexes
CREATE UNIQUE INDEX ux_address_type_name_include ON address_type (name) INCLUDE (id);
Address Lineage
Data lineage relationships.
CREATE TABLE address_lineage (
id BIGINT PRIMARY KEY,
pipeline_id INTEGER NOT NULL REFERENCES pipeline(id),
source_address_id INTEGER NOT NULL REFERENCES address(id),
target_address_id INTEGER NOT NULL REFERENCES address(id)
);
-- Indexes
CREATE UNIQUE INDEX ux_address_lineage_source_target ON address_lineage (source_address_id, target_address_id);
CREATE UNIQUE INDEX ux_address_lineage_target_source ON address_lineage (target_address_id, source_address_id);
CREATE INDEX ix_address_lineage_pipeline_id ON address_lineage (pipeline_id);
Address Lineage Closure
Transitive closure of address lineage relationships.
CREATE TABLE address_lineage_closure (
source_address_id INTEGER NOT NULL,
target_address_id INTEGER NOT NULL,
depth INTEGER NOT NULL,
lineage_path INTEGER[] NOT NULL,
PRIMARY KEY (source_address_id, target_address_id),
FOREIGN KEY (source_address_id) REFERENCES address(id),
FOREIGN KEY (target_address_id) REFERENCES address(id)
);
-- Indexes
CREATE INDEX ix_address_lineage_closure_depth_source_include ON address_lineage_closure (source_address_id, depth) INCLUDE (target_address_id);
CREATE INDEX ix_address_lineage_closure_depth_target_include ON address_lineage_closure (target_address_id, depth) INCLUDE (source_address_id);
Monitoring Tables
Timeliness Pipeline Execution Log
Timeliness monitoring results.
CREATE TABLE timeliness_pipeline_execution_log (
pipeline_execution_id BIGINT PRIMARY KEY,
pipeline_id INTEGER NOT NULL REFERENCES pipeline(id),
duration_seconds INTEGER NOT NULL,
seconds_threshold INTEGER NOT NULL,
execution_status VARCHAR(50) NOT NULL,
timely_number INTEGER NOT NULL,
timely_datepart VARCHAR(20) NOT NULL,
used_child_config BOOLEAN NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP NOT NULL,
FOREIGN KEY (pipeline_execution_id) REFERENCES pipeline_execution(id)
);
Freshness Pipeline Log
Freshness monitoring results.
CREATE TABLE freshness_pipeline_log (
id BIGINT PRIMARY KEY,
pipeline_id INTEGER NOT NULL REFERENCES pipeline(id),
last_dml_timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
evaluation_timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
freshness_number INTEGER NOT NULL,
freshness_datepart VARCHAR(20) NOT NULL,
used_child_config BOOLEAN NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP NOT NULL
);
-- Indexes
CREATE UNIQUE INDEX ux_freshness_pipeline_log ON freshness_pipeline_log (last_dml_timestamp, pipeline_id);
Anomaly Detection
Anomaly Detection Rule
Anomaly detection configuration.
CREATE TABLE anomaly_detection_rule (
id SERIAL PRIMARY KEY,
pipeline_id INTEGER NOT NULL REFERENCES pipeline(id),
metric_field VARCHAR(50) NOT NULL,
z_threshold DECIMAL(4,2) DEFAULT 3.0 NOT NULL,
lookback_days INTEGER DEFAULT 30 NOT NULL,
minimum_executions INTEGER DEFAULT 30 NOT NULL,
active BOOLEAN DEFAULT TRUE NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP NOT NULL,
updated_at TIMESTAMP WITH TIME ZONE NULL
);
-- Indexes
CREATE INDEX ix_anomaly_detection_rule_pipeline_id_include ON anomaly_detection_rule (pipeline_id, active) INCLUDE (id);
CREATE UNIQUE INDEX ux_anomaly_detection_rule_composite_key_include ON anomaly_detection_rule (pipeline_id, metric_field) INCLUDE (id);
Anomaly Detection Result
Anomaly detection results.
CREATE TABLE anomaly_detection_result (
pipeline_execution_id BIGINT NOT NULL,
rule_id INTEGER NOT NULL REFERENCES anomaly_detection_rule(id),
violation_value DECIMAL(12,4) NOT NULL,
z_score DECIMAL(12,4) NOT NULL,
historical_mean DECIMAL(12,4) NOT NULL,
std_deviation_value DECIMAL(12,4) NOT NULL,
z_threshold DECIMAL(12,4) NOT NULL,
threshold_min_value DECIMAL(12,4) NOT NULL,
threshold_max_value DECIMAL(12,4) NOT NULL,
context JSONB NULL,
detected_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP NOT NULL,
PRIMARY KEY (pipeline_execution_id, rule_id),
FOREIGN KEY (pipeline_execution_id) REFERENCES pipeline_execution(id)
);
Data Relationships
Hierarchical Relationships
Pipeline Execution Hierarchy
Parent-child relationships via parent_id
Closure table for efficient parent/child queries
Depth tracking for relationship levels
Address Lineage Hierarchy
Source-target relationships via source_address_id and target_address_id
Closure table for transitive relationships
Depth tracking for lineage levels
Many-to-Many Relationships
Pipeline-Address Relationships
Pipelines can have multiple addresses
Addresses can be used by multiple pipelines
Junction table: address_lineage
Pipeline-Execution Relationships
Pipelines can have multiple executions
Executions belong to one pipeline
Foreign key: pipeline_id