This document describes the current stable version of django_celery_monitor (1.1). For development docs, go here.
Source code for django_celery_monitor.models
"""The data models for the task and worker states."""
from __future__ import absolute_import, unicode_literals
from time import time, mktime, gmtime
from django.db import models
from django.utils.translation import ugettext_lazy as _
from django.conf import settings
from celery import states
from celery.events.state import heartbeat_expires
from celery.five import python_2_unicode_compatible
from . import managers
ALL_STATES = sorted(states.ALL_STATES)
TASK_STATE_CHOICES = sorted(zip(ALL_STATES, ALL_STATES))
[docs]@python_2_unicode_compatible
class WorkerState(models.Model):
"""The data model to store the worker state in."""
#: The hostname of the Celery worker.
hostname = models.CharField(_('hostname'), max_length=255, unique=True)
#: A :class:`~datetime.datetime` describing when the worker was last seen.
last_heartbeat = models.DateTimeField(_('last heartbeat'), null=True,
db_index=True)
last_update = models.DateTimeField(_('last update'), auto_now=True)
#: A :class:`~django_celery_monitor.managers.ExtendedManager` instance
#: to query the :class:`~django_celery_monitor.models.WorkerState` model.
objects = managers.WorkerStateQuerySet.as_manager()
class Meta:
"""Model meta-data."""
verbose_name = _('worker')
verbose_name_plural = _('workers')
get_latest_by = 'last_heartbeat'
ordering = ['-last_heartbeat']
def __str__(self):
return self.hostname
def __repr__(self):
return '<WorkerState: {0.hostname}>'.format(self)
[docs] def is_alive(self):
"""Return whether the worker is currently alive or not."""
if self.last_heartbeat:
# Use UTC timestamp if USE_TZ is true, or else use local timestamp
timestamp = mktime(gmtime()) if settings.USE_TZ else time()
return timestamp < heartbeat_expires(self.heartbeat_timestamp)
return False
@property
def heartbeat_timestamp(self):
return mktime(self.last_heartbeat.timetuple())
[docs]@python_2_unicode_compatible
class TaskState(models.Model):
"""The data model to store the task state in."""
#: The :mod:`task state <celery.states>` as returned by Celery.
state = models.CharField(
_('state'), max_length=64, choices=TASK_STATE_CHOICES, db_index=True,
)
#: The task :func:`UUID <uuid.uuid4>`.
task_id = models.CharField(_('UUID'), max_length=36, unique=True)
#: The :ref:`task name <celery:task-names>`.
name = models.CharField(
_('name'), max_length=200, null=True, db_index=True,
)
#: A :class:`~datetime.datetime` describing when the task was received.
tstamp = models.DateTimeField(_('event received at'), db_index=True)
#: The positional :ref:`task arguments <celery:calling-basics>`.
args = models.TextField(_('Arguments'), null=True)
#: The keyword :ref:`task arguments <celery:calling-basics>`.
kwargs = models.TextField(_('Keyword arguments'), null=True)
#: An optional :class:`~datetime.datetime` describing the
#: :ref:`ETA <celery:calling-eta>` for its processing.
eta = models.DateTimeField(_('ETA'), null=True)
#: An optional :class:`~datetime.datetime` describing when the task
#: :ref:`expires <celery:calling-expiration>`.
expires = models.DateTimeField(_('expires'), null=True)
#: The result of the task.
result = models.TextField(_('result'), null=True)
#: The Python error traceback if raised.
traceback = models.TextField(_('traceback'), null=True)
#: The task runtime in seconds.
runtime = models.FloatField(
_('execution time'), null=True,
help_text=_('in seconds if task succeeded'),
)
#: The number of retries.
retries = models.IntegerField(_('number of retries'), default=0)
#: The worker responsible for the execution of the task.
worker = models.ForeignKey(
WorkerState, null=True, verbose_name=_('worker'),
on_delete=models.CASCADE,
)
#: Whether the task has been expired and will be purged by the
#: event framework.
hidden = models.BooleanField(editable=False, default=False, db_index=True)
#: A :class:`~django_celery_monitor.managers.TaskStateManager` instance
#: to query the :class:`~django_celery_monitor.models.TaskState` model.
objects = managers.TaskStateQuerySet.as_manager()
class Meta:
"""Model meta-data."""
verbose_name = _('task')
verbose_name_plural = _('tasks')
get_latest_by = 'tstamp'
ordering = ['-tstamp']
def __str__(self):
name = self.name or 'UNKNOWN'
s = '{0.state:<10} {0.task_id:<36} {1}'.format(self, name)
if self.eta:
s += ' eta:{0.eta}'.format(self)
return s
def __repr__(self):
return '<TaskState: {0.state} {1}[{0.task_id}] ts:{0.tstamp}>'.format(
self, self.name or 'UNKNOWN',
)