Celery Tasks

This section documents all Celery background tasks in Watcher. Watcher uses Celery for distributed background task processing, providing reliable execution of monitoring checks, anomaly detection, and data processing tasks.

Task Types

Regular Tasks

These tasks are triggered by API calls or other events and run in the main celery queue.

detect_anomalies_task

Purpose Statistical analysis of pipeline execution patterns

Rate Limit 15/s

Parameters

  • pipeline_id (int): Pipeline ID to analyze

  • pipeline_execution_id (int): Execution ID to analyze

Description Automatically triggered after each successful pipeline execution. Performs statistical analysis on pipeline metrics to detect anomalies using z-score analysis.

Retry Policy

  • Max retries: 3

  • Retry delay: 60 seconds

  • Exponential backoff

Example

from src.celery_tasks import detect_anomalies_task

# Trigger anomaly detection
detect_anomalies_task.delay(pipeline_id=1, pipeline_execution_id=123)

freshness_check_task

Purpose DML operation monitoring and data staleness detection

Rate Limit 1/s

Parameters None

Description Monitors data modification operations (inserts, updates, soft deletes) to detect stale data. Checks when data was last modified and alerts if it’s older than expected.

Retry Policy

  • Max retries: 3

  • Retry delay: 60 seconds

  • Exponential backoff

Example

from src.celery_tasks import freshness_check_task

# Trigger freshness check
freshness_check_task.delay()

timeliness_check_task

Purpose Pipeline execution timing validation

Rate Limit 1/s

Parameters

  • lookback_minutes (int): How far back to look for executions (default: 60)

Description Validates that pipeline executions are completing within expected timeframes. Compares actual execution times against configured timeliness thresholds.

Retry Policy

  • Max retries: 3

  • Retry delay: 60 seconds

  • Exponential backoff

Example

from src.celery_tasks import timeliness_check_task

# Trigger timeliness check
timeliness_check_task.delay(lookback_minutes=120)

address_lineage_closure_rebuild_task

Purpose Rebuild address lineage closure table relationships

Rate Limit 5/s

Parameters

  • connected_addresses (List[int]): List of address IDs to rebuild

  • pipeline_id (int): Pipeline ID for context

Description Maintains the closure table for address lineage relationships. Rebuilds the transitive closure when new lineage relationships are created.

Retry Policy

  • Max retries: 3

  • Retry delay: 60 seconds

  • Exponential backoff

Example

from src.celery_tasks import address_lineage_closure_rebuild_task

# Rebuild closure table
address_lineage_closure_rebuild_task.delay(
    connected_addresses=[1, 2, 3],
    pipeline_id=1
)

pipeline_execution_closure_maintain_task

Purpose Maintain pipeline execution hierarchy closure table

Rate Limit - No rate limit (Needs to keep up with pipeline execution creation rate)

Parameters

  • execution_id (int): Execution ID to maintain

  • parent_id (int, optional): Parent execution ID

Description Maintains the closure table for pipeline execution hierarchies. Automatically triggered when new pipeline executions are created to track parent-child relationships.

Retry Policy

  • Max retries: 3

  • Retry delay: 30 seconds

  • Exponential backoff

Example

from src.celery_tasks import pipeline_execution_closure_maintain_task

# Maintain execution hierarchy
pipeline_execution_closure_maintain_task.delay(
    execution_id=123,
    parent_id=122
)

Scheduled Tasks

These tasks are automatically scheduled by Celery Beat and run in the scheduled queue. They provide automated monitoring and maintenance without manual intervention.

scheduled_freshness_check

Purpose Automated freshness monitoring for all active pipelines

Queue scheduled

Schedule Configurable via WATCHER_FRESHNESS_CHECK_SCHEDULE (default: 0 * * * * - hourly)

Parameters None

Description Automatically triggers freshness checks for all active pipelines on a configurable schedule. Delegates to the regular freshness_check_task.

Retry Policy

  • Max retries: 3

  • Retry delay: 60 seconds

  • Exponential backoff

Configuration

Set the schedule using environment variable:

WATCHER_FRESHNESS_CHECK_SCHEDULE="0 * * * *"  # Every hour

scheduled_timeliness_check

Purpose Automated timeliness monitoring for all active pipelines

Queue scheduled

Schedule Configurable via WATCHER_TIMELINESS_CHECK_SCHEDULE (default: */15 * * * * - every 15 minutes)

