001/*
002 * Copyright (C) 2007 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.google.common.eventbus;
018
019import static com.google.common.base.Preconditions.checkNotNull;
020
021import com.google.common.annotations.Beta;
022
023import java.util.concurrent.ConcurrentLinkedQueue;
024import java.util.concurrent.Executor;
025
026/**
027 * An {@link EventBus} that takes the Executor of your choice and uses it to
028 * dispatch events, allowing dispatch to occur asynchronously.
029 *
030 * @author Cliff Biffle
031 * @since 10.0
032 */
033@Beta
034public class AsyncEventBus extends EventBus {
035  private final Executor executor;
036
037  /** the queue of events is shared across all threads */
038  private final ConcurrentLinkedQueue<EventWithSubscriber> eventsToDispatch =
039      new ConcurrentLinkedQueue<EventWithSubscriber>();
040
041  /**
042   * Creates a new AsyncEventBus that will use {@code executor} to dispatch
043   * events.  Assigns {@code identifier} as the bus's name for logging purposes.
044   *
045   * @param identifier short name for the bus, for logging purposes.
046   * @param executor   Executor to use to dispatch events. It is the caller's
047   *        responsibility to shut down the executor after the last event has
048   *        been posted to this event bus.
049   */
050  public AsyncEventBus(String identifier, Executor executor) {
051    super(identifier);
052    this.executor = checkNotNull(executor);
053  }
054
055  /**
056   * Creates a new AsyncEventBus that will use {@code executor} to dispatch
057   * events.
058   *
059   * @param executor Executor to use to dispatch events. It is the caller's
060   *        responsibility to shut down the executor after the last event has
061   *        been posted to this event bus.
062   * @param subscriberExceptionHandler Handler used to handle exceptions thrown from subscribers.
063   *    See {@link SubscriberExceptionHandler} for more information.
064   * @since 16.0
065   */
066  public AsyncEventBus(Executor executor, SubscriberExceptionHandler subscriberExceptionHandler) {
067    super(subscriberExceptionHandler);
068    this.executor = checkNotNull(executor);
069  }
070
071  /**
072   * Creates a new AsyncEventBus that will use {@code executor} to dispatch
073   * events.
074   *
075   * @param executor Executor to use to dispatch events. It is the caller's
076   *        responsibility to shut down the executor after the last event has
077   *        been posted to this event bus.
078   */
079  public AsyncEventBus(Executor executor) {
080    super("default");
081    this.executor = checkNotNull(executor);
082  }
083
084  @Override
085  void enqueueEvent(Object event, EventSubscriber subscriber) {
086    eventsToDispatch.offer(new EventWithSubscriber(event, subscriber));
087  }
088
089  /**
090   * Dispatch {@code events} in the order they were posted, regardless of
091   * the posting thread.
092   */
093  @SuppressWarnings("deprecation") // only deprecated for external subclasses
094  @Override
095  protected void dispatchQueuedEvents() {
096    while (true) {
097      EventWithSubscriber eventWithSubscriber = eventsToDispatch.poll();
098      if (eventWithSubscriber == null) {
099        break;
100      }
101
102      dispatch(eventWithSubscriber.event, eventWithSubscriber.subscriber);
103    }
104  }
105
106  /**
107   * Calls the {@link #executor} to dispatch {@code event} to {@code subscriber}.
108   */
109  @Override
110  void dispatch(final Object event, final EventSubscriber subscriber) {
111    checkNotNull(event);
112    checkNotNull(subscriber);
113    executor.execute(
114        new Runnable() {
115          @Override
116          public void run() {
117            AsyncEventBus.super.dispatch(event, subscriber);
118          }
119        });
120  }
121}