Вопрос по ctypes, multiprocessing, python – Python: многопроцессорность и массив c_char_p
m запускает 3 процесса, и я хочу, чтобы они поместили строку в общий массив,по индексу, соответствующему процессу (я).
Посмотрите на код ниже, сгенерированный вывод:
['test 0', None, None]
['test 1', 'test 1', None]
['test 2', 'test 2', 'test 2']
Зачем 'тест 0 ' перезаписатьtest 1
, а такжеtest 1
от ?test 2
То, что я хочу (порядок не важен):
['test 0', None, None]
['test 0', 'test 1', None]
['test 0', 'test 1', 'test 2']
Код: я
#!/usr/bin/env python
import multiprocessing
from multiprocessing import Value, Lock, Process, Array
import ctypes
from ctypes import c_int, c_char_p
class Consumer(multiprocessing.Process):
def __init__(self, task_queue, result_queue, arr, lock):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
self.arr = arr
self.lock = lock
def run(self):
proc_name = self.name
while True:
next_task = self.task_queue.get()
if next_task is None:
self.task_queue.task_done()
break
answer = next_task(arr=self.arr, lock=self.lock)
self.task_queue.task_done()
self.result_queue.put(answer)
return
class Task(object):
def __init__(self, i):
self.i = i
def __call__(self, arr=None, lock=None):
with lock:
arr[self.i] = "test %d" % self.i
print arr[:]
def __str__(self):
return 'ARC'
def run(self):
print 'IN'
if __name__ == '__main__':
tasks = multiprocessing.JoinableQueue()
results = multiprocessing.Queue()
arr = Array(ctypes.c_char_p, 3)
lock = multiprocessing.Lock()
num_consumers = multiprocessing.cpu_count() * 2
consumers = [Consumer(tasks, results, arr, lock) for i in xrange(num_consumers)]
for w in consumers:
w.start()
for i in xrange(3):
tasks.put(Task(i))
for i in xrange(num_consumers):
tasks.put(None)
работает на Python 2.7.3 (Ubuntu)
Эта проблема кажется похожей наэтот, Там, Дж. Ф. Себастьян предположил, что назначениеarr[i]
точкиarr[i]
на адрес памяти, который имел значение только для подпроцесса, выполняющего присваивание. Другие подпроцессы извлекают мусор, просматривая этот адрес.
Есть как минимум два способа избежать этой проблемы. Одним из них является использованиеmultiprocessing.manager
список:
import multiprocessing as mp
class Consumer(mp.Process):
def __init__(self, task_queue, result_queue, lock, lst):
mp.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
self.lock = lock
self.lst = lst
def run(self):
proc_name = self.name
while True:
next_task = self.task_queue.get()
if next_task is None:
self.task_queue.task_done()
break
answer = next_task(lock = self.lock, lst = self.lst)
self.task_queue.task_done()
self.result_queue.put(answer)
return
class Task(object):
def __init__(self, i):
self.i = i
def __call__(self, lock, lst):
with lock:
lst[self.i] = "test {}".format(self.i)
print([lst[i] for i in range(3)])
if __name__ == '__main__':
tasks = mp.JoinableQueue()
results = mp.Queue()
manager = mp.Manager()
lst = manager.list(['']*3)
lock = mp.Lock()
num_consumers = mp.cpu_count() * 2
consumers = [Consumer(tasks, results, lock, lst) for i in xrange(num_consumers)]
for w in consumers:
w.start()
for i in xrange(3):
tasks.put(Task(i))
for i in xrange(num_consumers):
tasks.put(None)
tasks.join()
Другой способ - использовать общий массив с фиксированным размером, например.mp.Array('c', 10)
import multiprocessing as mp
class Consumer(mp.Process):
def __init__(self, task_queue, result_queue, arr, lock):
mp.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
self.arr = arr
self.lock = lock
def run(self):
proc_name = self.name
while True:
next_task = self.task_queue.get()
if next_task is None:
self.task_queue.task_done()
break
answer = next_task(arr = self.arr, lock = self.lock)
self.task_queue.task_done()
self.result_queue.put(answer)
return
class Task(object):
def __init__(self, i):
self.i = i
def __call__(self, arr, lock):
with lock:
arr[self.i].value = "test {}".format(self.i)
print([a.value for a in arr])
if __name__ == '__main__':
tasks = mp.JoinableQueue()
results = mp.Queue()
arr = [mp.Array('c', 10) for i in range(3)]
lock = mp.Lock()
num_consumers = mp.cpu_count() * 2
consumers = [Consumer(tasks, results, arr, lock) for i in xrange(num_consumers)]
for w in consumers:
w.start()
for i in xrange(3):
tasks.put(Task(i))
for i in xrange(num_consumers):
tasks.put(None)
tasks.join()
Я предполагаю, что причина, почему это работает, когдаmp.Array(ctypes.c_char_p, 3)
не потому чтоmp.Array('c', 10)
имеет фиксированный размер, поэтому адрес памяти никогда не меняется, в то время какmp.Array(ctypes.c_char_p, 3)
имеет переменный размер, поэтому адрес памяти может измениться, когдаarr[i]
назначается на большую строку.
Возможно это то, чтодокументы предупреждают о том, когда говорится,
Хотя можно хранить указатель в разделяемой памяти, помните, что это будет указывать на местоположение в адресном пространстве определенного процесса. Однако вполне вероятно, что указатель недопустим в контексте второго процесса, и попытка разыменования указателя из второго процесса может вызвать сбой.