Celery Tasks
This section documents all Celery background tasks in Watcher. Watcher uses Celery for distributed background task processing, providing reliable execution of monitoring checks, anomaly detection, and data processing tasks.
Task Types
Regular Tasks
These tasks are triggered by API calls or other events and run in the main celery queue.
detect_anomalies_task
Purpose: Statistical analysis of pipeline execution patterns
Rate Limit: 15/s
Parameters
pipeline_id (int): Pipeline ID to analyze
pipeline_execution_id (int): Execution ID to analyze
Description: Automatically triggered after each successful pipeline execution. Performs z-score analysis on pipeline metrics to detect anomalies.
Retry Policy
Max retries: 3
Retry delay: 60 seconds
Exponential backoff
Example
from src.celery_tasks import detect_anomalies_task
# Trigger anomaly detection
detect_anomalies_task.delay(pipeline_id=1, pipeline_execution_id=123)
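The z-score check described for this task can be sketched roughly as follows. This is a minimal illustration, not Watcher's actual implementation: the helper name `is_anomalous`, the use of execution duration as the metric, and the threshold of 3.0 are all assumptions made for the example.

```python
from statistics import mean, stdev


def is_anomalous(history, latest, threshold=3.0):
    """Hypothetical helper: flag `latest` if it lies more than `threshold`
    sample standard deviations away from the mean of `history`."""
    if len(history) < 2:
        return False  # too few observations to estimate the spread
    mu = mean(history)
    sigma = stdev(history)
    if sigma == 0:
        # A constant history: any different value counts as a deviation.
        return latest != mu
    z_score = (latest - mu) / sigma
    return abs(z_score) > threshold


# Illustrative durations (seconds) of earlier executions of one pipeline.
durations = [60.0, 62.0, 58.0, 61.0, 59.0]
typical_run_flagged = is_anomalous(durations, 60.5)
slow_run_flagged = is_anomalous(durations, 120.0)
```

A real detector would likely also need a minimum history size and a per-metric threshold, which this sketch leaves out.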
freshness_check_task
Purpose: DML operation monitoring and data staleness detection
Rate Limit: 1/s
Parameters: None
Description: Monitors data modification operations (inserts, updates, soft deletes) to detect stale data. Checks when data was last modified and alerts if the last modification is older than expected.
Retry Policy
Max retries: 3
Retry delay: 60 seconds
Exponential backoff
Example
from src.celery_tasks import freshness_check_task
# Trigger freshness check
freshness_check_task.delay()
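The staleness rule in the description boils down to comparing the age of the last modification against an allowed maximum. A minimal sketch, assuming a hypothetical helper `is_stale` and a per-dataset `max_age` setting (neither name comes from Watcher itself):

```python
from datetime import datetime, timedelta, timezone


def is_stale(last_modified, max_age, now=None):
    """Hypothetical helper: True if the last DML operation happened
    longer ago than `max_age` allows."""
    now = now or datetime.now(timezone.utc)
    return now - last_modified > max_age


# Fixed reference time so the example is reproducible.
reference = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
stale = is_stale(reference - timedelta(hours=3), timedelta(hours=2), now=reference)
fresh = is_stale(reference - timedelta(hours=1), timedelta(hours=2), now=reference)
```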
timeliness_check_task
Purpose: Pipeline execution timing validation
Rate Limit: 1/s
Parameters
lookback_minutes (int): How far back to look for executions, in minutes (default: 60)
Description: Validates that pipeline executions complete within expected timeframes. Compares actual execution times against configured timeliness thresholds.
Retry Policy
Max retries: 3
Retry delay: 60 seconds
Exponential backoff
Example
from src.celery_tasks import timeliness_check_task
# Trigger timeliness check
timeliness_check_task.delay(lookback_minutes=120)
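To illustrate how a lookback window and a timeliness threshold might interact, here is a small sketch. The function `find_late_executions`, the tuple layout of an execution, and the single `max_duration` threshold are assumptions for the example; Watcher's own data model may differ.

```python
from datetime import datetime, timedelta, timezone


def find_late_executions(executions, max_duration, lookback_minutes=60, now=None):
    """Hypothetical helper: return IDs of executions that started inside the
    lookback window and ran (or are still running) longer than `max_duration`.

    `executions` is an iterable of (execution_id, started_at, ended_at),
    where `ended_at` is None for an execution that has not finished."""
    now = now or datetime.now(timezone.utc)
    window_start = now - timedelta(minutes=lookback_minutes)
    late = []
    for execution_id, started_at, ended_at in executions:
        if started_at < window_start:
            continue  # started before the lookback window
        elapsed = (ended_at or now) - started_at
        if elapsed > max_duration:
            late.append(execution_id)
    return late
```

A larger `lookback_minutes` widens the set of executions that are inspected; it does not change the threshold each execution is compared against.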
address_lineage_closure_rebuild_task
Purpose: Rebuild address lineage closure table relationships
Rate Limit: 5/s
Parameters
connected_addresses (List[int]): List of address IDs to rebuild
pipeline_id (int): Pipeline ID for context
Description: Maintains the closure table for address lineage relationships. Rebuilds the transitive closure when new lineage relationships are created.
Retry Policy
Max retries: 3
Retry delay: 60 seconds
Exponential backoff
Example
from src.celery_tasks import address_lineage_closure_rebuild_task
# Rebuild closure table
address_lineage_closure_rebuild_task.delay(
    connected_addresses=[1, 2, 3],
    pipeline_id=1,
)
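For readers unfamiliar with closure tables, the sketch below shows one way a transitive closure can be derived from direct lineage edges. It is an in-memory illustration only: the function `build_closure`, the `(ancestor, descendant, depth)` row layout, the inclusion of self rows at depth 0, and the choice to record the shortest depth are assumptions, not a description of Watcher's schema.

```python
from collections import defaultdict, deque


def build_closure(edges):
    """Hypothetical helper: expand direct (source, target) edges into
    closure rows (ancestor, descendant, depth) using a breadth-first
    search from every node. Depth is the shortest path length."""
    children = defaultdict(set)
    nodes = set()
    for source, target in edges:
        children[source].add(target)
        nodes.update((source, target))

    rows = set()
    for start in nodes:
        depth_of = {start: 0}  # self row at depth 0
        queue = deque([start])
        while queue:
            current = queue.popleft()
            for child in children[current]:
                if child not in depth_of:  # also guards against cycles
                    depth_of[child] = depth_of[current] + 1
                    queue.append(child)
        rows.update((start, node, depth) for node, depth in depth_of.items())
    return rows


# Address 1 feeds address 2, which feeds address 3.
closure = build_closure([(1, 2), (2, 3)])
```

With the closure rows available, "all downstream addresses of 1" becomes a single lookup on the ancestor column rather than a recursive traversal.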
pipeline_execution_closure_maintain_task
Purpose: Maintain pipeline execution hierarchy closure table
Rate Limit: None (the task must keep up with the rate at which pipeline executions are created)
Parameters
execution_id (int): Execution ID to maintain
parent_id (int, optional): Parent execution ID
Description: Maintains the closure table for pipeline execution hierarchies. Automatically triggered when new pipeline executions are created to track parent-child relationships.
Retry Policy
Max retries: 3
Retry delay: 30 seconds
Exponential backoff
Example
from src.celery_tasks import pipeline_execution_closure_maintain_task
# Maintain execution hierarchy
pipeline_execution_closure_maintain_task.delay(
    execution_id=123,
    parent_id=122,
)
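Adding a single execution to a hierarchy closure table is cheaper than a full rebuild, because only rows ending at the new execution are needed. The following sketch assumes a hypothetical helper `closure_rows_for_new_execution`, an `(ancestor, descendant, depth)` row layout, and that every execution already has a self row at depth 0; none of these are confirmed details of Watcher.

```python
def closure_rows_for_new_execution(execution_id, parent_id, existing_rows):
    """Hypothetical helper: return the closure rows to insert for a new
    execution. Each ancestor of the parent (including the parent's own
    self row) becomes an ancestor of the new execution, one level deeper."""
    rows = {(execution_id, execution_id, 0)}  # self row
    if parent_id is not None:
        for ancestor, descendant, depth in existing_rows:
            if descendant == parent_id:
                rows.add((ancestor, execution_id, depth + 1))
    return rows


# Existing hierarchy: 121 is the parent of 122.
existing = {(121, 121, 0), (121, 122, 1), (122, 122, 0)}
new_rows = closure_rows_for_new_execution(123, 122, existing)
```

An execution without a parent only contributes its own self row.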
Scheduled Tasks
These tasks are automatically scheduled by Celery Beat and run in the scheduled queue. They provide automated monitoring and maintenance without manual intervention.
scheduled_freshness_check
Purpose: Automated freshness monitoring for all active pipelines
Queue: scheduled
Schedule: Configurable via WATCHER_FRESHNESS_CHECK_SCHEDULE (default: 0 * * * *, hourly)
Parameters: None
Description
Automatically triggers freshness checks for all active pipelines on a configurable schedule. Delegates to the regular freshness_check_task.
Retry Policy
Max retries: 3
Retry delay: 60 seconds
Exponential backoff
Configuration
Set the schedule using environment variable:
WATCHER_FRESHNESS_CHECK_SCHEDULE="0 * * * *" # Every hour
scheduled_timeliness_check
Purpose: Automated timeliness monitoring for all active pipelines
Queue: scheduled
Schedule: Configurable via WATCHER_TIMELINESS_CHECK_SCHEDULE (default: */15 * * * *, every 15 minutes)
Parameters: None (uses WATCHER_TIMELINESS_CHECK_LOOKBACK_MINUTES for the lookback period)
Description
Automatically triggers timeliness checks for all active pipelines on a configurable schedule. Delegates to the regular timeliness_check_task with configured lookback period.
Retry Policy
Max retries: 3
Retry delay: 60 seconds
Exponential backoff
Configuration
Set the schedule and lookback period:
WATCHER_TIMELINESS_CHECK_SCHEDULE="*/15 * * * *" # Every 15 minutes
WATCHER_TIMELINESS_CHECK_LOOKBACK_MINUTES=60 # Look back 60 minutes
scheduled_celery_queue_health_check
Purpose: Automated Celery queue health monitoring and alerting
Queue: scheduled
Schedule: Configurable via WATCHER_CELERY_QUEUE_HEALTH_CHECK_SCHEDULE (default: */5 * * * *, every 5 minutes)
Parameters: None
Description
Automatically monitors Celery queue health, including task counts, queue depths, and scheduled tasks. Sends Slack alerts when thresholds are exceeded. Delegates to the check_celery_queue function.
Retry Policy
Max retries: 3
Retry delay: 60 seconds
Exponential backoff
Configuration
Set the schedule:
WATCHER_CELERY_QUEUE_HEALTH_CHECK_SCHEDULE="*/5 * * * *" # Every 5 minutes
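The threshold comparison behind the queue health alerts might look something like the sketch below. The function `find_overloaded_queues`, the threshold values, and the message format are invented for illustration; how Watcher actually reads queue depths and sends Slack alerts is not covered here.

```python
def find_overloaded_queues(queue_depths, thresholds):
    """Hypothetical helper: return one alert message per queue whose
    pending-task count exceeds its configured threshold."""
    alerts = []
    for queue_name, depth in sorted(queue_depths.items()):
        limit = thresholds.get(queue_name)
        if limit is not None and depth > limit:
            alerts.append(
                f"Queue '{queue_name}' has {depth} pending tasks (threshold: {limit})"
            )
    return alerts


# Illustrative numbers only.
alerts = find_overloaded_queues(
    queue_depths={"celery": 1500, "scheduled": 3},
    thresholds={"celery": 1000, "scheduled": 50},
)
```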
Task Configuration
Queue Management
Watcher uses two separate Celery queues for task isolation:
celery: Main queue for regular tasks triggered by API calls
scheduled: Dedicated queue for automated scheduled tasks
This separation prevents scheduled monitoring tasks from being delayed by high-volume regular tasks.
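As a rough illustration of the two-queue split, the sketch below resolves a task name to a queue using a routing table shaped like Celery's `task_routes` setting, which accepts glob patterns. The table contents and the helper `queue_for` are assumptions; the task names are taken from the Beat schedule in this document.

```python
from fnmatch import fnmatch

# Assumed routing table: anything named scheduled_* goes to "scheduled".
TASK_ROUTES = {
    "src.celery_tasks.scheduled_*": {"queue": "scheduled"},
}
DEFAULT_QUEUE = "celery"  # main queue for regular tasks


def queue_for(task_name):
    """Hypothetical helper: return the queue a task name would be routed to."""
    for pattern, route in TASK_ROUTES.items():
        if fnmatch(task_name, pattern):
            return route["queue"]
    return DEFAULT_QUEUE


scheduled_queue = queue_for("src.celery_tasks.scheduled_freshness_check")
regular_queue = queue_for("src.celery_tasks.detect_anomalies_task")
```

Workers can then be started against one queue or the other, so a backlog on `celery` does not hold up tasks waiting on `scheduled`.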
Rate Limiting
Regular tasks have configurable rate limits to prevent system overload, except where a task must run unthrottled:
detect_anomalies_task: 15/s (high frequency for real-time analysis)
freshness_check_task: 1/s (low frequency for periodic checks)
timeliness_check_task: 1/s (low frequency for periodic checks)
address_lineage_closure_rebuild_task: 5/s (medium frequency for maintenance)
pipeline_execution_closure_maintain_task: No rate limit (must keep up with the execution creation rate)
Scheduled tasks have no rate limits as they are controlled by Celery Beat scheduling.
Celery Beat Configuration
Celery Beat is configured with the following default schedules:
from celery.schedules import crontab

# `celery` is the Celery application instance
celery.conf.beat_schedule = {
    "scheduled-freshness-check": {
        "task": "src.celery_tasks.scheduled_freshness_check",
        "schedule": crontab(minute=0),  # Every hour
    },
    "scheduled-timeliness-check": {
        "task": "src.celery_tasks.scheduled_timeliness_check",
        "schedule": crontab(minute="*/15"),  # Every 15 minutes
    },
    "scheduled-celery-queue-health-check": {
        "task": "src.celery_tasks.scheduled_celery_queue_health_check",
        "schedule": crontab(minute="*/5"),  # Every 5 minutes
    },
}
Schedules are configurable via environment variables:
WATCHER_FRESHNESS_CHECK_SCHEDULE
WATCHER_TIMELINESS_CHECK_SCHEDULE
WATCHER_CELERY_QUEUE_HEALTH_CHECK_SCHEDULE
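One plausible way to turn such an environment variable into a Beat schedule is to split the five-field cron expression into the keyword arguments that Celery's `crontab` accepts. The helper `cron_kwargs_from_env` below is a hypothetical sketch of that step, not Watcher's actual loader.

```python
import os

# Standard cron field order, named after crontab()'s keyword arguments.
CRON_FIELDS = ("minute", "hour", "day_of_month", "month_of_year", "day_of_week")


def cron_kwargs_from_env(var_name, default):
    """Hypothetical helper: read a five-field cron expression from the
    environment (falling back to `default`) and map it to crontab kwargs."""
    expression = os.environ.get(var_name, default)
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(
            f"{var_name} must have {len(CRON_FIELDS)} fields, got: {expression!r}"
        )
    return dict(zip(CRON_FIELDS, parts))


freshness_kwargs = cron_kwargs_from_env("WATCHER_FRESHNESS_CHECK_SCHEDULE", "0 * * * *")
# With Celery installed, the result could be passed on as:
#   crontab(**freshness_kwargs)
```

Rejecting malformed expressions at startup keeps a typo in the environment from silently disabling a scheduled check.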
Retry Policies
All tasks implement retry policies with exponential backoff:
Max Retries: 3 attempts
Base Delay: 60 seconds for all tasks except pipeline_execution_closure_maintain_task, which uses 30 seconds
Exponential Backoff: Delay increases with each retry
Final Failure: Task marked as failed after max retries
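To make the effect of exponential backoff concrete, the sketch below computes the wait before each retry. The growth factor is not stated in this document; doubling is assumed here, and the helper `backoff_delays` is purely illustrative. Any random jitter a real deployment might add is ignored.

```python
def backoff_delays(base_delay, max_retries=3, factor=2):
    """Hypothetical helper: seconds to wait before each retry, assuming the
    delay is multiplied by `factor` after every failed attempt."""
    return [base_delay * factor ** attempt for attempt in range(max_retries)]


# 60-second base delay (most tasks) and 30-second base delay
# (pipeline_execution_closure_maintain_task).
standard_delays = backoff_delays(60)
closure_delays = backoff_delays(30)
```

Under these assumptions a task with a 60-second base delay would exhaust its retries about seven minutes after the first failure.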
Error Handling
Tasks include comprehensive error handling:
Database Errors: Connection issues, constraint violations
Validation Errors: Invalid input parameters
Timeout Errors: Long-running operations
System Errors: Memory, disk, or network issues
Task Monitoring
Task Status Tracking
Each task provides status updates during execution:
PENDING: Task queued, waiting for a worker
PROGRESS: Task running, with progress updates (a custom state, not one of Celery's built-in states)
SUCCESS: Task completed successfully
FAILURE: Task failed, with error details
RETRY: Task failed and will be retried
Progress Updates
Tasks provide detailed progress information:
{
  "state": "PROGRESS",
  "meta": {
    "status": "Starting anomaly detection..."
  }
}
Error Details
Failed tasks include comprehensive error information:
{
  "state": "FAILURE",
  "meta": {
    "exc_type": "DatabaseError",
    "exc_message": "Connection timeout",
    "retry_count": 2,
    "max_retries": 3
  }
}
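A payload of this shape could be assembled from a caught exception along the following lines. The helper `failure_payload` and the stand-in `DatabaseError` class are hypothetical and exist only to reproduce the example above.

```python
class DatabaseError(Exception):
    """Stand-in exception type used only for this illustration."""


def failure_payload(exc, retry_count, max_retries=3):
    """Hypothetical helper: describe a failed task in the documented shape."""
    return {
        "state": "FAILURE",
        "meta": {
            "exc_type": type(exc).__name__,
            "exc_message": str(exc),
            "retry_count": retry_count,
            "max_retries": max_retries,
        },
    }


payload = failure_payload(DatabaseError("Connection timeout"), retry_count=2)
```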