Вопрос по python, numpy – Объекты с общей памятью в многопроцессорной среде

90

Предположим, у меня есть большой массив в памяти, у меня есть функцияfunc который принимает этот гигантский массив в качестве входных данных (вместе с некоторыми другими параметрами).func с разными параметрами можно запускать параллельно. Например:

def func(arr, param):
    # do stuff to arr, param

# build array arr

pool = Pool(processes = 6)
results = [pool.apply_async(func, [arr, param]) for param in all_params]
output = [res.get() for res in results]

Если я использую многопроцессорную библиотеку, то этот гигантский массив будет многократно скопирован в разные процессы.

Есть ли способ позволить различным процессам использовать один и тот же массив? Этот объект массива доступен только для чтения и никогда не будет изменен.

Что сложнее, если arr не массив, а произвольный объект python, есть ли способ поделиться им?

[EDITED]

Я прочитал ответ, но я все еще немного сбит с толку. Поскольку fork () является копированием при записи, мы не должны вызывать никаких дополнительных затрат при порождении новых процессов в многопроцессорной библиотеке python. Но следующий код предполагает огромные накладные расходы:

from multiprocessing import Pool, Manager
import numpy as np; 
import time

def f(arr):
    return len(arr)

t = time.time()
arr = np.arange(10000000)
print "construct array = ", time.time() - t;


pool = Pool(processes = 6)

t = time.time()
res = pool.apply_async(f, [arr,])
res.get()
print "multiprocessing overhead = ", time.time() - t;

Вывод (и, между прочим, стоимость увеличивается с увеличением размера массива, поэтому я подозреваю, что все еще есть издержки, связанные с копированием памяти):

construct array =  0.0178790092468
multiprocessing overhead =  0.252444982529

Почему такие огромные накладные расходы, если мы не копировали массив? И какую часть разделяемая память спасает меня?

@FrancisAvila есть ли способ поделиться не только массивом, но и произвольными объектами Python? CodeNoob
This answer хорошо объясняет, почему произвольные объекты Python не могут быть общими. Janne Karila
Вы смотрели наthe docs, право? Lev Levitsky
@LevLevitsky Я должен спросить, есть ли способ поделиться не только массивом, но и произвольными объектами Python? CodeNoob

Ваш Ответ

3   ответа
91

Если вы используете операционную систему, которая использует копирование при записиfork() семантики (как и любой обычный unix), тогда, пока вы никогда не измените свою структуру данных, она будет доступна для всех дочерних процессов без использования дополнительной памяти. Вам не нужно будет делать ничего особенного (за исключением того, что вы абсолютно уверены, что не изменили объект).

The most efficient thing you can do for your problem было бы упаковать ваш массив в эффективную структуру массива (используяnumpy или жеarray), поместите это в общую память, обернитеmultiprocessing.Arrayи передайте это своим функциям.Этот ответ показывает, как это сделать.

Если вы хотитеwriteable разделяемый объект, тогда вам нужно будет обернуть его какой-то синхронизацией или блокировкой.multiprocessing обеспечиваетдва способа сделать это: использование общей памяти (подходит для простых значений, массивов или типов) илиManager прокси, где один процесс хранит память, а менеджер осуществляет арбитражный доступ к ней со стороны других процессов (даже по сети).

Manager Подход можно использовать с произвольными объектами Python, но он будет медленнее, чем эквивалент с использованием разделяемой памяти, поскольку объекты должны быть сериализованы / десериализованы и отправлены между процессами.

Естьмножество библиотек параллельной обработки и подходов, доступных в Python. multiprocessing Это отличная и хорошо округленная библиотека, но если у вас есть особые потребности, возможно, один из других подходов может быть лучше.

@FabioZadrozny Будет ли он копировать весь объект или только страницу памяти, содержащую его счет?
AFAIK, только страница памяти, содержащая refcount (так, 4kb на каждый объект доступа).
Отметим, что в Python fork () фактически означает копирование при доступе (потому что простой доступ к объекту изменит его ref-count).
@max Используйте закрытие. Функция, даннаяapply_async должен ссылаться на общий объект в области видимости напрямую, а не через его аргументы.
@FrancisAvila как вы используете закрытие? Разве функция, которую вы передаете для apply_async, не может быть выбрана? Или это только ограничение map_async?
0

Это предполагаемый вариант использования длялуч, которая является библиотекой для параллельного и распределенного Python. Под капотом он сериализует объекты, используяApache Arrow макет данных (формат с нулевой копией) и сохраняет их вхранилище объектов с общей памятью поэтому они могут быть доступны нескольким процессам без создания копий.

Код будет выглядеть следующим образом.

import numpy as np
import ray

ray.init()

@ray.remote
def func(array, param):
    # Do stuff.
    return 1

array = np.ones(10**6)
# Store the array in the shared memory object store once
# so it is not copied multiple times.
array_id = ray.put(array)

result_ids = [func.remote(array_id, i) for i in range(4)]
output = ray.get(result_ids)

Если вы не звонитеray.put тогда массив все еще будет храниться в общей памяти, но это будет сделано один раз за вызовfuncчто не то, что вы хотите.

Обратите внимание, что это будет работать не только для массивов, но иalso for objects that contain arraysнапример, словари, отображающие целые числа в массивы, как показано ниже.

Вы можете сравнить производительность сериализации в Ray и pickle, выполнив следующее в IPython.

import numpy as np
import pickle
import ray

ray.init()

x = {i: np.ones(10**7) for i in range(20)}

# Time Ray.
%time x_id = ray.put(x)  # 2.4s
%time new_x = ray.get(x_id)  # 0.00073s

