This document describes the current stable version of django_celery_monitor (1.1). For development docs, go here.
Source code for django_celery_monitor.camera
"""The Celery events camera."""
from __future__ import absolute_import, unicode_literals
from datetime import timedelta
from celery import states
from celery.events.snapshot import Polaroid
from celery.utils.imports import symbol_by_name
from celery.utils.log import get_logger
from celery.utils.time import maybe_iso8601
from .utils import fromtimestamp, correct_awareness
WORKER_UPDATE_FREQ = 60 # limit worker timestamp write freq.
SUCCESS_STATES = frozenset([states.SUCCESS])
NOT_SAVED_ATTRIBUTES = frozenset(['name', 'args', 'kwargs', 'eta'])
logger = get_logger(__name__)
debug = logger.debug
[docs]class Camera(Polaroid):
"""The Celery events Polaroid snapshot camera."""
clear_after = True
worker_update_freq = WORKER_UPDATE_FREQ
def __init__(self, *args, **kwargs):
super(Camera, self).__init__(*args, **kwargs)
# Expiry can be timedelta or None for never expire.
self.app.add_defaults({
'monitors_expire_success': timedelta(days=1),
'monitors_expire_error': timedelta(days=3),
'monitors_expire_pending': timedelta(days=5),
})
@property
def TaskState(self):
"""Return the data model to store task state in."""
return symbol_by_name('django_celery_monitor.models.TaskState')
@property
def WorkerState(self):
"""Return the data model to store worker state in."""
return symbol_by_name('django_celery_monitor.models.WorkerState')
def django_setup(self):
import django
django.setup()
def install(self):
super(Camera, self).install()
self.django_setup()
@property
def expire_task_states(self):
"""Return a twople of Celery task states and expiration timedeltas."""
return (
(SUCCESS_STATES, self.app.conf.monitors_expire_success),
(states.EXCEPTION_STATES, self.app.conf.monitors_expire_error),
(states.UNREADY_STATES, self.app.conf.monitors_expire_pending),
)
def get_heartbeat(self, worker):
try:
heartbeat = worker.heartbeats[-1]
except IndexError:
return
return fromtimestamp(heartbeat)
def handle_worker(self, hostname_worker):
hostname, worker = hostname_worker
return self.WorkerState.objects.update_heartbeat(
hostname,
heartbeat=self.get_heartbeat(worker),
update_freq=self.worker_update_freq,
)
[docs] def handle_task(self, uuid_task, worker=None):
"""Handle snapshotted event."""
uuid, task = uuid_task
if task.worker and task.worker.hostname:
worker = self.handle_worker(
(task.worker.hostname, task.worker),
)
defaults = {
'name': task.name,
'args': task.args,
'kwargs': task.kwargs,
'eta': correct_awareness(maybe_iso8601(task.eta)),
'expires': correct_awareness(maybe_iso8601(task.expires)),
'state': task.state,
'tstamp': fromtimestamp(task.timestamp),
'result': task.result or task.exception,
'traceback': task.traceback,
'runtime': task.runtime,
'worker': worker
}
# Some fields are only stored in the RECEIVED event,
# so we should remove these from default values,
# so that they are not overwritten by subsequent states.
[defaults.pop(attr, None) for attr in NOT_SAVED_ATTRIBUTES
if defaults[attr] is None]
return self.update_task(task.state, task_id=uuid, defaults=defaults)
def update_task(self, state, task_id, defaults=None):
defaults = defaults or {}
if not defaults.get('name'):
return
return self.TaskState.objects.update_state(
state=state,
task_id=task_id,
defaults=defaults,
)
def on_shutter(self, state):
def _handle_tasks():
for i, task in enumerate(state.tasks.items()):
self.handle_task(task)
for worker in state.workers.items():
self.handle_worker(worker)
_handle_tasks()
def on_cleanup(self):
expired = (
self.TaskState.objects.expire_by_states(states, expires)
for states, expires in self.expire_task_states
)
dirty = sum(item for item in expired if item is not None)
if dirty:
debug('Cleanup: Marked %s objects as dirty.', dirty)
self.TaskState.objects.purge()
debug('Cleanup: %s objects purged.', dirty)
return dirty
return 0