Вопрос по python – Как динамически добавлять / удалять периодические задачи в Celery (celerybeat)

41

Если у меня есть функция, определенная следующим образом:

<code>def add(x,y):
  return x+y
</code>

Есть ли способ динамически добавить эту функцию в качестве периодической задачи сельдерея и запустить ее во время выполнения? Я хотел бы иметь возможность делать что-то вроде (псевдокод):

<code>some_unique_task_id = celery.beat.schedule_task(add, run_every=crontab(minute="*/30"))
celery.beat.start(some_unique_task_id)
</code>

Я также хотел бы динамически остановить или удалить эту задачу с помощью чего-то вроде (псевдокод):

<code>celery.beat.remove_task(some_unique_task_id)
</code>

или же

<code>celery.beat.stop(some_unique_task_id)
</code>

К вашему сведению, я не использую djcelery, который позволяет вам управлять периодическими задачами через администратора django.

Ваш Ответ

4   ответа
2

Колба-djcelery который настраивает флешку и djcelery, а также предоставляет доступный API для просмотра

4

которая предоставляет модели, которые вам нужны. Чтобы динамически загружать новые периодические задачи, нужно создать собственный планировщик.

from django_celery_beat.schedulers import DatabaseScheduler


class AutoUpdateScheduler(DatabaseScheduler):

    def tick(self, *args, **kwargs):
        if self.schedule_changed():
            print('resetting heap')
            self.sync()
            self._heap = None
            new_schedule = self.all_as_schedule()

            if new_schedule:
                to_add = new_schedule.keys() - self.schedule.keys()
                to_remove = self.schedule.keys() - new_schedule.keys()
                for key in to_add:
                    self.schedule[key] = new_schedule[key]
                for key in to_remove:
                    del self.schedule[key]

        super(AutoUpdateScheduler, self).tick(*args, **kwargs)

    @property
    def schedule(self):
        if not self._initial_read and not self._schedule:
            self._initial_read = True
            self._schedule = self.all_as_schedule()

        return self._schedule
Error: User Rate Limit Exceededto_add = [key for key in new_schedule.keys() if key not in self.schedule.keys()]Error: User Rate Limit Exceededto_removeError: User Rate Limit Exceeded
36

группы Google.

Я НЕ АВТОР, все заслуги перед Джин Марком

Here's a proper solution for this. Confirmed working, In my scenario, I sub-classed Periodic Task and created a model out of it since I can add other fields to the model as I need and also so I could add the "terminate" method. You have to set the periodic task's enabled property to False and save it before you delete it. The whole subclassing is not a must, the schedule_every method is the one that really does the work. When you're ready to terminate you task (if you didn't subclass it) you can just use PeriodicTask.objects.filter(name=...) to search for your task, disable it, then delete it.

Hope this helps!

from djcelery.models import PeriodicTask, IntervalSchedule
from datetime import datetime

class TaskScheduler(models.Model):

    periodic_task = models.ForeignKey(PeriodicTask)

    @staticmethod
    def schedule_every(task_name, period, every, args=None, kwargs=None):
    """ schedules a task by name every "every" "period". So an example call would be:
         TaskScheduler('mycustomtask', 'seconds', 30, [1,2,3]) 
         that would schedule your custom task to run every 30 seconds with the arguments 1,2 and 3 passed to the actual task. 
    """
        permissible_periods = ['days', 'hours', 'minutes', 'seconds']
        if period not in permissible_periods:
            raise Exception('Invalid period specified')
        # create the periodic task and the interval
        ptask_name = "%s_%s" % (task_name, datetime.datetime.now()) # create some name for the period task
        interval_schedules = IntervalSchedule.objects.filter(period=period, every=every)
        if interval_schedules: # just check if interval schedules exist like that already and reuse em
            interval_schedule = interval_schedules[0]
        else: # create a brand new interval schedule
            interval_schedule = IntervalSchedule()
            interval_schedule.every = every # should check to make sure this is a positive int
            interval_schedule.period = period 
            interval_schedule.save()
        ptask = PeriodicTask(name=ptask_name, task=task_name, interval=interval_schedule)
        if args:
            ptask.args = args
        if kwargs:
            ptask.kwargs = kwargs
        ptask.save()
        return TaskScheduler.objects.create(periodic_task=ptask)

    def stop(self):
        """pauses the task"""
        ptask = self.periodic_task
        ptask.enabled = False
        ptask.save()

    def start(self):
        """starts the task"""
        ptask = self.periodic_task
        ptask.enabled = True
        ptask.save()

    def terminate(self):
        self.stop()
        ptask = self.periodic_task
        self.delete()
        ptask.delete()
Error: User Rate Limit Exceeded
Error: User Rate Limit ExceededIntervalSchedule, PeriodicTaskError: User Rate Limit ExceededdjceleryError: User Rate Limit ExceededdjceleryError: User Rate Limit Exceeded
19

Но это легко расширяется, чтобы делать то, что вы хотите, например, Джанго-сельдерей планировщик - это просто чтение подкласса и запись расписания в базу данных (с некоторыми оптимизациями сверху).

Также вы можете использовать планировщик django-celery даже для не-Django проектов.

Что-то вроде этого:

Install django + django-celery:

$ pip install -U django django-celery

Add the following settings to your celeryconfig:

DATABASES = {
    'default': {
        'NAME': 'celerybeat.db',
        'ENGINE': 'django.db.backends.sqlite3',
    },
}
INSTALLED_APPS = ('djcelery', )

Create the database tables:

$ PYTHONPATH=. django-admin.py syncdb --settings=celeryconfig

Start celerybeat with the database scheduler:

$ PYTHONPATH=. django-admin.py celerybeat --settings=celeryconfig \
    -S djcelery.schedulers.DatabaseScheduler

Также естьdjcelerymon команда, которая может быть использована для не-Django проектов чтобы запустить celerycam и веб-сервер Django Admin в одном процессе, вы можете используйте это, чтобы также редактировать ваши периодические задачи в хорошем веб-интерфейсе:

   $ djcelerymon

(Обратите внимание, что по какой-то причине djcelerymon не может быть остановлен с помощью Ctrl + C, вы должен использовать Ctrl + Z + kill% 1)

Error: User Rate Limit Exceeded
Error: User Rate Limit Exceeded

Похожие вопросы