Вопрос по multiprocessing, python – Многопроцессорные обновления глобальных переменных Python не возвращаются родителю

26

Я пытаюсь вернуть значения из подпроцессов, но эти значения, к сожалению, не выбираются. Поэтому я успешно использовал глобальные переменные в модуле потоков, но не смог получить обновления, сделанные в подпроцессах при использовании многопроцессорного модуля. Надеюсь, я что-то упустил.

Результаты, напечатанные в конце, всегда совпадают с начальными значениями, заданными переменными dataDV03 и dataDV04. Подпроцессы обновляют эти глобальные переменные, но эти глобальные переменные остаются неизменными в родительском.

import multiprocessing

# NOT ABLE to get python to return values in passed variables.

ants = ['DV03', 'DV04']
dataDV03 = ['', '']
dataDV04 = {'driver': '', 'status': ''}


def getDV03CclDrivers(lib):  # call global variable
    global dataDV03
    dataDV03[1] = 1
    dataDV03[0] = 0
# eval( 'CCL.' + lib + '.' +  lib + '( "DV03" )' ) these are unpicklable instantiations

def getDV04CclDrivers(lib, dataDV04):   # pass global variable
    dataDV04['driver'] = 0  # eval( 'CCL.' + lib + '.' +  lib + '( "DV04" )' )


if __name__ == "__main__":

    jobs = []
    if 'DV03' in ants:
        j = multiprocessing.Process(target=getDV03CclDrivers, args=('LORR',))
        jobs.append(j)

    if 'DV04' in ants:
        j = multiprocessing.Process(target=getDV04CclDrivers, args=('LORR', dataDV04))
        jobs.append(j)

    for j in jobs:
        j.start()

    for j in jobs:
        j.join()

    print 'Results:\n'
    print 'DV03', dataDV03
    print 'DV04', dataDV04

Я не могу опубликовать свой вопрос, поэтому постараюсь отредактировать оригинал.

Вот объект, который нельзя отобрать:

In [1]: from CCL import LORR
In [2]: lorr=LORR.LORR('DV20', None)
In [3]: lorr
Out[3]: <CCL.LORR.LORR instance at 0x94b188c>

Эта ошибка возвращается, когда я использую multiprocessing.Pool для возврата экземпляра родителю:

Thread getCcl (('DV20', 'LORR'),)
Process PoolWorker-1:
Traceback (most recent call last):
File "/alma/ACS-10.1/casa/lib/python2.6/multiprocessing/process.py", line 232, in _bootstrap
self.run()
File "/alma/ACS-10.1/casa/lib/python2.6/multiprocessing/process.py", line 88, in run
self._target(*self._args, **self._kwargs)
File "/alma/ACS-10.1/casa/lib/python2.6/multiprocessing/pool.py", line 71, in worker
put((job, i, result))
File "/alma/ACS-10.1/casa/lib/python2.6/multiprocessing/queues.py", line 366, in put
return send(obj)
UnpickleableError: Cannot pickle <type 'thread.lock'> objects



