Pregunta sobre celerybeat, celery, python – Cómo agregar / eliminar tareas periódicas dinámicamente al apio (celerybeat)

41

Si tengo una función definida de la siguiente manera:

<code>def add(x,y):
  return x+y
</code>

¿Hay alguna forma de agregar dinámicamente esta función como una Tarea periódica de apio y ponerla en marcha en tiempo de ejecución? Me gustaría poder hacer algo como (pseudocódigo):

<code>some_unique_task_id = celery.beat.schedule_task(add, run_every=crontab(minute="*/30"))
celery.beat.start(some_unique_task_id)
</code>

También me gustaría detener o eliminar esa tarea dinámicamente con algo como (pseudocódigo):

<code>celery.beat.remove_task(some_unique_task_id)
</code>

o

<code>celery.beat.stop(some_unique_task_id)
</code>

Para tu información, no estoy usando djcelery, lo que te permite administrar tareas periódicas a través del administrador de django.

Tu respuesta

4   la respuesta
19

No, lo siento, esto no es posible con el apio regular.

Pero es fácilmente extensible para hacer lo que quieres, por ejemplo. El programador django-celery es solo una subclase que lee y escribe el programa en la base de datos (con algunas optimizaciones en la parte superior).

También puede usar el programador django-celery incluso para proyectos que no sean de Django.

Algo como esto:

Instalar django + django-celery:

$ pip install -U django django-apio

Agregue los siguientes ajustes a su apio de apio:

<code>DATABASES = {
    'default': {
        'NAME': 'celerybeat.db',
        'ENGINE': 'django.db.backends.sqlite3',
    },
}
INSTALLED_APPS = ('djcelery', )
</code>

Crear las tablas de la base de datos:

<code>$ PYTHONPATH=. django-admin.py syncdb --settings=celeryconfig
</code>

Inicie celerybeat con el programador de la base de datos:

<code>$ PYTHONPATH=. django-admin.py celerybeat --settings=celeryconfig \
    -S djcelery.schedulers.DatabaseScheduler
</code>

También está eldjcelerymon comando que se puede usar para proyectos que no sean de Django para iniciar celerycam y un servidor web de Django Admin en el mismo proceso, puede usarlo para editar también sus tareas periódicas en una interfaz web agradable:

<code>   $ djcelerymon
</code>

(Tenga en cuenta que por alguna razón no se puede detener djcelerymon usando Ctrl + C, tiene que usar Ctrl + Z + kill% 1)

¿Puede por favor mencionar el código para agregar tarea y eliminar? Lo siento, no estoy recibiendo. Ansuman Bebarta
¿Algún cambio en esto de 2012 a 2016? Tanay
36

Esta pregunta fue respondida engrupos de Google.

NO SOY EL AUTOR, todo el crédito va para Jean Mark

Aquí hay una solución adecuada para esto. Trabajo confirmado, en mi caso, clasifiqué la tarea periódica y creé un modelo a partir de ella, ya que puedo agregar otros campos al modelo según sea necesario y también para poder agregar el método "terminar". Debe establecer la propiedad habilitada de la tarea periódica en Falso y guardarla antes de eliminarla. La subclase completa no es una obligación, el método schedule_every es el que realmente hace el trabajo. Cuando esté listo para terminar su tarea (si no la subclasificó), puede usar PeriodicTask.objects.filter (name = ...) para buscar su tarea, deshabilitarla y luego eliminarla.

¡Espero que esto ayude!

<code>from djcelery.models import PeriodicTask, IntervalSchedule
from datetime import datetime

class TaskScheduler(models.Model):

    periodic_task = models.ForeignKey(PeriodicTask)

    @staticmethod
    def schedule_every(task_name, period, every, args=None, kwargs=None):
    """ schedules a task by name every "every" "period". So an example call would be:
         TaskScheduler('mycustomtask', 'seconds', 30, [1,2,3]) 
         that would schedule your custom task to run every 30 seconds with the arguments 1,2 and 3 passed to the actual task. 
    """
        permissible_periods = ['days', 'hours', 'minutes', 'seconds']
        if period not in permissible_periods:
            raise Exception('Invalid period specified')
        # create the periodic task and the interval
        ptask_name = "%s_%s" % (task_name, datetime.datetime.now()) # create some name for the period task
        interval_schedules = IntervalSchedule.objects.filter(period=period, every=every)
        if interval_schedules: # just check if interval schedules exist like that already and reuse em
            interval_schedule = interval_schedules[0]
        else: # create a brand new interval schedule
            interval_schedule = IntervalSchedule()
            interval_schedule.every = every # should check to make sure this is a positive int
            interval_schedule.period = period 
            interval_schedule.save()
        ptask = PeriodicTask(name=ptask_name, task=task_name, interval=interval_schedule)
        if args:
            ptask.args = args
        if kwargs:
            ptask.kwargs = kwargs
        ptask.save()
        return TaskScheduler.objects.create(periodic_task=ptask)

    def stop(self):
        """pauses the task"""
        ptask = self.periodic_task
        ptask.enabled = False
        ptask.save()

    def start(self):
        """starts the task"""
        ptask = self.periodic_task
        ptask.enabled = True
        ptask.save()

    def terminate(self):
        self.stop()
        ptask = self.periodic_task
        self.delete()
        ptask.delete()
</code>
Esta debería ser la respuesta aceptada. tomorrow__
@kaiIntervalSchedule, PeriodicTask, etc, sondjcelery clases, y el OP dice que no está usandodjcelery. Definitivamente útil, sin embargo. Chris
2

frasco-djcelery que configura matraz y djcelery y también proporciona api de descanso navegable

4

Hay una biblioteca llamada django-celery-beat que proporciona los modelos que uno necesita. Para hacerlo cargar dinámicamente nuevas tareas periódicas, uno tiene que crear su propio Programador.

<code>from django_celery_beat.schedulers import DatabaseScheduler


class AutoUpdateScheduler(DatabaseScheduler):

    def tick(self, *args, **kwargs):
        if self.schedule_changed():
            print('resetting heap')
            self.sync()
            self._heap = None
            new_schedule = self.all_as_schedule()

            if new_schedule:
                to_add = new_schedule.keys() - self.schedule.keys()
                to_remove = self.schedule.keys() - new_schedule.keys()
                for key in to_add:
                    self.schedule[key] = new_schedule[key]
                for key in to_remove:
                    del self.schedule[key]

        super(AutoUpdateScheduler, self).tick(*args, **kwargs)

    @property
    def schedule(self):
        if not self._initial_read and not self._schedule:
            self._initial_read = True
            self._schedule = self.all_as_schedule()

        return self._schedule
</code>
Gracias. No funcionó de inmediato pero usandoto_add = [key for key in new_schedule.keys() if key not in self.schedule.keys()] y similar parato_remove Hizo el truco. ¿Por qué no es esta una opción estándar? Hasta ahora, he tenido que hacer que las tareas de apio llamen a otras tareas de apio con una cuenta regresiva. Eso no me suena muy bien. freethebees

Preguntas relacionadas