Вопрос по python – Как я могу восстановить возвращаемое значение функции, переданной multiprocessing.Process?

112

В приведенном ниже примере кода я хочу восстановить возвращаемое значение функцииworker, Как я могу сделать это? Где хранится это значение?

Example Code:

<code>import multiprocessing

def worker(procnum):
    '''worker function'''
    print str(procnum) + ' represent!'
    return procnum


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print jobs
</code>

Output:

<code>0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]
</code>

Я не могу найти соответствующий атрибут в объектах, хранящихся вjobs.

Заранее спасибо, БИК

Ваш Ответ

9   ответов
10

Кажется, вы должны использоватьmultiprocessing.Pool вместо этого используйте классы .apply () .apply_async (), map ()

http://docs.python.org/library/multiprocessing.html?highlight=pool#multiprocessing.pool.AsyncResult

46

что подход, предложенный @sega_sai, лучше. Но это действительно нуждается в примере кода, так что здесь идет:

import multiprocessing
from os import getpid

def worker(procnum):
    print 'I am number %d in process %d' % (procnum, getpid())
    return getpid()

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes = 3)
    print pool.map(worker, range(5))

Который будет печатать возвращаемые значения:

I am number 0 in process 19139
I am number 1 in process 19138
I am number 2 in process 19140
I am number 3 in process 19139
I am number 4 in process 19140
[19139, 19138, 19140, 19139, 19140]

Если вы знакомы сmap (встроенный в Python 2) это не должно быть слишком сложным. В противном случае взгляните нассылка sega_Sai.

Обратите внимание, как мало кода требуется. (Также обратите внимание, как процессы используются повторно).

Любые идеи, почему мойgetpid() вернуть все то же значение? Я использую Python3
Я не уверен, как Pool распределяет задачи по работникам. Может быть, все они могут оказаться на одном и том же работнике, если они действительно быстрые? Это происходит последовательно? Также, если вы добавите задержку?
Здесь & APOS; smy question
Тогда я не уверен. Я думаю, что было бы интересно открыть для этого отдельный вопрос.
Я также думал, что это было связано со скоростью, но когда я кормлюpool.map диапазон 1000000 с использованием более 10 процессов, я вижу не более двух разных пидов.
9

как это сделать сQueue в любом месте (даже примеры документов Python не порождают несколько процессов), так что вот что я получил после 10 попыток:

def add_helper(queue, arg1, arg2): # the func called in child processes
    ret = arg1 + arg2
    queue.put(ret)

def multi_add(): # spawns child processes
    q = Queue()
    processes = []
    rets = []
    for _ in range(0, 100):
        p = Process(target=add_helper, args=(q, 1, 2))
        processes.append(p)
        p.start()
    for p in processes:
        ret = q.get() # will block
        rets.append(ret)
    for p in processes:
        p.join()
    return rets

Queue это блокирующая потокобезопасная очередь, которую вы можете использовать для хранения возвращаемых значений от дочерних процессов. Таким образом, вы должны передать очередь каждому процессу. Что-то менее очевидное в том, что вы должныget() из очереди перед вамиjoin Processили очередь заполняется и блокирует все.

Update для тех, кто является объектно-ориентированным (протестировано в Python 3.4):

from multiprocessing import Process, Queue

class Multiprocessor():

    def __init__(self):
        self.processes = []
        self.queue = Queue()

    @staticmethod
    def _wrapper(func, queue, args, kwargs):
        ret = func(*args, **kwargs)
        queue.put(ret)

    def run(self, func, *args, **kwargs):
        args2 = [func, self.queue, args, kwargs]
        p = Process(target=self._wrapper, args=args2)
        self.processes.append(p)
        p.start()

    def wait(self):
        rets = []
        for p in self.processes:
            ret = self.queue.get()
            rets.append(ret)
        for p in self.processes:
            p.join()
        return rets