# Time pickle.
%time serialized = pickle.dumps(x)  # 2.6s
%time deserialized = pickle.loads(serialized)  # 1.9s

Сериализация с Ray только немного быстрее, чем pickle, но десериализация в 1000 раз быстрее из-за использования общей памяти (это число, конечно, будет зависеть от объекта).

УвидетьЛуч документации, Вы можете прочитать больше обыстрая сериализация с использованием Ray и Arrow, Заметьте, я один из разработчиков Ray.

14

Я столкнулся с той же проблемой и написал небольшой вспомогательный класс для совместной работы.

Я использую multiprocessing.RawArray (lockfree), а также доступ к массивам вообще не синхронизирован (lockfree), будьте осторожны, чтобы не выстрелить себе в ноги.

Благодаря решению я получаю ускорения примерно в 3 раза на четырехъядерном процессоре i7.

Вот код: Не стесняйтесь использовать и улучшать его, и, пожалуйста, сообщайте о любых ошибках.

'''
Created on 14.05.2013

@author: martin
'''

import multiprocessing
import ctypes
import numpy as np

class SharedNumpyMemManagerError(Exception):
    pass

'''
Singleton Pattern
'''
class SharedNumpyMemManager:    

    _initSize = 1024

    _instance = None

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = super(SharedNumpyMemManager, cls).__new__(
                                cls, *args, **kwargs)
        return cls._instance        

    def __init__(self):
        self.lock = multiprocessing.Lock()
        self.cur = 0
        self.cnt = 0
        self.shared_arrays = [None] * SharedNumpyMemManager._initSize

    def __createArray(self, dimensions, ctype=ctypes.c_double):

        self.lock.acquire()

        # double size if necessary
        if (self.cnt >= len(self.shared_arrays)):
            self.shared_arrays = self.shared_arrays + [None] * len(self.shared_arrays)

        # next handle
        self.__getNextFreeHdl()        

        # create array in shared memory segment
        shared_array_base = multiprocessing.RawArray(ctype, np.prod(dimensions))

        # convert to numpy array vie ctypeslib
        self.shared_arrays[self.cur] = np.ctypeslib.as_array(shared_array_base)

        # do a reshape for correct dimensions            
        # Returns a masked array containing the same data, but with a new shape.
        # The result is a view on the original array
        self.shared_arrays[self.cur] = self.shared_arrays[self.cnt].reshape(dimensions)

        # update cnt
        self.cnt += 1

        self.lock.release()

        # return handle to the shared memory numpy array
        return self.cur

    def __getNextFreeHdl(self):
        orgCur = self.cur
        while self.shared_arrays[self.cur] is not None:
            self.cur = (self.cur + 1) % len(self.shared_arrays)
            if orgCur == self.cur:
                raise SharedNumpyMemManagerError('Max Number of Shared Numpy Arrays Exceeded!')

    def __freeArray(self, hdl):
        self.lock.acquire()
        # set reference to None
        if self.shared_arrays[hdl] is not None: # consider multiple calls to free
            self.shared_arrays[hdl] = None
            self.cnt -= 1
        self.lock.release()

    def __getArray(self, i):
        return self.shared_arrays[i]

    @staticmethod
    def getInstance():
        if not SharedNumpyMemManager._instance:
            SharedNumpyMemManager._instance = SharedNumpyMemManager()
        return SharedNumpyMemManager._instance

    @staticmethod
    def createArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__createArray(*args, **kwargs)

    @staticmethod
    def getArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__getArray(*args, **kwargs)

    @staticmethod    
    def freeArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__freeArray(*args, **kwargs)

# Init Singleton on module load
SharedNumpyMemManager.getInstance()

if __name__ == '__main__':

    import timeit

    N_PROC = 8
    INNER_LOOP = 10000
    N = 1000

    def propagate(t):
        i, shm_hdl, evidence = t
        a = SharedNumpyMemManager.getArray(shm_hdl)
        for j in range(INNER_LOOP):
            a[i] = i

    class Parallel_Dummy_PF:

        def __init__(self, N):
            self.N = N
            self.arrayHdl = SharedNumpyMemManager.createArray(self.N, ctype=ctypes.c_double)            
            self.pool = multiprocessing.Pool(processes=N_PROC)

        def update_par(self, evidence):
            self.pool.map(propagate, zip(range(self.N), [self.arrayHdl] * self.N, [evidence] * self.N))

        def update_seq(self, evidence):
            for i in range(self.N):
                propagate((i, self.arrayHdl, evidence))

        def getArray(self):
            return SharedNumpyMemManager.getArray(self.arrayHdl)

    def parallelExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_par(5)
        print(pf.getArray())

    def sequentialExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_seq(5)
        print(pf.getArray())

    t1 = timeit.Timer("sequentialExec()", "from __main__ import sequentialExec")
    t2 = timeit.Timer("parallelExec()", "from __main__ import parallelExec")

    print("Sequential: ", t1.timeit(number=1))    
    print("Parallel: ", t2.timeit(number=1))
причина в том, что многопроцессорный пул вызывает fork (), когда создается экземпляр пула, поэтому все, что после этого не получит доступ к указателю на любой общий мем, созданный впоследствии.
Когда я попробовал этот код в py35, я получил исключение в multiprocessing.sharedctypes.py, поэтому я думаю, что этот код предназначен только для py2.
Просто осознал, что вам нужно настроить массивы разделяемой памяти перед созданием пула многопроцессорной обработки, пока не знаю, почему, но он определенно не будет работать наоборот.

Похожие вопросы