001    /*
002     * Copyright (C) 2007 The Guava Authors
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    
017    package com.google.common.eventbus;
018    
019    import com.google.common.annotations.Beta;
020    
021    import java.util.concurrent.ConcurrentLinkedQueue;
022    import java.util.concurrent.Executor;
023    
024    /**
025     * An {@link EventBus} that takes the Executor of your choice and uses it to
026     * dispatch events, allowing dispatch to occur asynchronously.
027     *
028     * @author Cliff Biffle
029     * @since 10.0
030     */
031    @Beta
032    public class AsyncEventBus extends EventBus {
033      private final Executor executor;
034    
035      /** the queue of events is shared across all threads */
036      private final ConcurrentLinkedQueue<EventWithHandler> eventsToDispatch =
037          new ConcurrentLinkedQueue<EventWithHandler>();
038    
039      /**
040       * Creates a new AsyncEventBus that will use {@code executor} to dispatch
041       * events.  Assigns {@code identifier} as the bus's name for logging purposes.
042       *
043       * @param identifier short name for the bus, for logging purposes.
044       * @param executor   Executor to use to dispatch events. It is the caller's
045       *        responsibility to shut down the executor after the last event has
046       *        been posted to this event bus.
047       */
048      public AsyncEventBus(String identifier, Executor executor) {
049        super(identifier);
050        this.executor = executor;
051      }
052    
053      /**
054       * Creates a new AsyncEventBus that will use {@code executor} to dispatch
055       * events.
056       *
057       * @param executor Executor to use to dispatch events. It is the caller's
058       *        responsibility to shut down the executor after the last event has
059       *        been posted to this event bus.
060       */
061      public AsyncEventBus(Executor executor) {
062        this.executor = executor;
063      }
064    
065      @Override
066      void enqueueEvent(Object event, EventHandler handler) {
067        eventsToDispatch.offer(new EventWithHandler(event, handler));
068      }
069    
070      /**
071       * Dispatch {@code events} in the order they were posted, regardless of
072       * the posting thread.
073       */
074      @SuppressWarnings("deprecation") // only deprecated for external subclasses
075      @Override
076      protected void dispatchQueuedEvents() {
077        while (true) {
078          EventWithHandler eventWithHandler = eventsToDispatch.poll();
079          if (eventWithHandler == null) {
080            break;
081          }
082    
083          dispatch(eventWithHandler.event, eventWithHandler.handler);
084        }
085      }
086    
087      /**
088       * Calls the {@link #executor} to dispatch {@code event} to {@code handler}.
089       */
090      @Override
091      void dispatch(final Object event, final EventHandler handler) {
092        executor.execute(new Runnable() {
093              @Override
094              @SuppressWarnings("synthetic-access")
095              public void run() {
096                AsyncEventBus.super.dispatch(event, handler);
097              }
098            });
099      }
100    
101    }