001/*
002 * Copyright (C) 2007 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.google.common.eventbus;
018
019import static com.google.common.base.Preconditions.checkNotNull;
020
021import com.google.common.annotations.Beta;
022
023import java.util.concurrent.ConcurrentLinkedQueue;
024import java.util.concurrent.Executor;
025
026/**
027 * An {@link EventBus} that takes the Executor of your choice and uses it to
028 * dispatch events, allowing dispatch to occur asynchronously.
029 *
030 * @author Cliff Biffle
031 * @since 10.0
032 */
033@Beta
034public class AsyncEventBus extends EventBus {
035  private final Executor executor;
036
037  /** the queue of events is shared across all threads */
038  private final ConcurrentLinkedQueue<EventWithHandler> eventsToDispatch =
039      new ConcurrentLinkedQueue<EventWithHandler>();
040
041  /**
042   * Creates a new AsyncEventBus that will use {@code executor} to dispatch
043   * events.  Assigns {@code identifier} as the bus's name for logging purposes.
044   *
045   * @param identifier short name for the bus, for logging purposes.
046   * @param executor   Executor to use to dispatch events. It is the caller's
047   *        responsibility to shut down the executor after the last event has
048   *        been posted to this event bus.
049   */
050  public AsyncEventBus(String identifier, Executor executor) {
051    super(identifier);
052    this.executor = checkNotNull(executor);
053  }
054
055  /**
056   * Creates a new AsyncEventBus that will use {@code executor} to dispatch
057   * events.
058   *
059   * @param executor Executor to use to dispatch events. It is the caller's
060   *        responsibility to shut down the executor after the last event has
061   *        been posted to this event bus.
062   */
063  public AsyncEventBus(Executor executor) {
064    this.executor = checkNotNull(executor);
065  }
066
067  @Override
068  void enqueueEvent(Object event, EventHandler handler) {
069    eventsToDispatch.offer(new EventWithHandler(event, handler));
070  }
071
072  /**
073   * Dispatch {@code events} in the order they were posted, regardless of
074   * the posting thread.
075   */
076  @SuppressWarnings("deprecation") // only deprecated for external subclasses
077  @Override
078  protected void dispatchQueuedEvents() {
079    while (true) {
080      EventWithHandler eventWithHandler = eventsToDispatch.poll();
081      if (eventWithHandler == null) {
082        break;
083      }
084
085      dispatch(eventWithHandler.event, eventWithHandler.handler);
086    }
087  }
088
089  /**
090   * Calls the {@link #executor} to dispatch {@code event} to {@code handler}.
091   */
092  @Override
093  void dispatch(final Object event, final EventHandler handler) {
094    checkNotNull(event);
095    checkNotNull(handler);
096    executor.execute(
097        new Runnable() {
098          @Override
099          public void run() {
100            AsyncEventBus.super.dispatch(event, handler);
101          }
102        });
103  }
104}