# tester
if __name__ == "__main__":
    mp = Multiprocessor()
    num_proc = 64
    for _ in range(num_proc): # queue up multiple tasks running `sum`
        mp.run(sum, [1, 2, 3, 4, 5])
    ret = mp.wait() # get all results
    print(ret)
    assert len(ret) == num_proc and all(r == 15 for r in ret)
9

Вы можете использоватьexit встроенный, чтобы установить код выхода процесса. Его можно получить изexitcode атрибут процесса:

import multiprocessing

def worker(procnum):
    print str(procnum) + ' represent!'
    exit(procnum)

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    result = []
    for proc in jobs:
        proc.join()
        result.append(proc.exitcode)
    print result

Output:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]
Идеально, если вы просто хотите вызвать исключение в родительском процессе при ошибке.
Имейте в виду, что такой подход может привести к путанице. Обычно процессы должны завершаться с кодом завершения 0, если они завершены без ошибок. Если у вас есть что-нибудь, отслеживающее коды завершения процесса вашей системы, вы можете увидеть эти сообщения как ошибки.
7

Для тех, кто ищет, как получить ценность отProcess с помощьюQueue:

import multiprocessing

ret = {'foo': False}

def worker(queue):
    ret = queue.get()
    ret['foo'] = True
    queue.put(ret)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    queue.put(ret)
    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    print queue.get()  # Prints {"foo": True}
    p.join()
@ LaurensKoppenol Вы, возможно, не звонитеqueue.put(ret) до звонкаp.start() ? В этом случае рабочий поток будет висеть наqueue.get() навсегда. Вы можете повторить это, скопировав мой фрагмент выше при комментированииqueue.put(ret).
Да, он висит там бесконечно. Все мои работники заканчивают (цикл внутри рабочей функции завершается, после этого печатается оператор print для всех работников). Объединение ничего не делает. Если я удалюQueue от моей функции это позволяет мне пройтиjoin()
Я отредактировал этот ответ,queue.get() должно произойти доp.join(), Это работает сейчас для меня.
@LaurensKoppenol Вы имеете в виду, что ваш основной код постоянно висит в p.join () и никогда не продолжается? Ваш процесс имеет бесконечный цикл?
когда я помещаю что-то в очередь в моем рабочем процессе, мое соединение никогда не достигается. Любая идея, как это может прийти?
0

Простое решение:

import multiprocessing

output=[]
data = range(0,10)

def f(x):
    return x**2

def handler():
    p = multiprocessing.Pool(64)
    r=p.map(f, data)
    return r

if __name__ == '__main__':
    output.append(handler())

print(output[0])

Выход:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
0

так как мне нужно было получить коды ошибок из функции. (Спасибо Vertec !!! это удивительный трюк)

Это также можно сделать с помощьюmanager.list но я думаю, что лучше иметь это в диктовке и хранить список в нем. Таким образом, мы сохраняем функцию и результаты, поскольку не можем быть уверены в том, в каком порядке будет заполняться список.

from multiprocessing import Process
import time
import datetime
import multiprocessing


def func1(fn, m_list):
    print 'func1: starting'
    time.sleep(1)
    m_list[fn] = "this is the first function"
    print 'func1: finishing'
    # return "func1"  # no need for return since Multiprocess doesnt return it =(

def func2(fn, m_list):
    print 'func2: starting'
    time.sleep(3)
    m_list[fn] = "this is function 2"
    print 'func2: finishing'
    # return "func2"

def func3(fn, m_list):
    print 'func3: starting'
    time.sleep(9)
    # if fail wont join the rest because it never populate the dict
    # or do a try/except to get something in return.
    raise ValueError("failed here")
    # if we want to get the error in the manager dict we can catch the error
    try:
        raise ValueError("failed here")
        m_list[fn] = "this is third"
    except:
        m_list[fn] = "this is third and it fail horrible"
        # print 'func3: finishing'
        # return "func3"


