Anomaly Detection

This guide covers how to set up and use anomaly detection in Watcher. Watcher’s anomaly detection system uses statistical analysis to identify unusual patterns in pipeline execution metrics. It automatically runs after each successful pipeline execution and can detect anomalies in various metrics like execution time, row counts, and throughput.

How It Works

Statistical Analysis

The anomaly detection system uses z-score analysis:

  1. Baseline Calculation: Analyzes historical execution data

  2. Statistical Metrics: Calculates the mean and standard deviation

  3. Z-Score Calculation: Determines how many standard deviations a value is from the mean

  4. Threshold Comparison: Compares the z-score against the configured threshold

  5. Anomaly Detection: Flags values that exceed the threshold
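The five steps above can be sketched in Python as follows. This is a minimal illustration, not Watcher's implementation. The function name `detect_anomaly` is hypothetical. Using the sample standard deviation (`statistics.stdev`) and skipping zero-variance baselines are assumptions.

```python
import statistics
from typing import List, Optional


def detect_anomaly(history: List[float], value: float,
                   z_threshold: float = 3.0,
                   minimum_executions: int = 30) -> Optional[dict]:
    """Return result details if `value` is anomalous, otherwise None.

    Hypothetical helper that mirrors the five documented steps.
    """
    # 1. Baseline calculation: wait until there is enough history
    #    (statistics.stdev needs at least two data points).
    if len(history) < max(minimum_executions, 2):
        return None

    # 2. Statistical metrics (sample standard deviation is an assumption).
    mean = statistics.mean(history)
    std = statistics.stdev(history)
    if std == 0:
        return None  # Assumption: a perfectly flat baseline is skipped.

    # 3. Z-score: how many standard deviations the value is from the mean.
    z_score = (value - mean) / std

    # 4. Threshold comparison, in both directions.
    if abs(z_score) <= z_threshold:
        return None

    # 5. Anomaly detected: return the values a result record would hold.
    return {
        "violation_value": value,
        "z_score": z_score,
        "historical_mean": mean,
        "std_deviation_value": std,
    }
```

For example, `detect_anomaly([10, 12, 11, 9, 13], 20, z_threshold=3.0, minimum_executions=5)` flags the value with a z-score of about 5.7. A value of 12 passes.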

Note

The system only compares successful executions that occurred during the same hour of day (e.g., 2 PM vs 2 PM) to account for daily patterns, business hours, and data processing cycles. This ensures more accurate anomaly detection by comparing like-with-like time periods.
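A minimal sketch of how such a baseline could be selected, using a hypothetical `Execution` record. This guide does not say which timestamp defines the hour (start or end time, and in which time zone). Using `started_at` is therefore an assumption, as is treating `lookback_days` as a rolling window. Excluding flagged executions follows from the unflagging behavior described under Managing Anomalies.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List


@dataclass
class Execution:
    # Hypothetical record shape, for illustration only.
    started_at: datetime
    succeeded: bool
    flagged: bool
    total_rows: float


def baseline_executions(executions: List[Execution], current_start: datetime,
                        lookback_days: int = 30) -> List[Execution]:
    """Select comparable past executions for the run starting at `current_start`."""
    window_start = current_start - timedelta(days=lookback_days)
    return [
        e for e in executions
        if e.succeeded                                     # successful runs only
        and not e.flagged                                  # skip flagged anomalies
        and window_start <= e.started_at < current_start   # inside the lookback window
        and e.started_at.hour == current_start.hour        # same hour of day
    ]
```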

Supported Metrics

Monitor various pipeline execution metrics:

  • total_rows: Total number of rows processed

  • duration_seconds: Execution duration in seconds

  • throughput: Rows processed per second

  • inserts: Number of insert operations

  • updates: Number of update operations

  • soft_deletes: Number of soft delete operations
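To show how the six metrics relate, the hypothetical helper below collects them from an execution record and derives throughput as rows per second. It is an assumption that Watcher computes throughput this way rather than storing it. Treating a missing or zero duration as having no throughput is also an assumption.

```python
from typing import Dict, Optional

BASE_FIELDS = ("total_rows", "duration_seconds", "inserts", "updates", "soft_deletes")


def collect_metrics(execution: Dict[str, float]) -> Dict[str, Optional[float]]:
    """Gather the supported metrics from a hypothetical execution dict."""
    metrics: Dict[str, Optional[float]] = {f: execution.get(f) for f in BASE_FIELDS}
    rows, duration = metrics["total_rows"], metrics["duration_seconds"]
    # throughput = rows processed per second; undefined for a missing or zero duration.
    metrics["throughput"] = rows / duration if rows is not None and duration else None
    return metrics
```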

Setting Up Anomaly Detection

Creating Detection Rules

Note

You can enable automatic creation of anomaly detection rules by setting the WATCHER_AUTO_CREATE_ANOMALY_DETECTION_RULES environment variable to true. This will automatically create all rules for a pipeline when it is created.

  1. Create an anomaly detection rule

    import httpx
    
    response = httpx.post(
        "http://localhost:8000/anomaly_detection_rule",
        json={
            "pipeline_id": 1,
            "metric_field": "total_rows",
            "z_threshold": 3.0,
            "lookback_days": 30,
            "minimum_executions": 30
        }
    )
    print(response.json())
    
  2. Response

    {
      "id": 1,
      "pipeline_id": 1,
      "metric_field": "total_rows",
      "z_threshold": 3.0,
      "lookback_days": 30,
      "minimum_executions": 30,
      "active": true,
      "created_at": "2024-01-01T10:00:00Z",
      "updated_at": null
    }
    

Rule Configuration

Pipeline ID: The pipeline to monitor for anomalies

Metric Field: The specific metric to analyze:

  • total_rows: Monitor row count variations

  • duration_seconds: Monitor execution time variations

  • throughput: Monitor processing speed variations

  • inserts: Monitor insert operation variations

  • updates: Monitor update operation variations

  • soft_deletes: Monitor soft delete variations

Z-Threshold: How many standard deviations a value must be from the historical mean before it is flagged. Lower values are more sensitive:

  • 1.5: Very sensitive (catches minor variations)

  • 2.0: Standard sensitivity (recommended)

  • 2.5: Less sensitive (catches major variations)

  • 3.0: Very conservative (catches only extreme anomalies)
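These thresholds can be compared by the false alerts they would produce. If a metric were normally distributed, the share of ordinary executions that still fall beyond ±z standard deviations is P(|Z| > z) = erfc(z/√2). The sketch below computes this with Python's standard library. Real pipeline metrics are often skewed, so treat the numbers as rough guidance rather than a guarantee.

```python
import math


def expected_false_positive_rate(z_threshold: float) -> float:
    """Two-sided tail probability P(|Z| > z_threshold) for a standard normal Z."""
    return math.erfc(z_threshold / math.sqrt(2))


for z in (1.5, 2.0, 2.5, 3.0):
    rate = expected_false_positive_rate(z)
    print(f"z_threshold={z}: about {rate:.2%} of normal executions flagged")
```

The rates work out to roughly 13% at 1.5, 4.6% at 2.0, 1.2% at 2.5, and 0.27% at 3.0. Under the same normality assumption, a 3.0 threshold on an hourly pipeline would still raise roughly one false alert every two weeks per rule.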

Lookback Days: How many days of execution history the baseline is drawn from (e.g., 30)

Minimum Executions: Number of historical executions required before analysis runs:

  • 5: Bare minimum for basic analysis (not recommended)

  • 30: Recommended for stable baselines
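Before posting a rule, you could run a quick client-side check against the options above. The `check_rule` helper below is hypothetical and only encodes this guide's recommendations. The API may validate differently. The last warning assumes a pipeline that runs once a day at a fixed hour. Only same-hour executions are compared, so such a pipeline gets at most about one comparable run per lookback day.

```python
from typing import List, Tuple

SUPPORTED_METRICS = {
    "total_rows", "duration_seconds", "throughput",
    "inserts", "updates", "soft_deletes",
}


def check_rule(rule: dict) -> Tuple[List[str], List[str]]:
    """Return (errors, warnings) for a rule payload. Hypothetical helper."""
    errors: List[str] = []
    warnings: List[str] = []

    if rule.get("metric_field") not in SUPPORTED_METRICS:
        errors.append(f"unsupported metric_field: {rule.get('metric_field')!r}")

    z = rule.get("z_threshold")
    if not isinstance(z, (int, float)) or z <= 0:
        errors.append("z_threshold must be a positive number")

    # Only validate these when present; server-side defaults are not assumed.
    for key in ("lookback_days", "minimum_executions"):
        if key in rule and (not isinstance(rule[key], int) or rule[key] <= 0):
            errors.append(f"{key} must be a positive integer")

    if errors:
        return errors, warnings

    minimum = rule.get("minimum_executions")
    lookback = rule.get("lookback_days")
    if minimum is not None and minimum < 30:
        warnings.append("minimum_executions below 30 gives a less stable baseline")
    if minimum is not None and lookback is not None and minimum > lookback:
        warnings.append("a once-daily pipeline cannot reach minimum_executions "
                        "within lookback_days")
    return errors, warnings
```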

Multiple Rules

Create multiple rules for comprehensive monitoring:

import httpx

# Define rules to create
rules = [
    {
        "pipeline_id": 1,
        "metric_field": "total_rows",
        "z_threshold": 2.0,
        "lookback_days": 30,
        "minimum_executions": 5
    },
    {
        "pipeline_id": 1,
        "metric_field": "duration_seconds",
        "z_threshold": 2.5,
        "lookback_days": 30,
        "minimum_executions": 10
    },
    {
        "pipeline_id": 1,
        "metric_field": "throughput",
        "z_threshold": 1.8,
        "lookback_days": 30,
        "minimum_executions": 8
    }
]

# Create all rules
for rule in rules:
    response = httpx.post(
        "http://localhost:8000/anomaly_detection_rule",
        json=rule
    )
    print(f"{rule['metric_field']} rule:", response.json())

Automatic Execution

Triggered Execution

Anomaly detection runs automatically after each successful pipeline execution:

  1. Pipeline execution completes successfully

  2. System checks for active anomaly detection rules

  3. For each rule, analyzes the execution metrics

  4. Compares against historical baseline

  5. Flags anomalies if detected

  6. Sends alerts if anomalies are found
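The flow above might look roughly like this in Python. `run_anomaly_detection` and its `send_alert` callback are hypothetical. In Watcher this work runs on Celery workers, which the sketch leaves out. The sketch sends one alert per execution listing every flagged metric, which mirrors the sample Slack alert later in this guide.

```python
import statistics
from typing import Callable, Dict, List


def run_anomaly_detection(metrics: Dict[str, float],
                          rules: List[dict],
                          history: Dict[str, List[float]],
                          send_alert: Callable[[List[dict]], None]) -> List[dict]:
    """Evaluate one successful execution against its rules (hypothetical)."""
    anomalies = []
    for rule in rules:
        if not rule.get("active", True):          # step 2: active rules only
            continue
        field = rule["metric_field"]
        baseline = history.get(field, [])         # step 4: historical baseline
        if len(baseline) < max(rule["minimum_executions"], 2):
            continue
        mean = statistics.mean(baseline)
        std = statistics.stdev(baseline)
        if std == 0:
            continue
        z_score = (metrics[field] - mean) / std   # step 3: analyze the metric
        if abs(z_score) > rule["z_threshold"]:    # step 5: flag
            anomalies.append({"metric_field": field,
                              "violation_value": metrics[field],
                              "z_score": round(z_score, 4)})
    if anomalies:                                 # step 6: alert once per execution
        send_alert(anomalies)
    return anomalies
```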

No Manual Triggering Required

Unlike monitoring checks, anomaly detection doesn’t require manual triggering:

  • Automatic: Runs after every successful execution

  • Background: Processed by Celery workers

  • Near real-time: Results are available as soon as the worker finishes processing the execution

  • Persistent: Anomaly results stored in database

Anomaly Results

Understanding Results

When an anomaly is detected, the system stores detailed information:

{
  "pipeline_execution_id": 123,
  "rule_id": 1,
  "violation_value": 15000.0000,
  "z_score": 2.8000,
  "historical_mean": 8000.0000,
  "std_deviation_value": 2500.0000,
  "z_threshold": 2.0000,
  "threshold_min_value": 3000.0000,
  "threshold_max_value": 13000.0000,
  "context": {
    "lookback_days": 30,
    "minimum_executions": 30,
    "execution_count": 45
  },
  "detected_at": "2024-01-01T10:05:00Z"
}

Result Fields

  • violation_value: The actual metric value that triggered the anomaly

  • z_score: How many standard deviations from the mean

  • historical_mean: Average value from historical data

  • std_deviation_value: Standard deviation from historical data

  • z_threshold: Configured threshold for this rule

  • threshold_min_value: Lower bound of the expected range (in the example above, 8000 - 2.0 × 2500 = 3000)

  • threshold_max_value: Upper bound of the expected range (8000 + 2.0 × 2500 = 13000)
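The example result can be reproduced from its inputs. In the sketch below, `build_result` is a hypothetical helper. The range is the mean ± z_threshold × standard deviation, which matches the 3000 to 13000 range above. Clamping the lower bound at 0 is an assumption, based on the "Range: 0 - ..." values in the sample Slack alert.

```python
def build_result(value: float, mean: float, std: float, z_threshold: float) -> dict:
    """Recompute the statistical fields of an anomaly result (hypothetical)."""
    z_score = (value - mean) / std
    return {
        "violation_value": value,
        "z_score": round(z_score, 4),
        "historical_mean": mean,
        "std_deviation_value": std,
        "z_threshold": z_threshold,
        # Assumption: metrics cannot be negative, so the lower bound stops at 0.
        "threshold_min_value": max(0.0, mean - z_threshold * std),
        "threshold_max_value": mean + z_threshold * std,
    }


result = build_result(15000.0, 8000.0, 2500.0, 2.0)
print(result["z_score"], result["threshold_min_value"], result["threshold_max_value"])
# prints: 2.8 3000.0 13000.0
```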

Alert Notifications

Slack Alerts

When anomalies are detected, Slack alerts are sent automatically:

⚠️ WARNING
Anomaly Detection
Timestamp: 2025-01-09 20:30:45 UTC
Message: Anomalies Detected for Pipeline 'analytics_pipeline' (ID: 123) - Execution ID 21 flagged

Details:
• Total Anomalies: 2
• Metrics: ['duration_seconds', 'throughput']
• Anomalies:
     • duration_seconds: 4914 (Range: 0 - 4767)
     • throughput: 271.96 (Range: 0 - 250)

Alert Configuration

Configure Slack webhooks for alerts:

# Set Slack webhook URL
SLACK_WEBHOOK_URL="https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"
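To confirm the webhook works before relying on it, you can post a message to it directly. Slack incoming webhooks accept a JSON body with a `text` field. The sketch below also includes a hypothetical `format_alert` that imitates the sample alert layout above. It is not Watcher's formatter, and its input field names are assumptions.

```python
import os
from typing import List


def format_alert(pipeline_name: str, pipeline_id: int, execution_id: int,
                 anomalies: List[dict]) -> str:
    """Build a message resembling the sample Slack alert (hypothetical)."""
    lines = [
        f"Anomalies Detected for Pipeline '{pipeline_name}' (ID: {pipeline_id})"
        f" - Execution ID {execution_id} flagged",
        f"• Total Anomalies: {len(anomalies)}",
    ]
    for a in anomalies:
        lines.append(
            f"• {a['metric_field']}: {a['violation_value']:g} "
            f"(Range: {a['threshold_min_value']:g} - {a['threshold_max_value']:g})"
        )
    return "\n".join(lines)


def post_to_slack(text: str, webhook_url: str = "") -> None:
    """Send `text` to a Slack incoming webhook (defaults to SLACK_WEBHOOK_URL)."""
    import httpx  # imported here so format_alert works without httpx installed

    url = webhook_url or os.environ["SLACK_WEBHOOK_URL"]
    response = httpx.post(url, json={"text": text})
    response.raise_for_status()
```

With `SLACK_WEBHOOK_URL` set, `post_to_slack("Watcher webhook test")` should make a test message appear in the configured channel.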

Managing Anomalies

Viewing Rules

List all anomaly detection rules:

import httpx

response = httpx.get("http://localhost:8000/anomaly_detection_rule")
print(response.json())

Response:

[
  {
    "id": 1,
    "pipeline_id": 1,
    "metric_field": "total_rows",
    "z_threshold": 2.0,
    "lookback_days": 30,
    "minimum_executions": 5,
    "active": true,
    "created_at": "2024-01-01T10:00:00Z",
    "updated_at": null
  }
]

Get specific rule details:

import httpx

response = httpx.get("http://localhost:8000/anomaly_detection_rule/1")
print(response.json())
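If you only need the rules for one pipeline, one option is to filter the list response on the client. The helpers below are a sketch. `rules_for_pipeline` and `fetch_rules` are hypothetical names. This guide documents no server-side filter parameters, so the sketch does not use any.

```python
from typing import List


def rules_for_pipeline(rules: List[dict], pipeline_id: int,
                       active_only: bool = True) -> List[dict]:
    """Keep rules for `pipeline_id`, optionally only the active ones."""
    return [
        r for r in rules
        if r.get("pipeline_id") == pipeline_id
        and (r.get("active", False) or not active_only)
    ]


def fetch_rules(base_url: str = "http://localhost:8000") -> List[dict]:
    """GET every anomaly detection rule from the Watcher API."""
    import httpx  # imported lazily so the filter can be used on its own

    response = httpx.get(f"{base_url}/anomaly_detection_rule")
    response.raise_for_status()
    return response.json()
```

For example, `rules_for_pipeline(fetch_rules(), pipeline_id=1)` returns only the active rules for pipeline 1.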

Updating Rules

Update an anomaly detection rule by sending a PATCH request with the rule's id and the fields to change. The response is the complete updated rule (the full AnomalyDetectionRule model).

import httpx

response = httpx.patch(
    "http://localhost:8000/anomaly_detection_rule",
    json={
        "id": 1,
        "z_threshold": 2.5,
        "lookback_days": 30,
        "minimum_executions": 10
    }
)
print(response.json())

Response Example:

{
  "id": 1,
  "pipeline_id": 1,
  "metric_field": "total_rows",
  "z_threshold": 2.5,
  "lookback_days": 30,
  "minimum_executions": 10,
  "active": true,
  "created_at": "2024-01-01T10:00:00Z",
  "updated_at": "2024-01-01T11:30:00Z"
}

Unflagging Anomalies

Unflag anomalies that are false positives:

import httpx

response = httpx.post(
    "http://localhost:8000/unflag_anomaly",
    json={
        "pipeline_id": 1,
        "pipeline_execution_id": 123,
        "metric_field": ["total_rows", "duration_seconds"]
    }
)
print(response.status_code)

This removes the anomaly flags and allows the execution to be included in future baseline calculations.

Adjusting Thresholds

The system provides detailed statistical data to help you fine-tune anomaly detection sensitivity. Each anomaly result includes both the z-score (actual deviation) and z-threshold (sensitivity setting) to guide adjustments.

Understanding the Data

Z-Score vs Z-Threshold:

  • Z-Score: How many standard deviations the current value is from the historical mean

  • Z-Threshold: Your sensitivity setting (e.g., 2.0 = flag anything beyond 2 standard deviations)

  • Relationship: If |z_score| > z_threshold → Anomaly detected (the value falls outside the threshold_min_value to threshold_max_value range)

Tuning Process

The alert message includes the pipeline execution ID and the flagged metrics. Use them to query the anomaly_detection_result table:

SELECT
    /* Current values */
    adr.pipeline_execution_id,
    rule.metric_field,
    adr.violation_value,
    adr.historical_mean,
    adr.std_deviation_value,
    adr.z_threshold,
    adr.threshold_min_value,
    adr.threshold_max_value,

    /* Z-score and the range it would produce if used as the threshold */
    adr.z_score,
    GREATEST(0, adr.historical_mean - (adr.std_deviation_value * ABS(adr.z_score))) AS new_threshold_min_value,
    adr.historical_mean + (adr.std_deviation_value * ABS(adr.z_score)) AS new_threshold_max_value
FROM public.anomaly_detection_result AS adr
INNER JOIN public.anomaly_detection_rule AS rule
    ON rule.id = adr.rule_id
WHERE adr.pipeline_execution_id = 12  /* Grab from alert */
    AND rule.metric_field = 'duration_seconds';  /* Grab from alert */

The query shows how far violation_value fell outside the configured range. It also shows the range that would result from raising z_threshold to the observed z_score (new_threshold_min_value and new_threshold_max_value). If that wider range is still acceptable for the metric, a z_threshold at or above the observed z_score would have prevented this alert. Use this to reduce false positives.
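One way to turn the query output into a new setting is to round the observed z-score up and send it in a PATCH request. This sketch rests on several assumptions. `suggest_z_threshold` and `patch_z_threshold` are hypothetical helpers, and the 0.1 step is arbitrary. The PATCH body assumes the endpoint accepts a partial update, whereas the update example above also sends lookback_days and minimum_executions. Raising a threshold after a single alert can hide real problems, so review several past anomalies first.

```python
import math


def suggest_z_threshold(z_score: float, current_threshold: float,
                        step: float = 0.1) -> float:
    """Smallest multiple of `step` >= |z_score|, never below the current threshold."""
    # Round the quotient first so float noise (e.g. 1.1 / 0.1) does not add a step.
    steps = math.ceil(round(abs(z_score) / step, 9))
    return round(max(steps * step, current_threshold), 2)


def patch_z_threshold(rule_id: int, z_threshold: float,
                      base_url: str = "http://localhost:8000") -> dict:
    """Update a rule's z_threshold via the Watcher API (assumes a partial PATCH works)."""
    import httpx  # imported lazily so suggest_z_threshold has no dependencies

    response = httpx.patch(
        f"{base_url}/anomaly_detection_rule",
        json={"id": rule_id, "z_threshold": z_threshold},
    )
    response.raise_for_status()
    return response.json()
```

For the example result with a z_score of 2.8 and a z_threshold of 2.0, `suggest_z_threshold(2.8, 2.0)` returns 2.8. Because a value is flagged only when its z-score exceeds the threshold, that setting would no longer flag it.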

Best Practices

Rule Configuration

  • Start Conservative: Begin with higher z-thresholds (2.5-3.0)

  • Adjust Based on Data: Lower thresholds as you understand your data patterns

  • Multiple Metrics: Monitor different aspects of pipeline performance

  • Sufficient History: Ensure enough historical data for stable baselines

Threshold Selection

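A good rule of thumb is to start with a z-threshold of 3.0 and a minimum of 30 executions. Thirty is a common statistical rule of thumb, often cited alongside the Central Limit Theorem, for getting reasonably stable estimates of the mean and standard deviation. Keep in mind that z-scores are most reliable when the metric is roughly normally distributed.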
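A quick calculation shows why 30 executions is safer than 5. For roughly normal data, the standard error of a sample standard deviation is approximately σ/√(2(n - 1)). The relative uncertainty in the baseline's spread therefore shrinks as the number of executions n grows, and so does the uncertainty in every threshold derived from it. This is a large-sample approximation and only a rough guide for skewed metrics.

```python
import math


def relative_stdev_error(n: int) -> float:
    """Approximate SE(s) / sigma for a sample of n normally distributed values."""
    return 1 / math.sqrt(2 * (n - 1))


for n in (5, 10, 30, 60):
    print(f"{n:>2} executions: standard deviation uncertain by about "
          f"{relative_stdev_error(n):.0%}")
```

With 5 executions, the estimated spread is uncertain by roughly a third. At 30 it drops to about 13%.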

Monitoring Strategy

  • Regular Review: Review anomaly results regularly

  • False Positive Management: Unflag false positives promptly

  • Threshold Tuning: Adjust thresholds based on results

  • Alert Fatigue: Avoid overly sensitive thresholds

Common Scenarios

Data Volume Anomalies

Detect unusual data volumes:

{
  "pipeline_id": 1,
  "metric_field": "total_rows",
  "z_threshold": 3.0,
  "minimum_executions": 30
}

Performance Anomalies

Detect execution time issues:

{
  "pipeline_id": 1,
  "metric_field": "duration_seconds",
  "z_threshold": 3.0,
  "minimum_executions": 30
}

Throughput Anomalies

Detect processing speed issues:

{
  "pipeline_id": 1,
  "metric_field": "throughput",
  "z_threshold": 3.0,
  "minimum_executions": 30
}

DML Operation Anomalies

Detect unusual insert, update, or soft-delete patterns (set metric_field to inserts, updates, or soft_deletes):

{
  "pipeline_id": 1,
  "metric_field": "inserts",
  "z_threshold": 3.0,
  "minimum_executions": 30
}

Advanced Configuration

Auto-Creation Rules

Enable automatic rule creation for new pipelines:

WATCHER_AUTO_CREATE_ANOMALY_DETECTION_RULES=true

This automatically creates default anomaly detection rules for new pipelines.