Вопрос по python – Как мне запустить os.walk параллельно в Python?

22

Я написал простое приложение на Java, которое берет список путей и генерирует файл со всеми путями в этом исходном списке.

Если у меня есть paths.txt, который имеет:

c:\folder1\
c:\folder2\
...
...
c:\folder1000\

Мое приложение запускает рекурсивную функцию для каждого многопоточного пути и возвращает файл со всеми путями файлов в этих папках.

Теперь я хочу написать это приложение на Python.

Я написал простое приложение, которое используетos.walk() запустить через заданную папку и распечатать пути к файлу для вывода.

Теперь я хочу запустить его параллельно, и я увидел, что в Python есть несколько модулей для этого: многопоточный и многопроцессорный.

Как лучше всего это сделать? И как это происходит?

запуск того же самого в Java оказался быстрее при запуске многопоточных. некоторые пути являются сетевыми, поэтому, возможно, многопоточный запуск оптимизирует сетевой трафик. user1251654
@AndreasJung: Я согласен, но как бы вы это сделали? static_rtti
Что вы ожидаете от многопоточного обхода? Поскольку вы ограничены вводом-выводом, однопоточный обход будет таким же быстрым, как два потока, разделяющих нагрузку ввода-вывода между обоими потоками .... так что вы ожидаете выиграть от многопоточного перехода? Andreas Jung
Использование многопроцессорного модуля, вероятно, более уместно здесь, по сравнению с модулем потоков. Andreas Jung
Одна из проблем, связанных с os.walk Python, заключается в том, что если у вас есть каталоги с большим количеством файлов, вы будете много читать в память, поскольку она перебирает информацию на один каталог за раз. Я обнаружил, что переключение на рекурсивный генератор os.listdir работает намного лучше, чем использование os.walk. woot

Ваш Ответ

3   ответа
24

multiprocessing решение:

from multiprocessing.pool import Pool
from multiprocessing import JoinableQueue as Queue
import os

def explore_path(path):
    directories = []
    nondirectories = []
    for filename in os.listdir(path):
        fullname = os.path.join(path, filename)
        if os.path.isdir(fullname):
            directories.append(fullname)
        else:
            nondirectories.append(filename)
    outputfile = path.replace(os.sep, '_') + '.txt'
    with open(outputfile, 'w') as f:
        for filename in nondirectories:
            print >> f, filename
    return directories

def parallel_worker():
    while True:
        path = unsearched.get()
        dirs = explore_path(path)
        for newdir in dirs:
            unsearched.put(newdir)
        unsearched.task_done()

# acquire the list of paths
with open('paths.txt') as f:
    paths = f.split()

unsearched = Queue()
for path in paths:
    unsearched.put(path)

pool = Pool(5)
for i in range(5):
    pool.apply_async(parallel_worker)

unsearched.join()
print 'Done'
os.walk () возвращает один раз, прежде чем рекурсивно.break гарантирует, что он никогда не дойдет до рекурсивной стадии. Код правильный. Если это вас не устраивает, то вручную разбейте os.listdir () на каталоги и файлы, используя os.path.isdir (). Это то, что os.walk () делает внутри. На вопрос ОП ответили справедливо. Теперь пришло время выполнить свой долг.
@static_rtti Почему это имеет значение? Точка распараллеливания состоит в том, чтобы ускорить задачу в целом. Более мелкозернистое распараллеливание обычно дороже, чем конечно-детализированное. Кроме того, потребуется, чтобы вы сломалиos.walk() абстракция и, по сути, переписать часть кода (исключая преимущества наличия библиотеки в первую очередь). Кроме того, параллелизацию самой ходьбы не очень полезно, поскольку это операция ввода-вывода.
Это распараллелит работу, которую вы делаете, но не ходьбу.
@static_rtti Несмотря на то, что вы правы, от точных обстоятельств зависит, будет ли выгодным распараллеливание самой прогулки, но параллелизм по курсу для нескольких задач ходьбы по сети на разных дисках почтиfor sure обеспечивает ускорение, по опыту. Вы когда-нибудь делалиdu -s * илиfind ... на нескольких сетевых дисках? Вы никогда не будете насыщать свой сетевой интерфейс такими командами, и они работают совершенно независимо через разные системы (например, соединения NFS).
@static_rtti Еще одна мысль. ОП заявил в комментариях, что он использует несколько сетевых дисков, которые могут работать параллельно. Таким образом, выигрыш в скорости достигается за счет параллельного выполнения нескольких задач ходьбы. Если бы вы попытались распараллелить ходьбу, это не принесло бы пользы, потому что рычаг чтения / записи на одном диске может выполнять только одну операцию за раз.
0

спользую следующий код для обхода дерева SharePoint, получая довольно значительное ускорение примерно для 50 потоков. Эта конкретная программа возвращает пары (путь, данные) для всех XML-файлов в структуре каталогов и может быть просто расширена для вашего использования. (Это вырезано и вставлено из моей программы; требуется дополнительное редактирование.)

#unique string for error passing error messages
ERROR = '\xffERROR\xff'

