Вопрос по queue, multithreading, multiprocessing, python – Максимальный размер для многопроцессорной обработки. Элемент очереди?

15

Я работаю над довольно крупным проектом на Python, который требует, чтобы одна из ресурсоемких фоновых задач была выгружена в другое ядро, чтобы основной сервис не замедлялся. Я столкнулся с каким-то странным поведением при использованииmultiprocessing.Queue сообщить результаты рабочего процесса. Использование одной и той же очереди дляthreading.Thread иmultiprocessing.Process для сравнения поток работает просто отлично, но процессу не удается присоединиться после помещения большого элемента в очередь. Заметим:

<code>import threading
import multiprocessing

class WorkerThread(threading.Thread):
    def __init__(self, queue, size):
        threading.Thread.__init__(self)
        self.queue = queue
        self.size = size

    def run(self):
        self.queue.put(range(size))


class WorkerProcess(multiprocessing.Process):
    def __init__(self, queue, size):
        multiprocessing.Process.__init__(self)
        self.queue = queue
        self.size = size

    def run(self):
        self.queue.put(range(size))


if __name__ == "__main__":
    size = 100000
    queue = multiprocessing.Queue()

    worker_t = WorkerThread(queue, size)
    worker_p = WorkerProcess(queue, size)

    worker_t.start()
    worker_t.join()
    print 'thread results length:', len(queue.get())

    worker_p.start()
    worker_p.join()
    print 'process results length:', len(queue.get())
</code>

Я видел, что это прекрасно работает дляsize = 10000, но висит наworker_p.join() заsize = 100000, Есть ли какое-то ограничение на размерmultiprocessing.Process экземпляры могут положить вmultiprocessing.Queue? Или я делаю здесь очевидную фундаментальную ошибку?

Для справки, я использую Python 2.6.5 в Ubuntu 10.04.

Ваш Ответ

3   ответа
2

По умолчанию maxsize of Queue бесконечен, но вы переопределили это. В вашем случае worker_p помещает элемент в очередь, очередь должна быть освобождена перед вызовом join. Пожалуйста, обратитесь к ссылке ниже для деталей. https://docs.python.org/2/library/multiprocessing.html#programming-guidelines

1

Ответ намногопроцессорная обработка на Python: некоторые функции не возвращаются после завершения (материал очереди слишком большой) реализует то, что вы, вероятно, подразумеваете под «снятием очереди»; до присоединения & quot; в параллельном выполнении произвольного набора функций, чьи возвращаемые значения ставятся в очередь.

Таким образом, это позволяет помещать вещи любого размера в очереди, так что найденное вами ограничение не мешает.

18

Кажется, что нижележащий канал заполнен, поэтому поток фидера блокирует запись в канал (фактически при попытке получить блокировку, защищающую канал от одновременного доступа).

Проверьте эту проблемуhttp://bugs.python.org/issue8237

Спасибо огромное. просто поменяйте местами 2 строки: & quot; worker_t.join () напечатать "длина результатов потока:", len (queue.get ()) & quot;
Спасибо, это именно та проблема, с которой я сталкиваюсь, и удаление из нее в родительском потоке перед присоединением, кажется, работает нормально. Brendan Wood

Похожие вопросы