Вопрос по java, guava – Гуава EventBus диспетчеризация

12

Я использую EventBus Гуавы, чтобы начать некоторую обработку и сообщить результаты. Вот очень простой компилируемый пример:

import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;

public class Test {

    public static class InitiateProcessing { }
    public static class ProcessingStarted { }
    public static class ProcessingResults { }
    public static class ProcessingFinished { }

    public static EventBus bus = new EventBus();

    @Subscribe
    public void receiveStartRequest(InitiateProcessing evt) {
        System.out.println("Got processing request - starting processing");
        bus.post(new ProcessingStarted());

        System.out.println("Generating results");
        bus.post(new ProcessingResults());
        System.out.println("Generating more results");
        bus.post(new ProcessingResults());

        bus.post(new ProcessingFinished());
    }

    @Subscribe
    public void processingStarted(ProcessingStarted evt) {
        System.out.println("Processing has started");
    }

    @Subscribe
    public void resultsReceived(ProcessingResults evt) {
        System.out.println("got results");
    }

    @Subscribe
    public void processingComplete(ProcessingFinished evt) {
        System.out.println("Processing has completed");
    }


    public static void main(String[] args) {
        Test t = new Test();
        bus.register(t);
        bus.post(new InitiateProcessing());
    }
}

Я использую эти события как способ для других программных компонентов реагировать при подготовке к этой обработке. Например, им может потребоваться сохранить свое текущее состояние перед обработкой и восстановить его после.

Я ожидаю, что результат этой программы будет:

Got processing request - starting processing
Processing has started
Generating results
got results
Generating more results
got results
Processing has completed

Вместо этого фактический результат:

Got processing request - starting processing
Generating results
Generating more results
Processing has started
got results
got results
Processing has completed

Событие, которое должно указывать на то, что обработка началась, фактически происходит после фактической обработки («генерация результатов»).

Посмотрев на исходный код, я понимаю, почему он так себя ведет. Вот соответствующийисходный код дляEventBus.

  /**
   * Drain the queue of events to be dispatched. As the queue is being drained,
   * new events may be posted to the end of the queue.
   */
  void dispatchQueuedEvents() {
    // don't dispatch if we're already dispatching, that would allow reentrancy
    // and out-of-order events. Instead, leave the events to be dispatched
    // after the in-progress dispatch is complete.
    if (isDispatching.get()) {
        return;
    }
    // dispatch event (omitted)

Что происходит, так как я уже отправляю верхний уровеньInitiateProcessing событие, остальные события просто выталкиваются в конец очереди. Я хотел бы, чтобы это велось подобно событиям .NET, когда вызов события не возвращается, пока не завершатся все обработчики.

Я не совсем понимаю причину этой реализации. Конечно, события гарантированно будут в порядке, но порядок окружающего кода будет полностью искажен.

Есть ли способ заставить шину вести себя так, как описано, и дать желаемый результат? Я читал в Javadocs, что

EventBus гарантирует, что он не будет вызывать метод подписчика из нескольких потоков одновременно, если только метод явно не разрешает это с помощью аннотации @AllowConcurrentEvents.

Но я не думаю, что это применимо здесь - я вижу эту проблему в однопоточном приложении.

редактировать

Причина проблемы в том, что яpostвнутри абонента. Поскольку шина событий не реентерабельна, эти «подпосты» ставятся в очередь и обрабатываются после завершения первого обработчика. Я могу закомментироватьif (isDispatching.get()) { return; } раздел вEventBus источник и все ведет себя так, как я и ожидал - так что реальный вопрос в том, какие потенциальные проблемы я создал при этом? Кажется, дизайнеры приняли добросовестное решение не допустить повторного входа.

Возможно, ты прав. Хотя я думаю, они работают в новой ветке :) Можете ли вы проверить это, добавивSystem.out.println( "curr thread: " + Thread.currentThread().getName() ) к каждой вашей обработке@Subscribeметоды? injecteer
По общему признанию, это немного странно, что точная синхронность публикации событий зависит от того, был ли вызван метод публикации событий при обработке другого события или нет, но, как я сказал в своем ответе, код, который публикует события, не должен касаться с тем, когда или как они обрабатываются. ColinD
@injecteer Он не запускает собственную ветку. У них естьAsyncEventBus что позволяет вам указатьExecutor - но я этим не пользуюсь. Это все однопоточное. zmb
Почему вы .post () изв подписчик для начала? Здесь что-то подозрительно fge

