Frage an python, celery, celerybeat – Dynamisches Hinzufügen / Entfernen von periodischen Aufgaben zu Sellerie (Sellerie)

41

Wenn ich eine Funktion wie folgt definiert habe:

<code>def add(x,y):
  return x+y
</code>

Gibt es eine Möglichkeit, diese Funktion dynamisch als Sellerie-PeriodicTask hinzuzufügen und zur Laufzeit zu starten? Ich möchte in der Lage sein, etwas wie (Pseudocode) zu tun:

<code>some_unique_task_id = celery.beat.schedule_task(add, run_every=crontab(minute="*/30"))
celery.beat.start(some_unique_task_id)
</code>

Ich möchte diese Aufgabe auch mit so etwas wie (Pseudocode) stoppen oder dynamisch entfernen:

<code>celery.beat.remove_task(some_unique_task_id)
</code>

oder

<code>celery.beat.stop(some_unique_task_id)
</code>

Zu Ihrer Information, ich verwende kein djcelery, mit dem Sie regelmäßige Aufgaben über den django-Administrator verwalten können.

Deine Antwort

4   die antwort
36

Diese Frage wurde am beantwortetGoogle Groups.

Ich bin nicht der Autor, alle Ehre gebührt Jean Mark

Hier ist eine geeignete Lösung dafür. Bestätigtes Arbeiten In meinem Szenario habe ich die periodische Aufgabe in Unterklassen unterteilt und ein Modell daraus erstellt, da ich dem Modell nach Bedarf weitere Felder hinzufügen und auch die Methode "Beenden" hinzufügen kann. Sie müssen die Eigenschaft enabled der periodischen Aufgabe auf False setzen und speichern, bevor Sie sie löschen. Die gesamte Unterklasse ist kein Muss, die Methode schedule_every erledigt die eigentliche Arbeit. Wenn Sie bereit sind, Ihre Aufgabe zu beenden (wenn Sie sie nicht untergeordnet haben), können Sie sie einfach mit PeriodicTask.objects.filter (name = ...) suchen, deaktivieren und dann löschen.

Hoffe das hilft!

<code>from djcelery.models import PeriodicTask, IntervalSchedule
from datetime import datetime

class TaskScheduler(models.Model):

    periodic_task = models.ForeignKey(PeriodicTask)

    @staticmethod
    def schedule_every(task_name, period, every, args=None, kwargs=None):
    """ schedules a task by name every "every" "period". So an example call would be:
         TaskScheduler('mycustomtask', 'seconds', 30, [1,2,3]) 
         that would schedule your custom task to run every 30 seconds with the arguments 1,2 and 3 passed to the actual task. 
    """
        permissible_periods = ['days', 'hours', 'minutes', 'seconds']
        if period not in permissible_periods:
            raise Exception('Invalid period specified')
        # create the periodic task and the interval
        ptask_name = "%s_%s" % (task_name, datetime.datetime.now()) # create some name for the period task
        interval_schedules = IntervalSchedule.objects.filter(period=period, every=every)
        if interval_schedules: # just check if interval schedules exist like that already and reuse em
            interval_schedule = interval_schedules[0]
        else: # create a brand new interval schedule
            interval_schedule = IntervalSchedule()
            interval_schedule.every = every # should check to make sure this is a positive int
            interval_schedule.period = period 
            interval_schedule.save()
        ptask = PeriodicTask(name=ptask_name, task=task_name, interval=interval_schedule)
        if args:
            ptask.args = args
        if kwargs:
            ptask.kwargs = kwargs
        ptask.save()
        return TaskScheduler.objects.create(periodic_task=ptask)

    def stop(self):
        """pauses the task"""
        ptask = self.periodic_task
        ptask.enabled = False
        ptask.save()

    def start(self):
        """starts the task"""
        ptask = self.periodic_task
        ptask.enabled = True
        ptask.save()

    def terminate(self):
        self.stop()
        ptask = self.periodic_task
        self.delete()
        ptask.delete()