Parameters None (uses WATCHER_TIMELINESS_CHECK_LOOKBACK_MINUTES for lookback)

Description Automatically triggers timeliness checks for all active pipelines on a configurable schedule. Delegates to the regular timeliness_check_task with configured lookback period.

Retry Policy

  • Max retries: 3

  • Retry delay: 60 seconds

  • Exponential backoff

Configuration

Set the schedule and lookback period:

WATCHER_TIMELINESS_CHECK_SCHEDULE="*/15 * * * *"  # Every 15 minutes
WATCHER_TIMELINESS_CHECK_LOOKBACK_MINUTES=60      # Look back 60 minutes

scheduled_celery_queue_health_check

Purpose Automated Celery queue health monitoring and alerting

Queue scheduled

Schedule Configurable via WATCHER_CELERY_QUEUE_HEALTH_CHECK_SCHEDULE (default: */5 * * * * - every 5 minutes)

Parameters None

Description Automatically monitors Celery queue health, including task counts, queue depths, and scheduled tasks. Sends Slack alerts when thresholds are exceeded. Delegates to the check_celery_queue function.

Retry Policy

  • Max retries: 3

  • Retry delay: 60 seconds

  • Exponential backoff

Configuration

Set the schedule:

WATCHER_CELERY_QUEUE_HEALTH_CHECK_SCHEDULE="*/5 * * * *"  # Every 5 minutes

Task Configuration

Queue Management

Watcher uses two separate Celery queues for task isolation:

  • celery - Main queue for regular tasks triggered by API calls

  • scheduled - Dedicated queue for automated scheduled tasks

This separation prevents scheduled monitoring tasks from being delayed by high-volume regular tasks.

Rate Limiting

Regular tasks have configurable rate limits to prevent system overload:

  • detect_anomalies_task 15/s (high frequency for real-time analysis)

  • freshness_check_task 1/s (low frequency for periodic checks)

  • timeliness_check_task 1/s (low frequency for periodic checks)

  • address_lineage_closure_rebuild_task 5/s (medium frequency for maintenance)

  • pipeline_execution_closure_maintain_task No rate limit (must keep up with execution rate)

Scheduled tasks have no rate limits as they are controlled by Celery Beat scheduling.

Celery Beat Configuration

Celery Beat is configured with the following default schedules:

celery.conf.beat_schedule = {
    "scheduled-freshness-check": {
        "task": "src.celery_tasks.scheduled_freshness_check",
        "schedule": crontab(minute=0),  # Every hour
    },
    "scheduled-timeliness-check": {
        "task": "src.celery_tasks.scheduled_timeliness_check",
        "schedule": crontab(minute="*/15"),  # Every 15 minutes
    },
    "scheduled-celery-queue-health-check": {
        "task": "src.celery_tasks.scheduled_celery_queue_health_check",
        "schedule": crontab(minute="*/5"),  # Every 5 minutes
    },
}

Schedules are configurable via environment variables:

  • WATCHER_FRESHNESS_CHECK_SCHEDULE

  • WATCHER_TIMELINESS_CHECK_SCHEDULE

  • WATCHER_CELERY_QUEUE_HEALTH_CHECK_SCHEDULE

Retry Policies

All tasks implement retry policies with exponential backoff:

  • Max Retries 3 attempts

  • Base Delay 30-60 seconds depending on task type

  • Exponential Backoff Delay increases with each retry

  • Final Failure Task marked as failed after max retries

Error Handling

Tasks include comprehensive error handling:

  • Database Errors Connection issues, constraint violations

  • Validation Errors Invalid input parameters

  • Timeout Errors Long-running operations

  • System Errors Memory, disk, or network issues

Task Monitoring

Task Status Tracking

Each task provides status updates during execution:

  • PENDING Task queued, waiting for worker

  • PROGRESS Task running, with progress updates

  • SUCCESS Task completed successfully

  • FAILURE Task failed with error details

  • RETRY Task failed, will retry

Progress Updates

Tasks provide detailed progress information:

{
  "state": "PROGRESS",
  "meta": {
    "status": "Starting anomaly detection..."
  }
}

Error Details

Failed tasks include comprehensive error information:

{
  "state": "FAILURE",
  "meta": {
    "exc_type": "DatabaseError",
    "exc_message": "Connection timeout",
    "retry_count": 2,
    "max_retries": 3
  }
}