This document describes the current stable version of django_celery_monitor (1.1). For development docs, go here.
Source code for django_celery_monitor.managers
"""The model managers."""
from __future__ import absolute_import, unicode_literals
from datetime import timedelta
from celery import states
from celery.events.state import Task
from celery.utils.time import maybe_timedelta
from django.db import models, router, transaction
from .utils import Now
[docs]class ExtendedQuerySet(models.QuerySet):
"""A custom model queryset that implements a few helpful methods."""
[docs] def select_for_update_or_create(self, defaults=None, **kwargs):
"""Extend update_or_create with select_for_update.
Look up an object with the given kwargs, updating one with defaults
if it exists, otherwise create a new one.
Return a tuple (object, created), where created is a boolean
specifying whether an object was created.
This is a backport from Django 1.11
(https://code.djangoproject.com/ticket/26804) to support
select_for_update when getting the object.
"""
defaults = defaults or {}
lookup, params = self._extract_model_params(defaults, **kwargs)
self._for_write = True
with transaction.atomic(using=self.db):
try:
obj = self.select_for_update().get(**lookup)
except self.model.DoesNotExist:
obj, created = self._create_object_from_params(lookup, params)
if created:
return obj, created
for k, v in defaults.items():
setattr(obj, k, v() if callable(v) else v)
obj.save(using=self.db)
return obj, False
[docs]class WorkerStateQuerySet(ExtendedQuerySet):
"""A custom model queryset for the WorkerState model with some helpers."""
def update_heartbeat(self, hostname, heartbeat, update_freq):
with transaction.atomic():
# check if there was an update in the last n seconds?
interval = Now() - timedelta(seconds=update_freq)
recent_worker_updates = self.filter(
hostname=hostname,
last_update__gte=interval,
)
if recent_worker_updates.exists():
# if yes, get the latest update and move on
obj = recent_worker_updates.get()
else:
# if no, update the worker state and move on
obj, _ = self.select_for_update_or_create(
hostname=hostname,
defaults={'last_heartbeat': heartbeat},
)
return obj
[docs]class TaskStateQuerySet(ExtendedQuerySet):
"""A custom model queryset for the TaskState model with some helpers."""
[docs] def expired(self, states, expires):
"""Return all expired task states."""
return self.filter(
state__in=states,
tstamp__lte=Now() - maybe_timedelta(expires),
)
[docs] def expire_by_states(self, states, expires):
"""Expire task with one of the given states."""
if expires is not None:
return self.expired(states, expires).update(hidden=True)
[docs] def purge(self):
"""Purge all expired task states."""
with transaction.atomic():
self.using(
router.db_for_write(self.model)
).filter(hidden=True).delete()
def update_state(self, state, task_id, defaults):
with transaction.atomic():
obj, created = self.select_for_update_or_create(
task_id=task_id,
defaults=defaults,
)
if created:
return obj
if states.state(state) < states.state(obj.state):
keep = Task.merge_rules[states.RECEIVED]
else:
keep = {}
for key, value in defaults.items():
if key not in keep:
setattr(obj, key, value)
obj.save(update_fields=tuple(defaults.keys()))
return obj