In [5]: dir(lorr)
Out[5]:
['GET_AMBIENT_TEMPERATURE',
 'GET_CAN_ERROR',
 'GET_CAN_ERROR_COUNT',
 'GET_CHANNEL_NUMBER',
 'GET_COUNT_PER_C_OP',
 'GET_COUNT_REMAINING_OP',
 'GET_DCM_LOCKED',
 'GET_EFC_125_MHZ',
 'GET_EFC_COMB_LINE_PLL',
 'GET_ERROR_CODE_LAST_CAN_ERROR',
 'GET_INTERNAL_SLAVE_ERROR_CODE',
 'GET_MAGNITUDE_CELSIUS_OP',
 'GET_MAJOR_REV_LEVEL',
 'GET_MINOR_REV_LEVEL',
 'GET_MODULE_CODES_CDAY',
 'GET_MODULE_CODES_CMONTH',
 'GET_MODULE_CODES_DIG1',
 'GET_MODULE_CODES_DIG2',
 'GET_MODULE_CODES_DIG4',
 'GET_MODULE_CODES_DIG6',
 'GET_MODULE_CODES_SERIAL',
 'GET_MODULE_CODES_VERSION_MAJOR',
 'GET_MODULE_CODES_VERSION_MINOR',
 'GET_MODULE_CODES_YEAR',
 'GET_NODE_ADDRESS',
 'GET_OPTICAL_POWER_OFF',
 'GET_OUTPUT_125MHZ_LOCKED',
 'GET_OUTPUT_2GHZ_LOCKED',
 'GET_PATCH_LEVEL',
 'GET_POWER_SUPPLY_12V_NOT_OK',
 'GET_POWER_SUPPLY_15V_NOT_OK',
 'GET_PROTOCOL_MAJOR_REV_LEVEL',
 'GET_PROTOCOL_MINOR_REV_LEVEL',
 'GET_PROTOCOL_PATCH_LEVEL',
 'GET_PROTOCOL_REV_LEVEL',
 'GET_PWR_125_MHZ',
 'GET_PWR_25_MHZ',
 'GET_PWR_2_GHZ',
 'GET_READ_MODULE_CODES',
 'GET_RX_OPT_PWR',
 'GET_SERIAL_NUMBER',
 'GET_SIGN_OP',
 'GET_STATUS',
 'GET_SW_REV_LEVEL',
 'GET_TE_LENGTH',
 'GET_TE_LONG_FLAG_SET',
 'GET_TE_OFFSET_COUNTER',
 'GET_TE_SHORT_FLAG_SET',
 'GET_TRANS_NUM',
 'GET_VDC_12',
 'GET_VDC_15',
 'GET_VDC_7',
 'GET_VDC_MINUS_7',
 'SET_CLEAR_FLAGS',
 'SET_FPGA_LOGIC_RESET',
 'SET_RESET_AMBSI',
 'SET_RESET_DEVICE',
 'SET_RESYNC_TE',
 'STATUS',
 '_HardwareDevice__componentName',
 '_HardwareDevice__hw',
 '_HardwareDevice__stickyFlag',
 '_LORRBase__logger',
 '__del__',
 '__doc__',
 '__init__',
 '__module__',
 '_devices',
 'clearDeviceCommunicationErrorAlarm',
 'getControlList',
 'getDeviceCommunicationErrorCounter',
 'getErrorMessage',
 'getHwState',
 'getInternalSlaveCanErrorMsg',
 'getLastCanErrorMsg',
 'getMonitorList',
 'hwConfigure',
 'hwDiagnostic',
 'hwInitialize',
 'hwOperational',
 'hwSimulation',
 'hwStart',
 'hwStop',
 'inErrorState',
 'isMonitoring',
 'isSimulated']

In [6]: 
& quot; эти значения не подлежат расслаиванию & quot; - Вы имеете в виду вещи, которые ваша упаковка не может быть засолена? Если это так, то вы не можете использовать подпроцесс (AFAIK), поскольку именно так информация передается между процессами. Если данные могут быть выбраны, вы захотите использоватьManager. mgilson
Вы не должны "публиковать" ничего кроме ответов на ваш вопрос. Так что хорошо, что вы отредактировали вместо этого; это правильно делать в этом случае. senderle
Еще одна альтернатива, которую я недавно обнаружил,apply_async Перезвоните. Обратный вызов выполняется в родительском процессе. Это означает, что все, что возвращает подпроцесс, может быть передано процессу обратного вызова, и процесс обратного вызова может затем изменять глобальные переменные. Это, однако, требует использованияglobal variableName объявление в верхней части функции обратного вызова. CMCDragonkai

Ваш Ответ

5   ответов
5

Когда используешьmultiprocessединственный способ передавать объекты между процессами - это использоватьQueue или жеPipe; глобалы не передаются. Объекты должны быть замаринованными, поэтомуmultiprocess не поможет вам здесь.

опять-таки очереди являются единственным способом передачи значений в родительскую функцию или только для функций того же уровня? ,
В multiprocess есть менеджер для перемещения данных между процессами.
5

@DBlas дает вам быстрый URL-адрес и ссылку на класс Manager в ответе, но я думаю, что он все еще немного расплывчатый, поэтому я подумал, что вам может быть полезно просто увидеть его применимым ...

import multiprocessing
from multiprocessing import Manager

ants = ['DV03', 'DV04']

def getDV03CclDrivers(lib, data_dict):  
    data_dict[1] = 1
    data_dict[0] = 0

def getDV04CclDrivers(lib, data_list):   
    data_list['driver'] = 0  


if __name__ == "__main__":

    manager = Manager()
    dataDV03 = manager.list(['', ''])
    dataDV04 = manager.dict({'driver': '', 'status': ''})

    jobs = []
    if 'DV03' in ants:
        j = multiprocessing.Process(
                target=getDV03CclDrivers, 
                args=('LORR', dataDV03))
        jobs.append(j)

    if 'DV04' in ants:
        j = multiprocessing.Process(
                target=getDV04CclDrivers, 
                args=('LORR', dataDV04))
        jobs.append(j)

    for j in jobs:
        j.start()

    for j in jobs:
        j.join()

    print 'Results:\n'
    print 'DV03', dataDV03
    print 'DV04', dataDV04

