Вопрос по – Django & Celery - проблемы с маршрутизацией

6

Я использую Django и Celery и пытаюсь настроить маршрутизацию к нескольким очередям. Когда я указываю задачуrouting_key а такжеexchange (либо в декораторе задачи, либо используяapply_async()), задача не добавляется в брокер (это Kombu, подключающийся к моей базе данных MySQL).

Если я укажу имя очереди в декораторе задачи (что будет означать, что ключ маршрутизации игнорируется), задача работает нормально. Кажется, это проблема с настройкой маршрутизации / обмена.

Есть идеи, в чем проблема?

Вот настройки:

Settings.py

INSTALLED_APPS = (
    ...
    'kombu.transport.django',
    'djcelery',
)
BROKER_BACKEND = 'django'
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = "tasks"
CELERY_DEFAULT_EXCHANGE_TYPE = "topic"
CELERY_DEFAULT_ROUTING_KEY = "task.default"
CELERY_QUEUES = {
    'default': {
        'binding_key':'task.#',
    },
    'i_tasks': {
        'binding_key':'important_task.#',
    },
}

Tasks.py

from celery.task import task

@task(routing_key='important_task.update')
def my_important_task():
    try:
        ...
    except Exception as exc:
        my_important_task.retry(exc=exc)

Начальная задача:

from tasks import my_important_task
my_important_task.delay()
delay не принимает ключевое слово routing_key. Это упрощенная версия apply_async, но они не совпадают. mher
Я используюdelay() метод, который является просто ярлыком дляapply_async(). Я пытаюсь сохранитьrouting_key спецификация с методом задачи (через декоратор) вместо того, когда он вызывается. Я попытался передать ключ с помощьюapply_async() но у меня такая же проблема. Michael Waterfall
Я правильно понял, что могу просто указать информацию о маршрутизации / обмене в декораторе задач, и это следует учитывать при вызове? Michael Waterfall
Как вы передаете routing_key? С помощью async_apply? mher
Я не передаю информацию о маршрутизации при запуске задачи, я указываю ее в декораторе задачи вместе с определением метода. Пожалуйста, посмотрите код выше, чтобы увидеть мои настройки. Michael Waterfall

Ваш Ответ

1   ответ
43

что означает декларация только хранится в памяти (см, бесспорно, трудно найти, транспорт сравнительной таблицы наhttp: //readthedocs.org/docs/kombu/en/latest/introduction.html#transport-compariso)

Так что, когда вы применяете эту задачу с routing_keyimportant_task.update он не сможет направить его, потому что он еще не объявил очередь.

Это сработает, если ты это сделаешь:

@task(queue="i_tasks", routing_key="important_tasks.update")
def important_task():
    print("IMPORTANT")

Но вам было бы намного проще использовать функцию автоматической маршрутизации, поскольку здесь нет ничего, что указывало бы на необходимость использования «тематического» обмена, чтобы использовать автоматическую маршрутизацию просто удалить настройки:

CELERY_DEFAULT_QUEUE,CELERY_DEFAULT_EXCHANGE,CELERY_DEFAULT_EXCHANGE_TYPECELERY_DEFAULT_ROUTING_KEYCELERY_QUEUES

И объявите свою задачу следующим образом:

@task(queue="important")
def important_task():
    return "IMPORTANT"

и затем запустить работника, потребляющего из этой очереди:

$ python manage.py celeryd -l info -Q important

или использовать как по умолчанию celery) очередь иimportant очередь

$ python manage.py celeryd -l info -Q celery,important

Еще одна хорошая практика - не жестко задавать имена очередей в задачу и использоватьCELERY_ROUTES вместо:

@task
def important_task():
    return "DEFAULT"

затем в ваших настройках:

CELERY_ROUTES = {"myapp.tasks.important_task": {"queue": "important"}}

Если вы по-прежнему настаиваете на использовании обмена темами, вы можете добавить этот маршрутизатор для автоматического объявления всех очередей при первой отправке задачи:

class PredeclareRouter(object):
    setup = False

    def route_for_task(self, *args, **kwargs):
        if self.setup:
            return
        self.setup = True
        from celery import current_app, VERSION as celery_version
        # will not connect anywhere when using the Django transport
        # because declarations happen in memory.
        with current_app.broker_connection() as conn:
            queues = current_app.amqp.queues
            channel = conn.default_channel
            if celery_version >= (2, 6):
                for queue in queues.itervalues():
                    queue(channel).declare()
    ,        else:
                from kombu.common import entry_to_queue
                for name, opts in queues.iteritems():
                    entry_to_queue(name, **opts)(channel).declare()
CELERY_ROUTES = (PredeclareRouter(), )
Спасибо за объяснение Michael Waterfall
Эта проблема с объявлениями очереди и обменами решена в Celery 3? Я использую новыйCELERY_QUEUES = (Queue(...), ...) в настройках это означает, что очереди объявляются правильно? Michael Waterfall
Примечание: в Celery 4.0 и выше CELERY_ROUTES был заменен на CELERY_TASK_ROUTES. Может сэкономить чье-то время. Vernon Gutierrez

Похожие вопросы