def runInParallel(*fns):  # * is to accept any input in list
    start_time = datetime.datetime.now()
    proc = []
    manager = multiprocessing.Manager()
    m_list = manager.dict()
    for fn in fns:
        # print fn
        # print dir(fn)
        p = Process(target=fn, name=fn.func_name, args=(fn, m_list))
        p.start()
        proc.append(p)
    for p in proc:
        p.join()  # 5 is the time out

    print datetime.datetime.now() - start_time
    return m_list, proc

if __name__ == '__main__':
    manager, proc = runInParallel(func1, func2, func3)
    # print dir(proc[0])
    # print proc[0]._name
    # print proc[0].name
    # print proc[0].exitcode

    # here you can check what did fail
    for i in proc:
        print i.name, i.exitcode  # name was set up in the Process line 53

    # here will only show the function that worked and where able to populate the 
    # manager dict
    for i, j in manager.items():
        print dir(i)  # things you can do to the function
        print i, j
15

Этот пример показывает, как использовать списокmultiprocessing.Pipe экземпляры для возврата строк из произвольного числа процессов:

import multiprocessing

def worker(procnum, send_end):
    '''worker function'''
    result = str(procnum) + ' represent!'
    print result
    send_end.send(result)

def main():
    jobs = []
    pipe_list = []
    for i in range(5):
        recv_end, send_end = multiprocessing.Pipe(False)
        p = multiprocessing.Process(target=worker, args=(i, send_end))
        jobs.append(p)
        pipe_list.append(recv_end)
        p.start()

    for proc in jobs:
        proc.join()
    result_list = [x.recv() for x in pipe_list]
    print result_list

if __name__ == '__main__':
    main()

Output:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
['0 represent!', '1 represent!', '2 represent!', '3 represent!', '4 represent!']

Это решение использует меньше ресурсов, чемmultiprocessing.Queue который использует

  • a Pipe
  • at least one Lock
  • a buffer
  • a thread

илиmultiprocessing.SimpleQueue который использует

  • a Pipe
  • at least one Lock

Очень поучительно посмотреть на источник для каждого из этих типов.

всегда ли должен читаться канал, прежде чем к нему можно будет добавить (отправить) новое значение?
Что было бы лучшим способом сделать это без превращения каналов в глобальную переменную?
Я помещаю все глобальные данные и код в основную функцию, и она работает так же. Это отвечает на ваш вопрос?
+1, хороший ответ. Но из-за того, что решение более эффективно, компромисс заключается в том, что вы делаетеPipe за процесс против одногоQueue for all processes. I don't know if that ends up being more efficient in all cases. – sudo Sep 21 '17 at 20:41
103

использованиеобщая переменная общаться. Например, вот так:

import multiprocessing

def worker(procnum, return_dict):
    '''worker function'''
    print str(procnum) + ' represent!'
    return_dict[procnum] = procnum


if __name__ == '__main__':
    manager = multiprocessing.Manager()
    return_dict = manager.dict()
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,return_dict))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print return_dict.values()
Я бы порекомендовал использоватьmultiprocessing.Queue, а неManager Вот. ИспользуяManager требует порождения совершенно нового процесса, который является излишним, когдаQueue сделал бы.
это прекрасно работает для одновременного запуска нескольких функций и хитрости для получения информации после. знак равно
@dano: Интересно, если мы используем объект Queue (), мы не можем определить порядок, когда каждый процесс возвращает значение. Я имею в виду, если нам нужен порядок в результате, чтобы сделать следующую работу. Как мы можем быть уверены, где именно, какой вывод от какого процесса
@Catbuilts Вы можете возвратить кортеж из каждого процесса, где одно значение является фактическим возвращаемым значением, о котором вы заботитесь, а другое - уникальным идентификатором из процесса. Но мне также интересно, почему вы должны знать, какой процесс возвращает какое значение. Если это то, что вам на самом деле нужно знать о процессе, или вам нужно соотнести ваш список входов и список выходов? В этом случае я бы рекомендовал использоватьmultiprocessing.Pool.map обработать ваш список рабочих элементов.

Похожие вопросы