001    /*
002     * Copyright (C) 2007 The Guava Authors
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    
017    package com.google.common.eventbus;
018    
019    import com.google.common.annotations.Beta;
020    import java.util.concurrent.ConcurrentLinkedQueue;
021    import java.util.concurrent.Executor;
022    
023    /**
024     * An {@link EventBus} that takes the Executor of your choice and uses it to
025     * dispatch events, allowing dispatch to occur asynchronously.
026     *
027     * @author Cliff Biffle
028     * @since 10.0
029     */
030    @Beta
031    public class AsyncEventBus extends EventBus {
032      private final Executor executor;
033    
034      /** the queue of events is shared across all threads */
035      private final ConcurrentLinkedQueue<EventWithHandler> eventsToDispatch =
036          new ConcurrentLinkedQueue<EventWithHandler>();
037    
038      /**
039       * Creates a new AsyncEventBus that will use {@code executor} to dispatch
040       * events.  Assigns {@code identifier} as the bus's name for logging purposes.
041       *
042       * @param identifier short name for the bus, for logging purposes.
043       * @param executor   Executor to use to dispatch events. It is the caller's
044       *        responsibility to shut down the executor after the last event has
045       *        been posted to this event bus.
046       */
047      public AsyncEventBus(String identifier, Executor executor) {
048        super(identifier);
049        this.executor = executor;
050      }
051    
052      /**
053       * Creates a new AsyncEventBus that will use {@code executor} to dispatch
054       * events.
055       *
056       * @param executor Executor to use to dispatch events. It is the caller's
057       *        responsibility to shut down the executor after the last event has
058       *        been posted to this event bus.
059       */
060      public AsyncEventBus(Executor executor) {
061        this.executor = executor;
062      }
063    
064      @Override
065      protected void enqueueEvent(Object event, EventHandler handler) {
066        eventsToDispatch.offer(new EventWithHandler(event, handler));
067      }
068    
069      /**
070       * Dispatch {@code events} in the order they were posted, regardless of
071       * the posting thread.
072       */
073      @Override
074      protected void dispatchQueuedEvents() {
075        while (true) {
076          EventWithHandler eventWithHandler = eventsToDispatch.poll();
077          if (eventWithHandler == null) {
078            break;
079          }
080    
081          dispatch(eventWithHandler.event, eventWithHandler.handler);
082        }
083      }
084    
085      /**
086       * Calls the {@link #executor} to dispatch {@code event} to {@code handler}.
087       */
088      @Override
089      protected void dispatch(final Object event, final EventHandler handler) {
090        executor.execute(new Runnable() {
091              @Override
092              @SuppressWarnings("synthetic-access")
093              public void run() {
094                AsyncEventBus.super.dispatch(event, handler);
095              }
096            });
097      }
098    
099    }