</code>
Dies sollte die akzeptierte Antwort sein. tomorrow__
@kaiIntervalSchedule, PeriodicTaskusw. sinddjcelery Klassen, und das OP sagt, dass er nicht verwendetdjcelery. Auf jeden Fall nützlich. Chris
2

Sie können dies überprüfenFlaschenschmuck Das konfiguriert flask und djcelery und bietet auch eine durchsuchbare Rest-API

19

Nein, tut mir leid, das ist mit dem normalen Sellerie nicht möglich.

Es ist jedoch leicht erweiterbar, um das zu tun, was Sie möchten, z. Der Django-Sellerie-Scheduler ist nur eine Unterklasse, die den Zeitplan liest und in die Datenbank schreibt (mit einigen Optimierungen oben).

Sie können den Django-Sellerie-Scheduler auch für Nicht-Django-Projekte verwenden.

Etwas wie das:

Installieren Sie Django + Django-Sellerie:

$ pip install -U Django Django-Sellerie

Fügen Sie Ihrer Selleriekonfiguration die folgenden Einstellungen hinzu:

<code>DATABASES = {
    'default': {
        'NAME': 'celerybeat.db',
        'ENGINE': 'django.db.backends.sqlite3',
    },
}
INSTALLED_APPS = ('djcelery', )
</code>

Erstellen Sie die Datenbanktabellen:

<code>$ PYTHONPATH=. django-admin.py syncdb --settings=celeryconfig
</code>

Starten Sie Sellerie mit dem Datenbank-Scheduler:

<code>$ PYTHONPATH=. django-admin.py celerybeat --settings=celeryconfig \
    -S djcelery.schedulers.DatabaseScheduler
</code>

Es gibt auch diedjcelerymon Befehl, der für Nicht-Django-Projekte verwendet werden kann, um celerycam und einen Django Admin-Webserver in demselben Prozess zu starten. Mit diesem Befehl können Sie auch Ihre regelmäßigen Aufgaben in einer netten Weboberfläche bearbeiten:

<code>   $ djcelerymon
</code>

(Aus irgendeinem Grund kann djcelerymon nicht mit Strg + C gestoppt werden. Sie müssen Strg + Z + kill% 1 verwenden.)

Änderungen in diesem Zeitraum von 2012 bis 2016? Tanay
Können Sie bitte den Code für das Hinzufügen und Entfernen von Aufgaben erwähnen? Entschuldigung, ich verstehe nicht. Ansuman Bebarta
4

die die Modelle bietet, die man braucht. Damit neue periodische Aufgaben dynamisch geladen werden, muss ein eigener Scheduler erstellt werden.

<code>from django_celery_beat.schedulers import DatabaseScheduler


class AutoUpdateScheduler(DatabaseScheduler):

    def tick(self, *args, **kwargs):
        if self.schedule_changed():
            print('resetting heap')
            self.sync()
            self._heap = None
            new_schedule = self.all_as_schedule()

            if new_schedule:
                to_add = new_schedule.keys() - self.schedule.keys()
                to_remove = self.schedule.keys() - new_schedule.keys()
                for key in to_add:
                    self.schedule[key] = new_schedule[key]
                for key in to_remove:
                    del self.schedule[key]

        super(AutoUpdateScheduler, self).tick(*args, **kwargs)

    @property
    def schedule(self):
        if not self._initial_read and not self._schedule:
            self._initial_read = True
            self._schedule = self.all_as_schedule()

        return self._schedule
</code>
Vielen Dank. Hat nicht sofort funktioniert, sondern mitto_add = [key for key in new_schedule.keys() if key not in self.schedule.keys()] und ähnliches fürto_remove hat den Trick gemacht. Warum ist dies keine Standardoption? Bisher mussten Sellerie-Aufgaben andere Sellerie-Aufgaben mit einem Countdown aufrufen. Das klingt für mich nicht sehr gut. freethebees

Verwandte Fragen