Вопрос по mclapply, r, progress-bar – Есть ли способ отслеживать прогресс на mclapply?

40

Я люблю обстановку.progress = 'text' вplyr's llply, Тем не менее, это вызывает у меня большое беспокойство, чтобы не знать, как далекоmclapply (из пакетаmulticore), поскольку элементы списка отправляются различным ядрам, а затем в конце сопоставляются.

Я выводил сообщения типа*currently in sim_id # ....* но это не очень полезно, потому что это не дает мне индикатора того, какой процент элементов списка завершен (хотя полезно знать, что мой сценарий не застрял и не продвинулся).

Может ли кто-то предложить другие идеи, которые позволят мне взглянуть на мой.Rout файл и получить чувство прогресса? Я думал о добавлении ручного счетчика, но не вижу, как бы это реализовать, так какmclapply должен завершить обработку всех элементов списка, прежде чем он сможет дать какие-либо отзывы.

Смотрите мой ответ на похожий вопрос:stackoverflow.com/a/5431265/653825 otsaw
Отличный вопрос,package multicore больше не доступен, есть ли обходной путь без пакетаmulticore? forecaster
@ Forecaster: Да, посмотрите наparallel пакет. fotNelton
Отличный ответ здесь @fotNelton и других на его основе для повторного использования. Как быстрое решение, чтобы увидеть прогресс в разовомmclapply звонки, вы также можете простоcat(".") в рабочей функции. codeola

Ваш Ответ

5   ответов
7

pbapply Пакет реализовал это для общего случая. И то и другоеpblapply а такжеpbsapply иметьcl аргумент. Отдокументация:

Parallel processing can be enabled through the cl argument. parLapply is called when cl is a ’cluster’ object, mclapply is called when cl is an integer. Showing the progress bar increases the communication overhead between the main process and nodes / child processes compared to the parallel equivalents of the functions without the progress bar. The functions fall back to their original equivalents when the progress bar is disabled (i.e. getOption("pboptions")$type == "none" dopb() is FALSE). This is the default when interactive() if FALSE (i.e. called from command line R script).

Если никто не поставляетcl (или проходитNULL) непараллельный по умолчаниюlapply используется, включая индикатор выполнения.

Вы нашли чистый способ обернутьtryCatch вокругpblapply/mclapply функция, чтобы он останавливал кластер, когда он генерирует исключение? Когда я пытаюсь остановить эти параллельные процессы в RStudio, я получаю убегающие ядра, которые приходится вручную убивать через терминал.
@spacedSparking Нет, но я используюfurrr пакет вместо.
26

mclapply порождает несколько процессов, возможно, вы захотите использовать fifo, pipe или даже сокеты. Теперь рассмотрим следующий пример:

library(multicore)

finalResult <- local({
    f <- fifo(tempfile(), open="w+b", blocking=T)
    if (inherits(fork(), "masterProcess")) {
        # Child
        progress <- 0.0
        while (progress < 1 && !isIncomplete(f)) {
            msg <- readBin(f, "double")
            progress <- progress + as.numeric(msg)
            cat(sprintf("Progress: %.2f%%\n", progress * 100))
        } 
        exit()
    }
    numJobs <- 100
    result <- mclapply(1:numJobs, function(...) {
        # Dome something fancy here
        # ...
        # Send some progress update
        writeBin(1/numJobs, f)
        # Some arbitrary result
        sample(1000, 1)
    })
    close(f)
    result
})

cat("Done\n")

Здесь временный файл используется как fifo, и основной процесс разветвляет ребенка, единственная обязанность которого - сообщать о текущем прогрессе. Основной процесс продолжается по телефонуmclapply где выражение (точнее, блок выражения), которое должно быть оценено, записывает частичную информацию о ходе выполнения в буфер fifo посредствомwriteBin.

Поскольку это всего лишь простой пример, вам, вероятно, придется адаптировать весь материал вывода к вашим потребностям. НТН!

В случаеmclapply основной процесс ожидает завершения всех дочерних процессов, поэтому без разветвления другого дочернего процесса невозможно получить и обработать сообщения, покаmclapply все еще работает.
Привет Нельтон, это очень полезный ответ, пакетmulticore больше не доступен в CRAN. Есть ли обходной путь без использования пакета?multicore Благодарю вас
Это эффективно отличается от использования стандартных функцийmessage а такжеsink? Сообщения от всех дочерних процессов отправляются в тот же приемник без задержки, верно?
@fotNelton: Исходя из моего опыта, кажется, что дочерние процессы отправляют stdout и stderr так же, как и родительский процесс, без каких-либо задержек. Но, может быть, это зависит от ОС?
@otsaw: Что вы говорите о stdout, соотв. stderr прав, но так как основной процесс заблокирован во время ожиданияmclapply для завершения необходим другой процесс или поток для обработки результатов выполнения.
2

используйте индикатор выполнения вместо построчной печати и вызовите внешнюю функцию с помощью mclapply.

library('utils')
library('multicore')

prog.indic <- local({ #evaluates in local environment only
    f <- fifo(tempfile(), open="w+b", blocking=T) # open fifo connection
    assign(x='f',value=f,envir=.GlobalEnv)
    pb <- txtProgressBar(min=1, max=MC,style=3)

    if (inherits(fork(), "masterProcess")) { #progress tracker
        # Child
        progress <- 0.0
        while (progress < MC && !isIncomplete(f)){ 
            msg <- readBin(f, "double")
                progress <- progress + as.numeric(msg)

            # Updating the progress bar.
            setTxtProgressBar(pb,progress)
            } 


        exit()
        }
   MC <- 100
   result <- mclapply(1:MC, .mcfunc)

    cat('\n')
    assign(x='result',value=result,envir=.GlobalEnv)
    close(f)
    })