class ScanWorker(threading.Thread):
    """Worker class for scanning directory structures.
    pathQueue: queue for pathnames of directories
    resultQueue: results of processFile, pairs of (path, data) to be updated
    """
    lock = threading.Lock()
    dirCount = 0
    def __init__(self, pathQueue, resultQueue):
        self.pathQueue = pathQueue
        self.resultQueue = resultQueue
        super().__init__()

    def run(self):
        """Worker thread.
        Get a directory, process it, and put new directories on the
        queue."""
        try:
            while True:
                self.processDir(self.pathQueue.get())
                self.pathQueue.task_done()
        except Exception as e:
            #pass on exception to main thread
            description = traceback.format_exception(*sys.exc_info())
            description.insert(0,
                "Error in thread {}:\n,".format(
                    threading.current_thread().name))
            self.resultQueue.put((ERROR, description))
            self.pathQueue.task_done()

    def processDir(self, top):
        """Visit a directory
        Call self.processFile on every file, and queue the directories.
        """
        #Wait and retry a few times in case of network errors.
        #SharePoint is not reliable, gives errors for no reason
        for retryCount in range(30):
            try:
                names = listdir(top)
                break
            except OSError as e:
                if e.errno in (2,22):
                    lastError = e
                    print(end="L", flush=True)
                    time.sleep(1)
                else:
                    raise
        else:
            print("List: too many retries")
            raise lastError
        #it is not important to worry about race conditions here
        self.__class__.dirCount += 1
        #process contents
        for name in names:
            if isdir(join(top, name)): self.pathQueue.put(join(top, name))
            else: self.processFile(join(top, name))

    def processFile(self, path):
        """Get XML file.
        """
        #only xml files
        if not path.lower().endswith('.xml'): return
        filemtime = datetime.fromtimestamp(getmtime(path))
        #SharePoint is not reliable, gives errors for no reason; just retry
        for retryCount in range(30):
            try:
                data = open(path,'rb').read()
                break
            except OSError as e:
                if e.errno in (2,22):
                    lastError = e
                    print(end="R", flush=True)
                    time.sleep(1)
                else:
                    raise
        else:
            print("Read: too many retries")
            raise lastError
        self.resultQueue.put((path, data))

class Scanner:
    """Interface to the ScanWorkers
    Sharepoint is pretty fast compared to its delay and handles 50 workers well
    Make sure you only create one instance of Scanner!
    """
    def __init__(self, workers):
        #don't restrict the path queue length; this causes deadlock
        #we use a LIFO queue to get more depth-first like search
        #reducing average queue length and hope,fully improving server caching
        self.pathQueue = queue.LifoQueue()
        #this is the output queue to the main thread
        self.resultQueue = queue.Queue(5)
        self.workers = workers
        #start workers
        for i in range(workers):
            t = ScanWorker(self.pathQueue, self.resultQueue)
            t.setDaemon(True)
            t.start()

    def startWorkers(self, path):
        #add counter
        self.added = 0
        #and go
        self.pathQueue.put(path)

    def processResult(self, wait=True):
        """Get an element from the result queue, and add to the zip file."""
        path, data = self.resultQueue.get(block=wait)
        if path==ERROR:
            #process gave alarm; stop scanning
            #pass on description
            raise ScanError(data)
        <do whatever you want to do with the file>
        self.resultQueue.task_done()
        self.added += 1

#main
try:
    #set up
    scanner = Scanner(threads)
    scanner.startWorkers(rootpath)
    pathQueue, resultQueue = scanner.pathQueue, scanner.resultQueue
    #scanner is rolling; wait for it to finish
    with pathQueue.all_tasks_done:
        while pathQueue.unfinished_tasks:
            #tasks are still running
            #process results
            while True:
                try: scanner.processResult(wait=False)
                except queue.Empty: break
            #no new files found; check if scanner is ready
            done = pathQueue.all_tasks_done.wait(timeout=1)
            if not done:
                #Not yet; print something while we wait
                print(
                    "\rProcessed {} files from {} directories [{} {}]  "
                    .format(
                        scanner.added,
                        ScanWorker.dirCount,
                        pathQueue.unfinished_tasks,
                        resultQueue.unfinished_tasks,
                    ), end='\r')
    #just to make sure everybody is ready: join the path queue
    pathQueue.join()
    #process remaining of result queue
    while resultQueue.unfinished_tasks: scanner.processResult(wait=True)
    #go to new line to prevent overwriting progress messages
    print()
except ScanError as e:
    print()
    print(*e.args[0], end='')
    print("Process interrupted.")
except KeyboardInterrupt:
    print("\nProcess interrupted.")
print()
6

который был полезен для меня. Я не уверен, что многопоточность увеличит вашу производительность из-за того, как потоки работают в CPython.

import threading
import Queue
import os

class PathThread (threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def printfiles(self, p):
        for path, dirs, files in os.walk(p):
            for f in files:
                print path + "/" + f

    def run(self):
        while True:
            path = self.queue.get()
            self.printfiles(path)
            self.queue.task_done()

# threadsafe queue
pathqueue = Queue.Queue()
paths = ["foo", "bar", "baz"]

# spawn threads
for i in range(0, 5):
    t = PathThread(pathqueue)
    t.setDaemon(True)
    t.start()

# add paths to queue
for path in paths:
    pathqueue.put(path)

# wait for queue to get empty
pathqueue.join()
Поточный подход к распараллеливанию, вероятно, будет контрпродуктивным. Глобальная блокировка интерпретатора Python предотвратит одновременное выполнение любых двух потоков.
@RaymondHettinger Совершенно неправильно в этом случае, извините. Вы повторяете большую ложь GIL. Операции блокировки, ввода / вывода не задерживаются GIL, и во многих случаях обработка происходит за пределами GIL. Не поймите меня неправильно, GIL - отстой, но это не делает многопоточность в Python бесполезной.
@RaymondHettinger Не для IO, хотя :)

Похожие вопросы