5

Вопрос по stdin, php, java – ThreadPool процессов CLI

Мне нужно передавать сообщения в CLI PHP-процессы через стандартный ввод из Java. Мне бы хотелось, чтобы в пуле работало около 20 процессов PHP, так что когда я передаю сообщение в пул, оно отправляет каждое сообщение в отдельный поток, сохраняя очередь сообщений для доставки. Мне бы хотелось, чтобы эти процессы PHP оставались в живых как можно дольше, создавая новый, если он умирает. Я смотрел на это с помощью статического пула потоков, но он, кажется, больше предназначен для задач, которые выполняются и просто умирают. Как я мог сделать это с помощью простого интерфейса для передачи сообщения в пул? Должен ли я реализовать свой собственный «пул потоков»?

  • Error: User Rate Limit Exceeded

    от Will
  • Error: User Rate Limit Exceeded

    от Will
  • Error: User Rate Limit Exceeded

    от
  • Error: User Rate Limit Exceeded

    от
  • Error: User Rate Limit Exceeded

    от Will
  • Error: User Rate Limit Exceeded

    от Will
  • Это никогда не будет сделано обработка. Если кто-то умирает, мне нужно создать новый, чтобы заменить его. Я буду передавать им данные в виде циклического перебора через stdin.

    от Will
  • У меня есть какой-либо вывод из PHP такой, что вы знаете, когда это будет сделано обработки?

    от Clint
  • Очень похоже на этот вопрос:stackoverflow.com/questions/2592093/php-thread-pool

    от David-SkyMesh
  • 1

    Лучше всего использовать функции pcntl для форка процесса

    но связь между процессами затруднена. Я бы порекомендовал создать очередь, из которой ваши процессы могут читать, вместо того, чтобы пытаться передавать сообщения в командную строку.

    Beanstalk имеет несколько клиентов PHP, которые вы можете использовать для обмена сообщениями между процессами.

  • 4

    Я предоставляю некоторый код с этим

    поскольку я думаю, что это прояснит ситуацию. По сути, вам нужно хранить пул объектов процесса. Учтите, что у каждого из этих процессов есть поток ввода, вывода и потока ошибок, которыми вы должны каким-то образом управлять. В моем примере я просто перенаправляю ошибку и вывод на консоль основных процессов. Вы можете настроить обратные вызовы и обработчики для получения вывода программы PHP при необходимости. Если вы просто обрабатываете задачи и не заботитесь о том, что говорит PHP, оставьте все как есть или перенаправьте в файл.

    Я используюApache Commons Pool библиотека для ObjectPool. Не нужно заново изобретать.

    У вас будет пул из 20 процессов, которые запускают вашу PHP-программу. Одно это не даст вам того, что вам нужно. Возможно, вы захотите обрабатывать задачи для всех этих 20 процессов одновременно. Поэтому вам также понадобится ThreadPool, который будет извлекать Process из вашего ObjectPool.

    Вы также должны понимать, что если вы убьете или CTRL-C, ваш Java обработаетinit процесс возьмет на себя ваши процессы PHP, и они просто будут сидеть там. Вы, вероятно, захотите сохранить журнал всех pid-процессов PHP-процессов, которые вы породили, а затем очистить их, если вы перезапустите Java-программу.

    public class StackOverflow_10037379 {
    
        private static Logger sLogger = Logger.getLogger(StackOverflow_10037379.class.getName());
    
        public static class CLIPoolableObjectFactory extends BasePoolableObjectFactory<Process> {
    
            private String mProcessToRun;
    
            public CLIPoolableObjectFactory(String processToRun) {
                mProcessToRun = processToRun;
            }
    
            @Override
            public Process makeObject() throws Exception {
                ProcessBuilder builder = new ProcessBuilder();
                builder.redirectError(Redirect.INHERIT);
                // I am being lazy, but really the InputStream is where
                // you can get any output of the PHP Process. This setting
                // will make it output to the current processes console.
                builder.redirectOutput(Redirect.INHERIT);
                builder.redirectInput(Redirect.PIPE);
                builder.command(mProcessToRun);
                return builder.start();
            }
    
            @Override
            public boolean validateObject(Process process) {
                try {
                    process.exitValue();
                    return false;
                } catch (IllegalThreadStateException ex) {
                    return true;
                }
            }
    
            @Override
            public void destroyObject(Process process) throws Exception {
                // If PHP has a way to stop it, do that instead of destroy
                process.destroy();
            }
    
            @Override
            public void passivateObject(Process process) throws Exception {
                // Should really try to read from the InputStream of the Process
                // to prevent lock-ups if Rediret.INHERIT is not used.
            }
        }
    
        public static class CLIWorkItem implements Runnable {
    
            private ObjectPool<Process> mPool;
            private String mWork;
    
            public CLIWorkItem(ObjectPool<Process> pool, String work) {
                mPool = pool;
                mWork = work;
            }
    
            @Override
            public void run() {
                Process workProcess = null;
                try {
                    workProcess = mPool.borrowObject();
                    OutputStream os = workProcess.getOutputStream();
                    os.write(mWork.getBytes(Charset.forName("UTF-8")));
                    os.flush();
                    // Because of the INHERIT rule with the output stream
                    // the console stream overwrites itself. REMOVE THIS in production.
                    Thread.sleep(100);
                } catch (Exception ex) {
                    sLogger.log(Level.SEVERE, null, ex);
                } finally {
                    if (workProcess != null) {
                        try {
                            // Seriously.. so many exceptions.
                            mPool.returnObject(workProcess);
                        } catch (Exception ex) {
                            sLogger.log(Level.SEVERE, null, ex);
                        }
                    }
                }
            }
        }
    
        public static void main(String[] args) throws Exception {
    
            // Change the 5 to 20 in your case. 
            // Also change mock_php.exe to /usr/bin/php or wherever.
            ObjectPool<Process> pool =
                    new GenericObjectPool<>(
                    new CLIPoolableObjectFactory("mock_php.exe"), 5);         
    
            // This will only allow you to queue 100 work items at a time. I would suspect
            // that if you only want 20 PHP processes running at a time and this queue
            // filled up you'll need to implement some other strategy as you are doing
            // more work than PHP can keep up with. You'll need to block at some point
            // or throw work away.
            BlockingQueue<Runnable> queue = 
                new ArrayBlockingQueue<>(100, true);
    
            ThreadPoolExecutor executor = 
                new ThreadPoolExecutor(20, 20, 1, TimeUnit.HOURS, queue);
    
            // print some stuff out.
            executor.execute(new CLIWorkItem(pool, "Message 1\r\n"));
            executor.execute(new CLIWorkItem(pool, "Message 2\r\n"));
            executor.execute(new CLIWorkItem(pool, "Message 3\r\n"));
            executor.execute(new CLIWorkItem(pool, "Message 4\r\n"));
            executor.execute(new CLIWorkItem(pool, "Message 5\r\n"));
            executor.execute(new CLIWorkItem(pool, "Message 6\r\n"));
            executor.execute(new CLIWorkItem(pool, "Message 7\r\n"));
            executor.execute(new CLIWorkItem(pool, "Message 8\r\n"));
            executor.execute(new CLIWorkItem(pool, "Message 9\r\n"));
            executor.execute(new CLIWorkItem(pool, "Message 10\r\n"));
            executor.execute(new CLIWorkItem(pool, "Message 11\r\n"));
    
            executor.shutdown();
            executor.awaitTermination(4000, TimeUnit.HOURS);
    
            pool.close();        
        }
    }
    

    Результат выполнения программы:

    12172 - Message 2
    10568 - Message 1
    4804 - Message 3
    11916 - Message 4
    11116 - Message 5
    12172 - Message 6
    4804 - Message 7
    10568 - Message 8
    11916 - Message 9
    11116 - Message 10
    12172 - Message 11
    

    Код программы на C ++ для вывода того, что было введено:

    #include <windows.h>
    #include <iostream>
    #include <string>
    
    int main(int argc, char* argv[])
    {
        DWORD pid = GetCurrentProcessId();
        std::string line;
        while (true) {      
            std::getline (std::cin, line);
            std::cout << pid << " - " << line << std::endl;
        }
    
        return 0;
    }
    

    Update

    Извините за задержку. Вот версия JDK 6 для всех, кто заинтересован. Вам нужно будет запустить отдельный поток, чтобы прочитать все входные данные из InputStream процесса. Я установил этот код так, чтобы он порождал новый поток рядом с каждым новым процессом. Этот поток всегда читает из процесса, пока он жив. Вместо вывода непосредственно в файл я настроил его так, чтобы он использовал инфраструктуру ведения журнала. Таким образом, вы можете настроить конфигурацию журналирования, чтобы перейти к файлу, перевернуться, перейти к консоли и т. Д., Без жесткой настройки для перехода в файл.

    Вы заметите, что я запускаю только одного Gobbler для каждого процесса, даже если у процесса есть stdout и stderr. Я перенаправляю stderr в stdout, чтобы было проще. Очевидно, jdk6 поддерживает только этот тип перенаправления.

    public class StackOverflow_10037379_jdk6 {
    
        private static Logger sLogger = Logger.getLogger(StackOverflow_10037379_jdk6.class.getName());
    
        // Shamelessy taken from Google and modified. 
        // I don't know who the original Author is.
        public static class StreamGobbler extends Thread {
    
            InputStream is;
            Logger logger;
            Level level;
    
            StreamGobbler(String logName, Level level, InputStream is) {
                this.is = is;
                this.logger = Logger.getLogger(logName);
                this.level = level;
            }
    
            public void run() {
                try {
                    InputStreamReader isr = new InputStreamReader(is);
                    BufferedReader br = new BufferedReader(isr);
                    String line = null;
                    while ((line = br.readLine()) != null) {
                        logger.log(level, line);
                    }
                } catch (IOException ex) {
                    logger.log(Level.SEVERE, "Failed to read from Process.", ex);
                }
                logger.log(
                        Level.INFO, 
                        String.format("Exiting Gobbler for %s.", logger.getName()));
            }
        }
    
        public static class CLIPoolableObjectFactory extends BasePoolableObjectFactory<Process> {
    
            private String mProcessToRun;
    
            public CLIPoolableObjectFactory(String processToRun) {
                mProcessToRun = processToRun;
            }
    
            @Override
            public Process makeObject() throws Exception {
                ProcessBuilder builder = new ProcessBuilder();
                builder.redirectErrorStream(true);
                builder.command(mProcessToRun);
                Process process = builder.start();
                StreamGobbler loggingGobbler =
                        new StreamGobbler(
                        String.format("process.%s", process.hashCode()),
                        Level.INFO,
                        process.getInputStream());
                loggingGobbler.start();
                return process;
            }
    
            @Override
            public boolean validateObject(Process process) {
                try {
                    process.exitValue();
                    return false;
                } catch (IllegalThreadStateException ex) {
                    return true;
                }
            }
    
            @Override
            public void destroyObject(Process process) throws Exception {
                // If PHP has a way to stop it, do that instead of destroy
                process.destroy();
            }
    
            @Override
            public void passivateObject(Process process) throws Exception {
                // Should really try to read from the InputStream of the Process
                // to prevent lock-ups if Rediret.INHERIT is not used.
            }
        }
    
        public static class CLIWorkItem implements Runnable {
    
            private ObjectPool<Process> mPool;
            private String mWork;
    
            public CLIWorkItem(ObjectPool<Process> pool, String work) {
                mPool = pool;
                mWork = work;
            }
    
            @Override
            public void run() {
                Process workProcess = null;
                try {
                    workProcess = mPool.borrowObject();
                    OutputStream os = workProcess.getOutputStream();
                    os.write(mWork.getBytes(Charset.forName("UTF-8")));
                    os.flush();
                    // Because of the INHERIT rule with the output stream
                    // the console stream overwrites itself. REMOVE THIS in production.
                    Thread.sleep(100);
                } catch (Exception ex) {
                    sLogger.log(Level.SEVERE, null, ex);
                } finally {
                    if (workProcess != null) {
                        try {
                            // Seriously.. so many exceptions.
                            mPool.returnObject(workProcess);
                        } catch (Exception ex) {
                            sLogger.log(Level.SEVERE, null, ex);
                        }
                    }
                }
            }
        }
    
        public static void main(String[] args) throws Exception {
    
            // Change the 5 to 20 in your case. 
            ObjectPool<Process> pool =
                    new GenericObjectPool<Process>(
                    new CLIPoolableObjectFactory("mock_php.exe"), 5);
    
            BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(100, true);
    
            ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 20, 1, TimeUnit.HOURS, queue);
    
            // print some stuff out.
            executor.execute(new CLIWorkItem(pool, "Message 1\r\n"));
            executor.execute(new CLIWorkItem(pool, "Message 2\r\n"));
            executor.execute(new CLIWorkItem(pool, "Message 3\r\n"));
            executor.execute(new CLIWorkItem(pool, "Message 4\r\n"));
            executor.execute(new CLIWorkItem(pool, "Message 5\r\n"));
            executor.execute(new CLIWorkItem(pool, "Message 6\r\n"));
            executor.execute(new CLIWorkItem(pool, "Message 7\r\n"));
            executor.execute(new CLIWorkItem(pool, "Message 8\r\n"));
            executor.execute(new CLIWorkItem(pool, "Message 9\r\n"));
            executor.execute(new CLIWorkItem(pool, "Message 10\r\n"));
            executor.execute(new CLIWorkItem(pool, "Message 11\r\n"));
    
            executor.shutdown();
            executor.awaitTermination(4000, TimeUnit.HOURS);
    
            pool.close();
        }
    }
    

    Output

    Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
    INFO: 9440 - Message 3
    Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
    INFO: 8776 - Message 2
    Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
    INFO: 6100 - Message 1
    Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
    INFO: 10096 - Message 4
    Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
    INFO: 8868 - Message 5
    Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
    INFO: 8868 - Message 8
    Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
    INFO: 6100 - Message 10
    Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
    INFO: 8776 - Message 9
    Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
    INFO: 10096 - Message 6
    Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
    INFO: 9440 - Message 7
    Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
    INFO: 6100 - Message 11
    Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
    INFO: Exiting Gobbler for process.295131993.
    Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
    INFO: Exiting Gobbler for process.756434719.
    Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
    INFO: Exiting Gobbler for process.332711452.
    Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
    INFO: Exiting Gobbler for process.1981440623.
    Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
    INFO: Exiting Gobbler for process.1043636732.