Вопрос по multithreading, java – Выполнение зависимых задач параллельно в Java

10

Мне нужно найти способ выполнять задачи (зависимые и независимые) параллельно в Java.

Task A and Task C can run independently. Task B is dependent on the output of Task A.

Я проверил java.util.concurrent Future и Fork / Join, но похоже, что мы не можем добавить зависимость к Задаче.

Может кто-нибудь указать мне, чтобы исправить Java API.

Рассматривали ли вы задачу A уведомить задачу B о завершении? Перед тем, как приступить к выполнению задачи A, создайте ее экземпляр и добавьте ее в качестве наблюдателя в задачу A (см. «Шаблон наблюдателя») David W
Гуавы & APOS; sListenableFuture немного дружелюбнее об этих вещах, чем простые фьючерсы. Louis Wasserman

Ваш Ответ

8   ответов
3

Поток данных, Упрощенная модель, где каждая задача имеет только одну, хотя и повторяющуюся зависимостьАктерская модель, Существует много библиотек акторов для Java, но очень мало для потока данных. Смотрите также:который актер-модель-библиотека-каркасный для Java-, Java-паттерн-для-вложенных обратных вызовов

10

и я думаю, что лучше использовать Scala. Вот пример, который я вытащил отсюдаhttp://danielwestheide.com/ (Руководство Неофита по Scala, часть 16: Куда пойти отсюда) У этого парня отличный блог (я не тот парень)

Давай возьмем барристу заваривать кофе. Задачи, которые нужно сделать:

Grind the required coffee beans (no preceding tasks) Heat some water (no preceding tasks) Brew an espresso using the ground coffee and the heated water (depends on 1 & 2) Froth some milk (no preceding tasks) Combine the froth milk and the espresso (depends on 3,4)

или как дерево:

Grind   _
Coffe    \
          \   
Heat    ___\_Brew____ 
Water                \_____Combine
                     /
Foam    ____________/
Milk

В Java с использованием API параллелизма это будет:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class Barrista {

    static class HeatWater implements Callable<String> {
        @Override
        public String call() throws Exception {
            System.out.println("Heating Water");
            Thread.sleep(1000);
            return "hot water";
        }
    }

    static class GrindBeans implements Callable<String> {
        @Override
        public String call() throws Exception {
            System.out.println("Grinding Beans");
            Thread.sleep(2000);
            return "grinded beans";
        }
    }

    static class Brew implements Callable<String> {

        final Future<String> grindedBeans;
        final Future<String> hotWater;

        public Brew(Future<String> grindedBeans, Future<String> hotWater) {
            this.grindedBeans = grindedBeans;
            this.hotWater = hotWater;
        }

        @Override
        public String call() throws Exception
        {
            System.out.println("brewing coffee with " + grindedBeans.get()
                    + " and " + hotWater.get());
            Thread.sleep(1000);
            return "brewed coffee";
        }
    }

    static class FrothMilk implements Callable<String> {

        @Override
        public String call() throws Exception {
            Thread.sleep(1000);
            return "some milk";
        }
    }

    static class Combine implements Callable<String> {

        public Combine(Future<String> frothedMilk, Future<String> brewedCoffee) {
            super();
            this.frothedMilk = frothedMilk;
            this.brewedCoffee = brewedCoffee;
        }

        final Future<String> frothedMilk;
        final Future<String> brewedCoffee;

        @Override
        public String call() throws Exception {
            Thread.sleep(1000);
            System.out.println("Combining " + frothedMilk.get() + " "
                    + brewedCoffee.get());
            return "Final Coffee";
        }

    }

    public static void main(String[] args) {

        ExecutorService executor = Executors.newFixedThreadPool(2);

        FutureTask<String> heatWaterFuture = new FutureTask<String>(new HeatWater());
        FutureTask<String> grindBeans = new FutureTask<String>(new GrindBeans());
        FutureTask<String> brewCoffee = new FutureTask<String>(new Brew(grindBeans, heatWaterFuture));
        FutureTask<String> frothMilk = new FutureTask<String>(new FrothMilk());
        FutureTask<String> combineCoffee = new FutureTask<String>(new Combine(frothMilk, brewCoffee));

        executor.execute(heatWaterFuture);
        executor.execute(grindBeans);
        executor.execute(brewCoffee);
        executor.execute(frothMilk);
        executor.execute(combineCoffee);


        try {

            /**
             *  Warning this code is blocking !!!!!!!
             */         
            System.out.println(combineCoffee.get(20, TimeUnit.SECONDS));
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            System.out.println("20 SECONDS FOR A COFFEE !!!! I am [email protected]#! leaving!!");
            e.printStackTrace();
        } finally{
                executor.shutdown();
            }
        }
    }

