Вопрос по ctypes, multiprocessing, python – Python: многопроцессорность и массив c_char_p

5

m запускает 3 процесса, и я хочу, чтобы они поместили строку в общий массив,по индексу, соответствующему процессу (я).

Посмотрите на код ниже, сгенерированный вывод:

['test 0', None, None]
['test 1', 'test 1', None]
['test 2', 'test 2', 'test 2']

Зачем 'тест 0 ' перезаписатьtest 1, а такжеtest 1 от ?test 2

То, что я хочу (порядок не важен):

['test 0', None, None]
['test 0', 'test 1', None]
['test 0', 'test 1', 'test 2']

Код: я

#!/usr/bin/env python

import multiprocessing
from multiprocessing import Value, Lock, Process, Array
import ctypes
from ctypes import c_int, c_char_p

class Consumer(multiprocessing.Process):
    def __init__(self, task_queue, result_queue, arr, lock):
            multiprocessing.Process.__init__(self)
            self.task_queue = task_queue
            self.result_queue = result_queue
            self.arr = arr
            self.lock = lock

    def run(self):
            proc_name = self.name
            while True:
                next_task = self.task_queue.get()
                if next_task is None:
                    self.task_queue.task_done()
                    break            
                answer = next_task(arr=self.arr, lock=self.lock)
                self.task_queue.task_done()
                self.result_queue.put(answer)
            return

class Task(object):
    def __init__(self, i):
        self.i = i

    def __call__(self, arr=None, lock=None):
        with lock:
            arr[self.i] = "test %d" % self.i
            print arr[:]

    def __str__(self):
        return 'ARC'

    def run(self):
        print 'IN'

if __name__ == '__main__':
   tasks = multiprocessing.JoinableQueue()
   results = multiprocessing.Queue()

   arr = Array(ctypes.c_char_p, 3)

   lock = multiprocessing.Lock()

   num_consumers = multiprocessing.cpu_count() * 2
   consumers = [Consumer(tasks, results, arr, lock) for i in xrange(num_consumers)]

   for w in consumers:
      w.start()

   for i in xrange(3):
      tasks.put(Task(i))

   for i in xrange(num_consumers):
      tasks.put(None)

работает на Python 2.7.3 (Ubuntu)

Ваш Ответ

1   ответ
5

Эта проблема кажется похожей наэтот, Там, Дж. Ф. Себастьян предположил, что назначениеarr[i] точкиarr[i] на адрес памяти, который имел значение только для подпроцесса, выполняющего присваивание. Другие подпроцессы извлекают мусор, просматривая этот адрес.

Есть как минимум два способа избежать этой проблемы. Одним из них является использованиеmultiprocessing.manager список:

import multiprocessing as mp

class Consumer(mp.Process):
    def __init__(self, task_queue, result_queue, lock, lst):
            mp.Process.__init__(self)
            self.task_queue = task_queue
            self.result_queue = result_queue
            self.lock = lock
            self.lst = lst

    def run(self):
            proc_name = self.name
            while True:
                next_task = self.task_queue.get()
                if next_task is None:
                    self.task_queue.task_done()
                    break            
                answer = next_task(lock = self.lock, lst = self.lst)
                self.task_queue.task_done()
                self.result_queue.put(answer)
            return

class Task(object):
    def __init__(self, i):
        self.i = i

    def __call__(self, lock, lst):
        with lock:
            lst[self.i] = "test {}".format(self.i)
            print([lst[i] for i in range(3)])

if __name__ == '__main__':
   tasks = mp.JoinableQueue()
   results = mp.Queue()
   manager = mp.Manager()
   lst = manager.list(['']*3)

   lock = mp.Lock()
   num_consumers = mp.cpu_count() * 2
   consumers = [Consumer(tasks, results, lock, lst) for i in xrange(num_consumers)]

   for w in consumers:
      w.start()

   for i in xrange(3):
      tasks.put(Task(i))

   for i in xrange(num_consumers):
      tasks.put(None)

   tasks.join()

Другой способ - использовать общий массив с фиксированным размером, например.mp.Array('c', 10)

import multiprocessing as mp

class Consumer(mp.Process):
    def __init__(self, task_queue, result_queue, arr, lock):
            mp.Process.__init__(self)
            self.task_queue = task_queue
            self.result_queue = result_queue
            self.arr = arr
            self.lock = lock

    def run(self):
            proc_name = self.name
            while True:
                next_task = self.task_queue.get()
                if next_task is None:
                    self.task_queue.task_done()
                    break            
                answer = next_task(arr = self.arr, lock = self.lock)
                self.task_queue.task_done()
                self.result_queue.put(answer)
            return

class Task(object):
    def __init__(self, i):
        self.i = i

    def __call__(self, arr, lock):
        with lock:
            arr[self.i].value = "test {}".format(self.i)
            print([a.value for a in arr])

if __name__ == '__main__':
   tasks = mp.JoinableQueue()
   results = mp.Queue()
   arr = [mp.Array('c', 10) for i in range(3)]

   lock = mp.Lock()
   num_consumers = mp.cpu_count() * 2
   consumers = [Consumer(tasks, results, arr, lock) for i in xrange(num_consumers)]

   for w in consumers:
      w.start()

   for i in xrange(3):
      tasks.put(Task(i))

   for i in xrange(num_consumers):
      tasks.put(None)

   tasks.join()

Я предполагаю, что причина, почему это работает, когдаmp.Array(ctypes.c_char_p, 3) не потому чтоmp.Array('c', 10) имеет фиксированный размер, поэтому адрес памяти никогда не меняется, в то время какmp.Array(ctypes.c_char_p, 3) имеет переменный размер, поэтому адрес памяти может измениться, когдаarr[i] назначается на большую строку.

Возможно это то, чтодокументы предупреждают о том, когда говорится,

Хотя можно хранить указатель в разделяемой памяти, помните, что это будет указывать на местоположение в адресном пространстве определенного процесса. Однако вполне вероятно, что указатель недопустим в контексте второго процесса, и попытка разыменования указателя из второго процесса может вызвать сбой.

Буду делать, как только у меня будет 15 репутации, я выиграл 'забыть;) Ujoux
Спасибо миллиард раз! Оба ваших решения действительно работают :) Я наткнулся на этот пост Ж. Ф. Себастьяна, но по какой-то причине не смогреализовать это ... дох! Теперь скажи мне, где я должен построить твою статую! Еще раз спасибо ... Ujoux
@unutbu - Одна статуя от меня тоже;) +1 Спасибо, хороший ответ. trex
Спасибо за интересный вопрос и ваш энтузиазм! Надеюсь увидеть вас больше на Stackoverflow. Что касается статуй - я думаю, что нажатие на стрелку вверх над галочкой делает довольно крутой; ^) unutbu

Похожие вопросы