001    /*
002     * Copyright (C) 2007 The Guava Authors
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    
017    package com.google.common.eventbus;
018    
019    import com.google.common.annotations.Beta;
020    
021    import java.util.concurrent.ConcurrentLinkedQueue;
022    import java.util.concurrent.Executor;
023    
024    /**
025     * An {@link EventBus} that takes the Executor of your choice and uses it to
026     * dispatch events, allowing dispatch to occur asynchronously.
027     *
028     * @author Cliff Biffle
029     * @since 10.0
030     */
031    @Beta
032    public class AsyncEventBus extends EventBus {
033      private final Executor executor;
034    
035      /** the queue of events is shared across all threads */
036      private final ConcurrentLinkedQueue<EventWithHandler> eventsToDispatch =
037          new ConcurrentLinkedQueue<EventWithHandler>();
038    
039      /**
040       * Creates a new AsyncEventBus that will use {@code executor} to dispatch
041       * events.  Assigns {@code identifier} as the bus's name for logging purposes.
042       *
043       * @param identifier short name for the bus, for logging purposes.
044       * @param executor   Executor to use to dispatch events. It is the caller's
045       *        responsibility to shut down the executor after the last event has
046       *        been posted to this event bus.
047       */
048      public AsyncEventBus(String identifier, Executor executor) {
049        super(identifier);
050        this.executor = executor;
051      }
052    
053      /**
054       * Creates a new AsyncEventBus that will use {@code executor} to dispatch
055       * events.
056       *
057       * @param executor Executor to use to dispatch events. It is the caller's
058       *        responsibility to shut down the executor after the last event has
059       *        been posted to this event bus.
060       */
061      public AsyncEventBus(Executor executor) {
062        this.executor = executor;
063      }
064    
065      @Override
066      protected void enqueueEvent(Object event, EventHandler handler) {
067        eventsToDispatch.offer(new EventWithHandler(event, handler));
068      }
069    
070      /**
071       * Dispatch {@code events} in the order they were posted, regardless of
072       * the posting thread.
073       */
074      @Override
075      protected void dispatchQueuedEvents() {
076        while (true) {
077          EventWithHandler eventWithHandler = eventsToDispatch.poll();
078          if (eventWithHandler == null) {
079            break;
080          }
081    
082          dispatch(eventWithHandler.event, eventWithHandler.handler);
083        }
084      }
085    
086      /**
087       * Calls the {@link #executor} to dispatch {@code event} to {@code handler}.
088       */
089      @Override
090      protected void dispatch(final Object event, final EventHandler handler) {
091        executor.execute(new Runnable() {
092              @Override
093              @SuppressWarnings("synthetic-access")
094              public void run() {
095                AsyncEventBus.super.dispatch(event, handler);
096              }
097            });
098      }
099    
100    }