Вопрос по java, stdin, php – ThreadPool процессов CLI

5

Мне нужно передавать сообщения в CLI PHP-процессы через стандартный ввод из Java. Мне бы хотелось, чтобы в пуле работало около 20 процессов PHP, так что когда я передаю сообщение в пул, оно отправляет каждое сообщение в отдельный поток, сохраняя очередь сообщений для доставки. Мне бы хотелось, чтобы эти процессы PHP оставались в живых как можно дольше, создавая новый, если он умирает. Я смотрел на это с помощью статического пула потоков, но он, кажется, больше предназначен для задач, которые выполняются и просто умирают. Как я мог сделать это с помощью простого интерфейса для передачи сообщения в пул? Должен ли я реализовать свой собственный «пул потоков»?

Очень похоже на этот вопрос:stackoverflow.com/questions/2592093/php-thread-pool David-SkyMesh
У меня есть какой-либо вывод из PHP такой, что вы знаете, когда это будет сделано обработки? Clint
Это никогда не будет сделано обработка. Если кто-то умирает, мне нужно создать новый, чтобы заменить его. Я буду передавать им данные в виде циклического перебора через stdin. Will

Ваш Ответ

2   ответа
4

поскольку я думаю, что это прояснит ситуацию. По сути, вам нужно хранить пул объектов процесса. Учтите, что у каждого из этих процессов есть поток ввода, вывода и потока ошибок, которыми вы должны каким-то образом управлять. В моем примере я просто перенаправляю ошибку и вывод на консоль основных процессов. Вы можете настроить обратные вызовы и обработчики для получения вывода программы PHP при необходимости. Если вы просто обрабатываете задачи и не заботитесь о том, что говорит PHP, оставьте все как есть или перенаправьте в файл.

Я используюApache Commons Pool библиотека для ObjectPool. Не нужно заново изобретать.

У вас будет пул из 20 процессов, которые запускают вашу PHP-программу. Одно это не даст вам того, что вам нужно. Возможно, вы захотите обрабатывать задачи для всех этих 20 процессов одновременно. Поэтому вам также понадобится ThreadPool, который будет извлекать Process из вашего ObjectPool.

Вы также должны понимать, что если вы убьете или CTRL-C, ваш Java обработаетinit процесс возьмет на себя ваши процессы PHP, и они просто будут сидеть там. Вы, вероятно, захотите сохранить журнал всех pid-процессов PHP-процессов, которые вы породили, а затем очистить их, если вы перезапустите Java-программу.

public class StackOverflow_10037379 {

    private static Logger sLogger = Logger.getLogger(StackOverflow_10037379.class.getName());

    public static class CLIPoolableObjectFactory extends BasePoolableObjectFactory<Process> {

        private String mProcessToRun;

        public CLIPoolableObjectFactory(String processToRun) {
            mProcessToRun = processToRun;
        }

        @Override
        public Process makeObject() throws Exception {
            ProcessBuilder builder = new ProcessBuilder();
            builder.redirectError(Redirect.INHERIT);
            // I am being lazy, but really the InputStream is where
            // you can get any output of the PHP Process. This setting
            // will make it output to the current processes console.
            builder.redirectOutput(Redirect.INHERIT);
            builder.redirectInput(Redirect.PIPE);
            builder.command(mProcessToRun);
            return builder.start();
        }

        @Override
        public boolean validateObject(Process process) {
            try {
                process.exitValue();
                return false;
            } catch (IllegalThreadStateException ex) {
                return true;
            }
        }

        @Override
        public void destroyObject(Process process) throws Exception {
            // If PHP has a way to stop it, do that instead of destroy
            process.destroy();
        }

        @Override
        public void passivateObject(Process process) throws Exception {
            // Should really try to read from the InputStream of the Process
            // to prevent lock-ups if Rediret.INHERIT is not used.
        }
    }

    public static class CLIWorkItem implements Runnable {

        private ObjectPool<Process> mPool;
        private String mWork;

        public CLIWorkItem(ObjectPool<Process> pool, String work) {
            mPool = pool;
            mWork = work;
        }