Поскольку многопроцессорная обработка на самом деле использует отдельные процессы, вы не можете просто совместно использовать глобальные переменные, поскольку они будут находиться в совершенно разных & quot; пробелах & quot; в памяти. То, что вы делаете с глобальным в рамках одного процесса, не будет отражаться в другом. Хотя я и признаю, что это кажется странным, так как вы видите его, все живет прямо там в одном и том же куске кода, поэтому "почему бы не использовать эти методы для доступа к глобальному"? Сложнее заворачивать голову, думая, что они будут работать в разных процессах.

Класс менеджера Он служит прокси для структур данных, которые могут передавать информацию между процессами. То, что вы будете делать, - это создать специальный дикт и список из менеджера, передать их в ваши методы и работать с ними локально.

Un-pickle-able data

Для вашего специализированного объекта LORR вам может понадобиться создать что-то вроде прокси, которое может представлять выбираемое состояние экземпляра.

Не супер надежный или проверенный много, но дает вам идею.

class LORRProxy(object):

    def __init__(self, lorrObject=None):
        self.instance = lorrObject

  ,  def __getstate__(self):
        # how to get the state data out of a lorr instance
        inst = self.instance
        state = dict(
            foo = inst.a,
            bar = inst.b,
        )
        return state

    def __setstate__(self, state):
        # rebuilt a lorr instance from state
        lorr = LORR.LORR()
        lorr.a = state['foo']
        lorr.b = state['bar']
        self.instance = lorr
@senderle: Это может быть правдой, но с другой стороны, ОП еще не предоставил ни одного примера того, что такое необратимые данные. Я могу только ответить на то, что я вижу :-)
Я не могу изменить базовый код, который я использую для создания экземпляра объекта. Это глубоко в системе обсерватории ALMA и потребует героических усилий, чтобы заставить кого-то изменить его. Может ли произвольный экземпляр класса быть сериализован или преобразован в ctype? Buoy
НоManager использование объектовpickle передавать данные. Таким образом, если данные OP действительно непроницаемы, я думаю, что это не сработает.
Спасибо за подробные ответы. В соответствии с ответами взаимоблокировки: 1) значения, которые мне нужно передать родительским ошибкам при возврате, и отдельные пробелы подпроцесса препятствуют возвращению глобальных переменных. Buoy
@ user1459256: senderle показывает пример того, как создать структуру класса, которую можно мариновать. Таким образом, вы можете объединить это либо с предложением очереди, либо с моим, добавив их в менеджер dict и список объектов прокси.
1

Я использую p.map (), чтобы выделить несколько процессов на удаленных серверах и распечатать результаты, когда они возвращаются в непредсказуемое время:

Servers=[...]
from multiprocessing import Pool
p=Pool(len(Servers))
p.map(DoIndividualSummary, Servers)

Это работало нормально, еслиDoIndividualSummary используемыйprint для результатов, но общий результат был в непредсказуемом порядке, что затрудняло интерпретацию. Я попробовал несколько подходов к использованию глобальных переменных, но столкнулся с проблемами. Наконец, мне удалось с sqlite3.

Доp.map()Откройте соединение sqlite и создайте таблицу:

import sqlite3
conn=sqlite3.connect('servers.db') # need conn for commit and close
db=conn.cursor()
try: db.execute('''drop table servers''')
except: pass
db.execute('''CREATE TABLE servers (server text, serverdetail text, readings     text)''')
conn.commit()

Затем при возвращении изDoIndividualSummary()сохранить результаты в таблицу:

db.execute('''INSERT INTO servers VALUES (?,?,?)''',         (server,serverdetail,readings))
conn.commit()
return

Послеmap() заявление, распечатать результаты:

db.execute('''select * from servers order by server''')
rows=db.fetchall()
for server,serverdetail,readings in rows: print serverdetail,readings

Может показаться излишним, но для меня это было проще, чем рекомендуемые решения.

31

Когда вы используетеmultiprocessing чтобы открыть второй процесс,entirely new instance Python, с его собственным глобальным состоянием, создан. Это глобальное состояние не является общим, поэтому изменения, внесенные дочерними процессами в глобальные переменные, будут невидимы для родительского процесса.

Кроме того, большинство абстракций, которыеmultiprocessing обеспечивает использование рассола для передачи данных. Все данные передаются с использованием проксидолжен быть маринованным; который включает в себя все объекты, которыеManager provides, Соответствующие цитаты (мой акцент):

