Вопрос по multithreading, threadpool, java – Как реализовать ExecutorService для выполнения задач на основе ротации?

6

Я используюjava.util.concurrent.ExecutorService сфиксированный пул потоков выполнить список задач. Мой список задач обычно составляет около 80-150, и я ограничил количество потоков, работающих в любое время, до 10, как показано ниже:

<code>ExecutorService threadPoolService = Executors.newFixedThreadPool(10);

for ( Runnable task : myTasks ) 
{     
    threadPoolService.submit(task); 
}
</code>

Мой вариант использования требует, чтобы даже выполненное задание было повторно отправлено вExecutorService но это должно быть выполнено / принято снова только тогда, когда всеalready представленные задания обслуживаются / завершаются. То есть, в основном, представленные задачи должны выполняться на основе ротации. Следовательно, не будет ниthreadPoolService.shutdown() или жеthreadPoolService.shutdownNow() позвони в этом случае.

У меня вопрос, как мне реализоватьExecutorService обслуживание задач на основе ротации?

Ваш Ответ

4   ответа
1

что все задачи выполнены, и повторно отправить их, как только это произойдет, например, вот так:

    List<Future> futures = new ArrayList<>();
    for (Runnable task : myTasks) {
        futures.add(threadPoolService.submit(task));
    }
    //wait until completion of all tasks
    for (Future f : futures) {
        f.get();
    }
    //restart
    ......

EDIT
Кажется, вы хотите повторно отправить задачу, как только она будет выполнена. Вы могли бы использоватьExecutorCompletionService который позволяет получать задачи по мере их выполнения, - см. ниже простой пример с 2 задачами, которые повторно отправляются несколько раз, как только они завершены. Пример вывода:

Task 1 submitted pool-1-thread-1
Task 2 submitted pool-1-thread-2
Task 1 completed pool-1-thread-1
Task 1 submitted pool-1-thread-3
Task 2 completed pool-1-thread-2
Task 1 completed pool-1-thread-3
Task 2 submitted pool-1-thread-4
Task 1 submitted pool-1-thread-5
Task 1 completed pool-1-thread-5
Task 2 completed pool-1-thread-4

public class Test1 {

    public final ConcurrentMap<String, String> concurrentMap = new ConcurrentHashMap<>();
    public final AtomicInteger retries = new AtomicInteger();
    public final Object lock = new Object();

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        int count = 0;
        List<Runnable> myTasks = new ArrayList<>();
        myTasks.add(getRunnable(1));
        myTasks.add(getRunnable(2));
        ExecutorService threadPoolService = Executors.newFixedThreadPool(10);
        CompletionService<Runnable> ecs = new ExecutorCompletionService<Runnable>(threadPoolService);
        for (Runnable task : myTasks) {
            ecs.submit(task, task);
        }
        //wait until completion of all tasks
        while(count++ < 3) {
            Runnable task = ecs.take().get();
            ecs.submit(task, task);
        }
        threadPoolService.shutdown();
    }

    private static Runnable getRunnable(final int i) {
        return new Runnable() {

            @Override
            public void run() {
                System.out.println("Task " + i + " submitted " + Thread.currentThread().getName() + "  ");
                try {
                    Thread.sleep(500 * i);
                } catch (InterruptedException ex) {
                    System.out.println("Interrupted");
                }
                System.out.println("Task " + i + " completed " + Thread.currentThread().getName() + "  ");
            }
        };
    }
}
Здорово! Это то, что я ищу. Спасибо.
Смотрите мои правки - я думаю, это то, что вы имели в виду.
В моем случае мне нужно повторно подавать каждую отдельную задачу по мере ее завершения, но неwait until completion of all tasks, Есть идеи / комментарии? Gnanam
1

work queue используется для примераExecutorService, Итак, я предлагаю:

First choose an implementation of java.util.concurrent.BlockingQueue (an example) that provides a circular queue functionality. NOTE, the reason BlockingQueue has been chosen is that to wait until the next task is provided to queue; so, in case of circular + blocking queue, you should be careful how to provide the same behavior and functionality.

Instead of using Executors.new... to create a new ThreadPoolExecutor use a direct constructor such as

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)

Таким образом, если вы не приказываете исполнителюshutdownбудет пытаться получить следующую задачу из очереди для выполнения из своейwork queue который являетсяcircular Контейнер для задач.

10

где вы можете поместить задание обратно в конец очереди.

public class TaskRepeatingThreadPoolExecutor extends ThreadPoolExecutor {

    public TaskRepeatingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        this.submit(r);
    }
}

Вам, конечно, придется проделать немного больше работы, чтобы создать его самостоятельно без помощиExecutorServiceудобный способ фабрики, но конструкторы достаточно просты, чтобы работать.

Спасибо за это предложение и дало мне знать о существованииThreadPoolExecutor учебный класс. У меня есть вопрос, что / как / почему мне нужно сдатьBlockingQueue<Runnable> workQueue конструктору. В случаеExecutorServiceЯ использовал, чтобы представить все мои задачи, как этоthreadPoolService.submit(task), Не могу понять об этомBlockingQueue<Runnable> workQueue, Надеюсь, ты сможешь заставить меня понять это. Gnanam
Я не уверен, что этот метод работает, после того, как Execute вызывается с FutureTask (Runnable - Wrapper вокруг фактической задачи), который имеет внутреннее состояние, которое отслеживает состояние завершения. Если он передан, он просто возвращается без выполнения фактической задачи.
Просто прочитайте оBlockingQueue<Runnable> и думал обновить его здесь.BlockingQueue в основном используется для хранения работы / задачи, которая отправляется исполнителю, когда все потоки в пуле заняты выполнением задач. Gnanam
1

которое полностью использует функциональность, существующую в стандартных утилитах параллелизма библиотеки. Он используетCyclicBarrier с классом декоратора задач и барьерным действием, которое повторно передает все задачи:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Rotation {

    private static final class RotationDecorator implements Runnable {
        private final Runnable          task;
        private final CyclicBarrier barrier;


        RotationDecorator( Runnable task, CyclicBarrier barrier ) {
            this.task = task;
            this.barrier = barrier;
        }


        @Override
        public void run() {
            this.task.run();
            try {
                this.barrier.await();
            } catch(InterruptedException e) {
                ; // Consider better exception handling
            } catch(BrokenBarrierException e) {
                ; // Consider better exception handling
            }
        }
    }


    public void startRotation( List<Runnable> tasks ) {
        final ExecutorService threadPoolService = Executors.newFixedThreadPool( 10 );
        final List<Runnable> rotatingTasks = new ArrayList<Runnable>( tasks.size() );
        final CyclicBarrier barrier = new CyclicBarrier( tasks.size(), new Runnable() {
            @Override
            public void run() {
                Rotation.this.enqueueTasks( threadPoolService, rotatingTasks );
            }
        } );
        for(Runnable task : tasks) {
            rotatingTasks.add( new RotationDecorator( task, barrier ) );
        }
        this.enqueueTasks( threadPoolService, rotatingTasks );
    }


    private void enqueueTasks( ExecutorService service, List<Runnable> tasks ) {
        for(Runnable task : tasks) {
            service.submit( task );
        }
    }

}

Похожие вопросы