Pytanie w sprawie celery, python, celerybeat – Jak dynamicznie dodawać / usuwać okresowe zadania do selera (selerbeat)

41

Jeśli mam funkcję zdefiniowaną w następujący sposób:

<code>def add(x,y):
  return x+y
</code>

Czy istnieje sposób dynamicznego dodawania tej funkcji jako selera PeriodicTask i uruchamiania go w czasie wykonywania? Chciałbym móc zrobić coś takiego (pseudokod):

<code>some_unique_task_id = celery.beat.schedule_task(add, run_every=crontab(minute="*/30"))
celery.beat.start(some_unique_task_id)
</code>

Chciałbym również zatrzymać lub usunąć to zadanie dynamicznie za pomocą czegoś takiego (pseudokod):

<code>celery.beat.remove_task(some_unique_task_id)
</code>

lub

<code>celery.beat.stop(some_unique_task_id)
</code>

FYI Nie używam djcelery, co pozwala zarządzać okresowymi zadaniami za pośrednictwem administratora django.

Twoja odpowiedź

4   odpowiedź
36

Na to pytanie padło pytaniegrupy google.

NIE JESTEM AUTOREM, wszystkie zasługi należą się Jeanowi Markowi

Oto właściwe rozwiązanie tego problemu. Potwierdzone działanie, W moim scenariuszu podklasowałem zadanie okresowe i wykreowałem z niego model, ponieważ mogę dodawać inne pola do modelu, jak potrzebuję, a także dodawać metodę „zakończ”. Musisz ustawić właściwość włączoną do okresowego zadania na False i zapisać ją przed usunięciem. Cała podklasa nie jest koniecznością, a metoda schedule_every naprawdę działa. Kiedy jesteś gotowy, aby zakończyć zadanie (jeśli nie podklasowałeś go), możesz użyć PeriodicTask.objects.filter (name = ...), aby wyszukać swoje zadanie, wyłączyć je, a następnie usunąć.

Mam nadzieję że to pomoże!

<code>from djcelery.models import PeriodicTask, IntervalSchedule
from datetime import datetime

class TaskScheduler(models.Model):

    periodic_task = models.ForeignKey(PeriodicTask)

    @staticmethod
    def schedule_every(task_name, period, every, args=None, kwargs=None):
    """ schedules a task by name every "every" "period". So an example call would be:
         TaskScheduler('mycustomtask', 'seconds', 30, [1,2,3]) 
         that would schedule your custom task to run every 30 seconds with the arguments 1,2 and 3 passed to the actual task. 
    """
        permissible_periods = ['days', 'hours', 'minutes', 'seconds']
        if period not in permissible_periods:
            raise Exception('Invalid period specified')
        # create the periodic task and the interval
        ptask_name = "%s_%s" % (task_name, datetime.datetime.now()) # create some name for the period task
        interval_schedules = IntervalSchedule.objects.filter(period=period, every=every)
        if interval_schedules: # just check if interval schedules exist like that already and reuse em
            interval_schedule = interval_schedules[0]
        else: # create a brand new interval schedule
            interval_schedule = IntervalSchedule()
            interval_schedule.every = every # should check to make sure this is a positive int
            interval_schedule.period = period 
            interval_schedule.save()
        ptask = PeriodicTask(name=ptask_name, task=task_name, interval=interval_schedule)
        if args:
            ptask.args = args
        if kwargs:
            ptask.kwargs = kwargs
        ptask.save()
        return TaskScheduler.objects.create(periodic_task=ptask)

    def stop(self):
        """pauses the task"""
        ptask = self.periodic_task
        ptask.enabled = False
        ptask.save()

    def start(self):
        """starts the task"""
        ptask = self.periodic_task
        ptask.enabled = True
        ptask.save()

    def terminate(self):
        self.stop()
        ptask = self.periodic_task
        self.delete()
        ptask.delete()
</code>
To powinna być zaakceptowana odpowiedź. tomorrow__
@kaiIntervalSchedule, PeriodicTaskitd. sądjcelery klasy, a OP mówi, że nie używadjcelery. Na pewno przydatne. Chris
4

Istnieje biblioteka o nazwie django-celery-beat, która dostarcza potrzebnych modeli. Aby dynamicznie ładować nowe zadania okresowe, należy utworzyć własny harmonogram.

<code>from django_celery_beat.schedulers import DatabaseScheduler


class AutoUpdateScheduler(DatabaseScheduler):

    def tick(self, *args, **kwargs):
        if self.schedule_changed():
            print('resetting heap')
            self.sync()
            self._heap = None
            new_schedule = self.all_as_schedule()

            if new_schedule:
                to_add = new_schedule.keys() - self.schedule.keys()
                to_remove = self.schedule.keys() - new_schedule.keys()
                for key in to_add:
                    self.schedule[key] = new_schedule[key]
                for key in to_remove:
                    del self.schedule[key]

        super(AutoUpdateScheduler, self).tick(*args, **kwargs)

    @property
    def schedule(self):
        if not self._initial_read and not self._schedule:
            self._initial_read = True
            self._schedule = self.all_as_schedule()

        return self._schedule
</code>
Dzięki. Nie działa od razu, ale używato_add = [key for key in new_schedule.keys() if key not in self.schedule.keys()] i podobne doto_remove zrobiłem sztuczkę. Dlaczego nie jest to standardowa opcja? Do tej pory musiałem odliczać zadania Celery do innych zadań selera. To nie brzmi zbyt dobrze dla mnie. freethebees
2

Możesz to sprawdzićflask-djcelery który konfiguruje kolbę i djcelery, a także zapewnia przeglądalny api odpoczynku

19

Nie, przepraszam, nie jest to możliwe w przypadku zwykłego selera.

Ale jest łatwo rozszerzalny, aby robić to, co chcesz, np. program planujący django-celery jest tylko podklasą odczytującą i zapisującą harmonogram do bazy danych (z kilkoma optymalizacjami na górze).

Możesz także użyć harmonogramu django-celery nawet dla projektów innych niż Django.

Coś takiego:

Zainstaluj django + django-celery:

$ pip zainstaluj -U django django-celery

Dodaj następujące ustawienia do celeryconfig:

<code>DATABASES = {
    'default': {
        'NAME': 'celerybeat.db',
        'ENGINE': 'django.db.backends.sqlite3',
    },
}
INSTALLED_APPS = ('djcelery', )
</code>

Utwórz tabele bazy danych:

<code>$ PYTHONPATH=. django-admin.py syncdb --settings=celeryconfig
</code>

Uruchom celerybeat za pomocą harmonogramu bazy danych:

<code>$ PYTHONPATH=. django-admin.py celerybeat --settings=celeryconfig \
    -S djcelery.schedulers.DatabaseScheduler
</code>

Jest teżdjcelerymon polecenie, które może być użyte do projektów innych niż Django, aby uruchomić celerycam i serwer WWW Django Admin w tym samym procesie, możesz użyć tego do edycji swoich okresowych zadań w ładnym interfejsie internetowym:

<code>   $ djcelerymon
</code>

(Z jakiegoś powodu djcelerymon nie może zostać zatrzymany za pomocą Ctrl + C, musisz użyć Ctrl + Z + zabić% 1)

Czy możesz podać kod, aby dodać zadanie i usunąć? Przepraszam, że nie dostaję. Ansuman Bebarta
Wszelkie zmiany w tym okresie od 2012 do 2016? Tanay

Powiązane pytania