.mcfunc<-function(i,...){
        writeBin(1, f)
        return(i)
    }

Назначение подключения fifo .GlobalEnv необходимо для использования его из функции вне вызова mclapply. Спасибо за вопросы и предыдущие ответы, я долго думал, как это сделать.

7

решение @ fotNelton применять везде, где вы обычно используете mcapply.

mcadply <- function(X, FUN, ...) {
  # Runs multicore lapply with progress indicator and transformation to
  # data.table output. Arguments mirror those passed to lapply.
  #
  # Args:
  # X:   Vector.
  # FUN: Function to apply to each value of X. Note this is transformed to 
  #      a data.frame return if necessary.
  # ...: Other arguments passed to mclapply.
  #
  # Returns:
  #   data.table stack of each mclapply return value
  #
  # Progress bar code based on https://stackoverflow.com/a/10993589
  require(multicore)
  require(plyr)
  require(data.table)

  local({
    f <- fifo(tempfile(), open="w+b", blocking=T)
    if (inherits(fork(), "masterProcess")) {
      # Child
      progress <- 0
      print.progress <- 0
      while (progress < 1 && !isIncomplete(f)) {
        msg <- readBin(f, "double")
        progress <- progress + as.numeric(msg)
        # Print every 1%
        if(progress >= print.progress + 0.01) {
          cat(sprintf("Progress: %.0f%%\n", progress * 100))
          print.progress <- floor(progress * 100) / 100
        }
      }
      exit()
    }

    newFun <- function(...) {
      writeBin(1 / length(X), f)
      return(as.data.frame(FUN(...)))
    }

    result <- as.data.table(rbind.fill(mclapply(X, newFun, ...)))
    close(f)
    cat("Done\n")
    return(result)
  })
}
13

добавление еще одной версии решения @ fotNelson, но с некоторыми изменениями:

Drop in replacement for mclapply (supports all mclapply functions) Catches ctrl-c calls and aborts gracefully uses built in progress bar (txtProgressBar) option to track progress or not and use a specified style of progress bar uses parallel rather than multicore which has now been removed from CRAN coerces X to list as per mclapply (so length(X) gives expected results) roxygen2 style documentation at the top

Надеюсь, это поможет кому-то ...

library(parallel)

#-------------------------------------------------------------------------------
#' Wrapper around mclapply to track progress
#' 
#' Based on http://stackoverflow.com/questions/10984556
#' 
#' @param X         a vector (atomic or list) or an expressions vector. Other
#'                  objects (including classed objects) will be coerced by
#'                  ‘as.list’
#' @param FUN       the function to be applied to
#' @param ...       optional arguments to ‘FUN’
#' @param mc.preschedule see mclapply
#' @param mc.set.seed see mclapply
#' @param mc.silent see mclapply
#' @param mc.cores see mclapply
#' @param mc.cleanup see mclapply
#' @param mc.allow.recursive see mclapply
#' @param mc.progress track progress?
#' @param mc.style    style of progress bar (see txtProgressBar)
#'
#' @examples
#' x <- mclapply2(1:1000, function(i, y) Sys.sleep(0.01))
#' x <- mclapply2(1:3, function(i, y) Sys.sleep(1), mc.cores=1)
#' 
#' dat <- lapply(1:10, function(x) rnorm(100)) 
#' func <- function(x, arg1) mean(x)/arg1 
#' mclapply2(dat, func, arg1=10, mc.cores=2)
#-------------------------------------------------------------------------------
mclapply2 <- function(X, FUN, ..., 
    mc.preschedule = TRUE, mc.set.seed = TRUE,
    mc.silent = FALSE, mc.cores = getOption("mc.cores", 2L),
    mc.cleanup = TRUE, mc.allow.recursive = TRUE,
    mc.progress=TRUE, mc.style=3) 
{
    if (!is.vector(X) || is.object(X)) X <- as.list(X)

    if (mc.progress) {
        f <- fifo(tempfile(), open="w+b", blocking=T)
        p <- parallel:::mcfork()
        pb <- txtProgressBar(0, length(X), style=mc.style)
        setTxtProgressBar(pb, 0) 
        progress <- 0
        if (inherits(p, "masterProcess")) {
            while (progress < length(X)) {
                readBin(f, "double")
                progress <- progress + 1
                setTxtProgressBar(pb, progress) 
            }
            cat("\n")
            parallel:::mcexit()
        }
    }
    tryCatch({
        result <- mclapply(X, ..., function(...) {
                res <- FUN(...)
                if (mc.progress) writeBin(1, f)
                res
            }, 
            mc.preschedule = mc.preschedule, mc.set.seed = mc.set.seed,
            mc.silent = mc.silent, mc.cores = mc.cores,
            mc.cleanup = mc.cleanup, mc.allow.recursive = mc.allow.recursive
        )

    }, finally = {
        if (mc.progress) close(f)
    })
    result
}
Похоже, параллель ::: mcfork не работает должным образом в Rstudio. Разрешение выше моего понимания, и лучше всего рассматривать его как отдельный вопрос о стековом потоке. Если я получу решение, я отправлю сюда сообщение ...
Эта версия не показывает ход выполнения задачи. Индикатор выполнения начинается с 0% и остается там.
ОК, должно быть, ошибка - я посмотрю ...
Эта функция работает для меня в OS X и Linux, так что, возможно, это проблема Windows.
Этот метод для отслеживания прогресса не будет работать с Rstudio (см. Обсуждение здесь:stackoverflow.com/questions/27314011), потому что вывод из разветвленного процесса (который печатает прогресс на экран) игнорируется в Rstudio ...

Похожие вопросы