Ваш Ответ

4   ответа
2

В то время как публикация в EventBus не возвращается, пока все "подписчики" не будут сигнализированы .. эти подписчики, возможно, НЕ начали выполнение. Это означает, что когда возвращается первый bus.post, вы продолжаете следующий пост без вмешательства подписчика.

public void post (Событие объекта) Отправляет событие всем зарегистрированным подписчикам. Этот метод будет успешно возвращен после публикации события всем подписчикам и независимо от каких-либо исключений, выданных подписчиками. Если ни один подписчик не подписан на класс события, и событие уже не является DeadEvent, оно будет помещено в DeadEvent и помещено в репозиторий.

Параметры: событие - событие для публикации.

Это не совсем верно - в этом случае, когда первое сообщение возвращает все события были обработаны. В любом случае - это не отвечает на мой вопрос о том, могу ли я заставить автобус вести себя так, как ожидалось. zmb
7

EventBus обычно работает по принципу, что код, отправляющий событие на шину, не должен заботиться о том, что подписчики делают с событиями или когда, кроме того, соблюдается порядок, в котором были опубликованы события (в случае синхронной шины событий тем не мение).

Если вы хотите, чтобы определенные методы вызывались в определенное время в ходе выполнения вашего метода, и вы хотите убедиться, что эти методы завершены до того, как ваш метод продолжится (как вы, похоже, в своем примере), почему бы не вызвать эти методы напрямую? Когда вы используете шину событий, вы явно отделяете свой код от того, что именно происходит в ответ на данное событие. Это желательно во многих случаях и является основной причинойEventBus существует, но это, кажется, не совсем то, что вы хотите здесь.

«Отделение моего кода от того, что именно происходит в ответ на данное событие»является что я хочу. Я хочу, чтобы на мероприятии говорилось «будьте готовы к тому, что что-то случится» - мне все равно, как или что должны сделать подписчики, чтобы подготовиться. Им, возможно, не придется ничего делать. Я просто хочу быть уверен, что после публикации события подписчики готовы, чтобы я мог продолжить некоторые действия. Шина событий кажется идеальной для этого случая, потому что я могу легко добавлять и удалять компоненты, которые могут обрабатывать событие по-разному. Код размещения события не имеет значения. zmb
Что я извлекаю из ответов, так это то, что такое поведение разработано специально, и шина событий как таковая не будет поддерживать мой вариант использования. Я закончил тем, что изменил шину, чтобы позволить повторный вход, который лучше подходит моему сценарию использования. zmb
5

Я пытаюсь обобщить поведение доставки событий EventBus в Guava:

Если событиеE1 опубликовано в данный моментt1Все подписчики уведомляются. Если один из подписчиков публикует событие в@Подписывайся-метод (маленький момент спустя), «новое» событиеE2 поставлен в очередь и доставленвпоследствии, Потом значит здесь: ведь@Подписывайся-методы дляE1 отt1 действительно вернулся.

Сравните этот вид «каскадных» сообщений о событиях с шириной первого обхода дерева.

Кажется, это явно выбранный дизайн EventBus.

0

Я знаю, что этому вопросу 4 года, но сегодня я столкнулся с той же проблемой. Есть простое (и нелогичное) изменение, чтобы получить желаемое поведение. вhttps://stackoverflow.com/a/53136251/1296767, вы можете использовать AsyncEventBus с DirectExecutor:

public static EventBus bus = new AsyncEventBus(MoreExecutors.newDirectExecutorService());

Запустив тестовый код с указанным выше изменением, вы получите именно то, чего хотите:

Got processing request - starting processing
Processing has started
Generating results
got results
Generating more results
got results
Processing has completed
Neato, спасибо, что поделился :-) zmb

Похожие вопросы