Удостоверьтесь, что вы добавляете тайм-ауты, чтобы гарантировать, что ваш код не будет ждать чего-то вечно, что делается с помощью Future.get (long, TimeUnit), а затем обрабатывать ошибки соответствующим образом.

Однако в scala это намного приятнее, вот как в блоге: Код для приготовления кофе будет выглядеть примерно так:

def prepareCappuccino(): Try[Cappuccino] = for {
  ground <- Try(grind("arabica beans"))
  water <- Try(heatWater(Water(25)))
  espresso <- Try(brew(ground, water))
  foam <- Try(frothMilk("milk"))
} yield combine(espresso, foam)

где все методы возвращают будущее (типизированное будущее), например, grind будет выглядеть примерно так:

def grind(beans: CoffeeBeans): Future[GroundCoffee] = Future {
   // grinding function contents
}

За всеми реализациями обращайтесь к блогу, но это все, что нужно сделать. Вы можете легко интегрировать Scala и Java. Я действительно рекомендую делать такие вещи в Scala вместо Java. Scala требует гораздо меньше кода, намного чище и ориентирован на события.

0

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html

Это то, что вы ищете. Это помогает строить потоки выполнения.

0

я бы сначала задал вопрос, действительно ли задача B является отдельной задачей. Разделение задач будет иметь смысл, если есть:

Some non-trivial amount of work that task B can do before needing task A's results Task B is a long ongoing process that handles output from many different instances of task A There is some other tasks (say D) that also use task A's results

Предполагая, что это отдельная задача, вы можете разрешить задачу A & amp; B, чтобы поделитьсяBlockingQueue так что задача A может передавать данные задачи B.

0

https://github.com/familysyan/TaskOrchestration, Он управляет зависимостью задачи для вас.

0

CountDownLatch.

final CountDownLatch gate = new CountDownLatch(2);
// thread a
new Thread() {
    public void run() {
        // process
        gate.countDown();
    }
}.start();

// thread c
new Thread() {
    public void run() {
        // process
        gate.countDown();
    }
}.start();

new Thread() {
    public void run() {
        try {
            gate.await();
            // both thread a and thread c have completed
            // process thread b
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}.start();

В качестве альтернативы, в зависимости от вашего сценария, вы также можете использоватьBlockingQueue реализовать шаблон Производитель-Потребитель. Смотрите пример на странице документации.

CountDownLatch здесь избыточно, и, согласно ОП, задача B зависит только от задачи A, а не от задачи A & amp; C. Тем не менее, -1 просто не обрабатываетInterruptedException правильно во фрагменте.
Спасибо, идея фрагмента кода состояла в том, чтобы показать ему, как работает CountDownLatch, а не показать, как правильно обрабатывать исключения.
1

ь, а задача B блокирует, пока что-то не станет доступным в очереди.

Документы содержат пример кода для достижения этой цели:http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/BlockingQueue.html

0

и: я владелец этой библиотеки), которая называетсяDexecutor

Вот как вы можете достичь желаемого результата, вы можете прочитать больше об этомВот

@Test
public void testDependentTaskExecution() {

    DefaultDependentTasksExecutor<String, String> executor = newTaskExecutor();

    executor.addDependency("A", "B");
    executor.addIndependent("C");

    executor.execute(ExecutionBehavior.RETRY_ONCE_TERMINATING);

}

private DefaultDependentTasksExecutor<String, String> newTaskExecutor() {
    return new DefaultDependentTasksExecutor<String, String>(newExecutor(), new SleepyTaskProvider());
}

private ExecutorService newExecutor() {
    return Executors.newFixedThreadPool(ThreadPoolUtil.ioIntesivePoolSize());
}

private static class SleepyTaskProvider implements TaskProvider<String, String> {

    public Task<String, String> provid(final String id) {

        return new Task<String, String>() {

            @Override
            public String execute() {
                try {
                    //Perform some task
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String result = id + "processed";
                return result;
            }

            @Override
            public boolean shouldExecute(ExecutionResults<String, String> parentResults) {
                ExecutionResult<String, String> firstParentResult = parentResults.getFirst();
                //Do some logic with parent result
                if ("B".equals(id) && firstParentResult.isSkipped()) {
                    return false;
                }
                return true;
            }
        };          
    }

}

Похожие вопросы