        @Override
        public void run() {
            Process workProcess = null;
            try {
                workProcess = mPool.borrowObject();
                OutputStream os = workProcess.getOutputStream();
                os.write(mWork.getBytes(Charset.forName("UTF-8")));
                os.flush();
                // Because of the INHERIT rule with the output stream
                // the console stream overwrites itself. REMOVE THIS in production.
                Thread.sleep(100);
            } catch (Exception ex) {
                sLogger.log(Level.SEVERE, null, ex);
            } finally {
                if (workProcess != null) {
                    try {
                        // Seriously.. so many exceptions.
                        mPool.returnObject(workProcess);
                    } catch (Exception ex) {
                        sLogger.log(Level.SEVERE, null, ex);
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {

        // Change the 5 to 20 in your case. 
        // Also change mock_php.exe to /usr/bin/php or wherever.
        ObjectPool<Process> pool =
                new GenericObjectPool<>(
                new CLIPoolableObjectFactory("mock_php.exe"), 5);         

        // This will only allow you to queue 100 work items at a time. I would suspect
        // that if you only want 20 PHP processes running at a time and this queue
        // filled up you'll need to implement some other strategy as you are doing
        // more work than PHP can keep up with. You'll need to block at some point
        // or throw work away.
        BlockingQueue<Runnable> queue = 
            new ArrayBlockingQueue<>(100, true);

        ThreadPoolExecutor executor = 
            new ThreadPoolExecutor(20, 20, 1, TimeUnit.HOURS, queue);

        // print some stuff out.
        executor.execute(new CLIWorkItem(pool, "Message 1\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 2\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 3\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 4\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 5\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 6\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 7\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 8\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 9\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 10\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 11\r\n"));

        executor.shutdown();
        executor.awaitTermination(4000, TimeUnit.HOURS);

        pool.close();        
    }
}

Результат выполнения программы:

12172 - Message 2
10568 - Message 1
4804 - Message 3
11916 - Message 4
11116 - Message 5
12172 - Message 6
4804 - Message 7
10568 - Message 8
11916 - Message 9
11116 - Message 10
12172 - Message 11

Код программы на C ++ для вывода того, что было введено:

#include <windows.h>
#include <iostream>
#include <string>

int main(int argc, char* argv[])
{
    DWORD pid = GetCurrentProcessId();
    std::string line;
    while (true) {      
        std::getline (std::cin, line);
        std::cout << pid << " - " << line << std::endl;
    }

    return 0;
}

Update

Извините за задержку. Вот версия JDK 6 для всех, кто заинтересован. Вам нужно будет запустить отдельный поток, чтобы прочитать все входные данные из InputStream процесса. Я установил этот код так, чтобы он порождал новый поток рядом с каждым новым процессом. Этот поток всегда читает из процесса, пока он жив. Вместо вывода непосредственно в файл я настроил его так, чтобы он использовал инфраструктуру ведения журнала. Таким образом, вы можете настроить конфигурацию журналирования, чтобы перейти к файлу, перевернуться, перейти к консоли и т. Д., Без жесткой настройки для перехода в файл.

Вы заметите, что я запускаю только одного Gobbler для каждого процесса, даже если у процесса есть stdout и stderr. Я перенаправляю stderr в stdout, чтобы было проще. Очевидно, jdk6 поддерживает только этот тип перенаправления.

public class StackOverflow_10037379_jdk6 {

    private static Logger sLogger = Logger.getLogger(StackOverflow_10037379_jdk6.class.getName());

    // Shamelessy taken from Google and modified. 
    // I don't know who the original Author is.
    public static class StreamGobbler extends Thread {

        InputStream is;
        Logger logger;
        Level level;

        StreamGobbler(String logName, Level level, InputStream is) {
            this.is = is;
            this.logger = Logger.getLogger(logName);
            this.level = level;
        }

        public void run() {
            try {
                InputStreamReader isr = new InputStreamReader(is);
                BufferedReader br = new BufferedReader(isr);
                String line = null;
                while ((line = br.readLine()) != null) {
                    logger.log(level, line);
                }
            } catch (IOException ex) {
                logger.log(Level.SEVERE, "Failed to read from Process.", ex);
            }
            logger.log(
                    Level.INFO, 
                    String.format("Exiting Gobbler for %s.", logger.getName()));
        }
    }

    public static class CLIPoolableObjectFactory extends BasePoolableObjectFactory<Process> {

        private String mProcessToRun;

        public CLIPoolableObjectFactory(String processToRun) {
            mProcessToRun = processToRun;
        }

        @Override
        public Process makeObject() throws Exception {
            ProcessBuilder builder = new ProcessBuilder();
            builder.redirectErrorStream(true);
            builder.command(mProcessToRun);
            Process process = builder.start();
            StreamGobbler loggingGobbler =
                    new StreamGobbler(
                    String.format("process.%s", process.hashCode()),
                    Level.INFO,
                    process.getInputStream());
            loggingGobbler.start();
            return process;
        }

        @Override
        public boolean validateObject(Process process) {
            try {
                process.exitValue();
                return false;
            } catch (IllegalThreadStateException ex) {
                return true;
            }
        }

        @Override
        public void destroyObject(Process process) throws Exception {
            // If PHP has a way to stop it, do that instead of destroy
            process.destroy();
        }

        @Override
        public void passivateObject(Process process) throws Exception {
            // Should really try to read from the InputStream of the Process
            // to prevent lock-ups if Rediret.INHERIT is not used.
        }
    }

    public static class CLIWorkItem implements Runnable {

        private ObjectPool<Process> mPool;
        private String mWork;

        public CLIWorkItem(ObjectPool<Process> pool, String work) {
            mPool = pool;
            mWork = work;
        }

        @Override
        public void run() {
            Process workProcess = null;
            try {
                workProcess = mPool.borrowObject();
                OutputStream os = workProcess.getOutputStream();
                os.write(mWork.getBytes(Charset.forName("UTF-8")));
                os.flush();
                // Because of the INHERIT rule with the output stream
                // the console stream overwrites itself. REMOVE THIS in production.
                Thread.sleep(100);
            } catch (Exception ex) {
                sLogger.log(Level.SEVERE, null, ex);
            } finally {
                if (workProcess != null) {
                    try {
                        // Seriously.. so many exceptions.
                        mPool.returnObject(workProcess);
                    } catch (Exception ex) {
                        sLogger.log(Level.SEVERE, null, ex);
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {

        // Change the 5 to 20 in your case. 
        ObjectPool<Process> pool =
                new GenericObjectPool<Process>(
                new CLIPoolableObjectFactory("mock_php.exe"), 5);

        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(100, true);

        ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 20, 1, TimeUnit.HOURS, queue);

        // print some stuff out.
        executor.execute(new CLIWorkItem(pool, "Message 1\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 2\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 3\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 4\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 5\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 6\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 7\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 8\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 9\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 10\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 11\r\n"));

        executor.shutdown();
        executor.awaitTermination(4000, TimeUnit.HOURS);

        pool.close();
    }
}

Output

Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 9440 - Message 3
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 8776 - Message 2
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 6100 - Message 1
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 10096 - Message 4
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 8868 - Message 5
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 8868 - Message 8
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 6100 - Message 10
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 8776 - Message 9
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 10096 - Message 6
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 9440 - Message 7
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 6100 - Message 11
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: Exiting Gobbler for process.295131993.
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: Exiting Gobbler for process.756434719.
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: Exiting Gobbler for process.332711452.
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: Exiting Gobbler for process.1981440623.
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: Exiting Gobbler for process.1043636732.
Error: User Rate Limit Exceeded Will
Error: User Rate Limit Exceeded Will
Error: User Rate Limit Exceeded
Error: User Rate Limit Exceeded
Error: User Rate Limit Exceeded Will
1

но связь между процессами затруднена. Я бы порекомендовал создать очередь, из которой ваши процессы могут читать, вместо того, чтобы пытаться передавать сообщения в командную строку.

Beanstalk имеет несколько клиентов PHP, которые вы можете использовать для обмена сообщениями между процессами.

Error: User Rate Limit Exceeded Will

Похожие вопросы