Ensure that the arguments to the methods of proxies are picklable.

И (вManager раздел):

Other processes can access the shared objects by using proxies.

Queues также требуют маринованных данных; документы не говорят об этом, но быстрый тест подтверждает это:

import multiprocessing
import pickle

class Thing(object):
    def __getstate__(self):
        print 'got pickled'
        return self.__dict__
    def __setstate__(self, state):
        print 'got unpickled'
        self.__dict__.update(state)

q = multiprocessing.Queue()
p = multiprocessing.Process(target=q.put, args=(Thing(),))
p.start()
print q.get()
p.join()

Выход:

$ python mp.py 
got pickled
got unpickled
<__main__.Thing object at 0x10056b350>

Один подход, которыйmight работать для вас, если вы действительно не можете выбрать данные, это найти способ сохранить их какctype объект; ссылка на память может бытьперешел на дочерний процесс, Это кажется мне довольно хитрым; Я никогда этого не делал. Но это может быть возможным решением для вас.

Учитывая ваше обновление, кажется, что вам нужно знать намного больше о внутренностяхLORR, ЯвляетсяLORR класс? Можете ли вы подкласс от него? Это подкласс чего-то еще? Что это за MRO? (ПытатьсяLORR.__mro__ и опубликовать вывод, если он работает.) Если это чистый объект python, можно было бы создать его подкласс, создав__setstate__ и__getstate__ включить травление.

Другой подход может заключаться в том, чтобы выяснить, как получить соответствующие данные изLORR экземпляр и передать его через простую строку. Поскольку вы говорите, что вы действительно хотите вызывать методы объекта, почему бы просто не сделать это, используяQueues отправлять сообщения туда и обратно? Другими словами, примерно так (схематично):

Main Process              Child 1                       Child 2
                          LORR 1                        LORR 2 
child1_in_queue     ->    get message 'foo'
                          call 'foo' method
child1_out_queue    <-    return foo data string
child2_in_queue                   ->                    get message 'bar'
                                                        call 'bar' method
child2_out_queue                  <-                    return bar data string
@ user1459256, посмотрите мои последние изменения и дайте мне знать, будет ли работать предложенное решение.
@ user1459256, рассмотрите мое редактирование (внизу моего поста). Нам нужно больше информации оLORR объекты для разработки осуществимого подхода.
@ user1459256: Лично я не понимаю, почему длительные подпроцессы являются большой проблемой, если вы постоянно общаетесь с ними. (или долго работающие темы)
Извините, я медленно переваривал все это. У объекта LORR нетmro, Это интерфейс Python для кода C ++. Мне нужно, чтобы объекты сохранялись от нескольких минут до нескольких часов. Я думал, что темы не должны управляться в течение длительных периодов. Поскольку я не могу передать объект из подпроцесса, мне нужно было бы поддерживать подпроцессы активными в течение нескольких часов, пока я передаю результаты методов при вызове. Это разумно? Если так, то это кажется решением проблем. Buoy
@ user1459256, хорошо, когда вы говорите, что вам нужно вызывать методы позже, чтобы получить текущие данные при вызове - это заставляет меня думать, что вам действительно не нужно передаватьLORR объекты вообще, но вам скорее нужно передать данные, возвращенные методом, наLORR объект. Но это должно быть легко! Просто используйте передачу сообщений черезQueue сказать дочернему процессу, чтобы он вызывал определенный метод, а затем попросил, чтобы потомок возвратил значение через returnQueue.
3

Вы также можете использоватьмногопроцессорный массив, Это позволяет вам иметь общее состояние между процессами и, вероятно, ближе всего к глобальной переменной.

В верхней части main объявите Array. Первый аргумент 'i' говорит, что это будут целые числа. Второй аргумент дает начальные значения:

shared_dataDV03 = multiprocessing.Array ('i', (0, 0)) #a shared array

Затем передайте этот массив процессу в качестве аргумента:

j = multiprocessing.Process(target=getDV03CclDrivers, args=('LORR',shared_dataDV03))

Вы должны получить аргумент массива в вызываемой функции, а затем вы можете изменить его в функции:

def getDV03CclDrivers(lib,arr):  # call global variable
    arr[1]=1
    arr[0]=0

Массив используется совместно с родителем, поэтому вы можете распечатать значения в конце родительского элемента:

print 'DV03', shared_dataDV03[:]

И это покажет изменения:

DV03 [0, 1]

Похожие вопросы