001    /*
002     * Copyright (C) 2007 The Guava Authors
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    
017    package com.google.common.eventbus;
018    
019    import com.google.common.annotations.Beta;
020    import com.google.common.annotations.VisibleForTesting;
021    import com.google.common.base.Supplier;
022    import com.google.common.base.Throwables;
023    import com.google.common.cache.CacheBuilder;
024    import com.google.common.cache.CacheLoader;
025    import com.google.common.cache.LoadingCache;
026    import com.google.common.collect.Lists;
027    import com.google.common.collect.Multimap;
028    import com.google.common.collect.Multimaps;
029    import com.google.common.collect.SetMultimap;
030    import com.google.common.collect.Sets;
031    
032    import java.lang.reflect.InvocationTargetException;
033    import java.util.Collection;
034    import java.util.List;
035    import java.util.Map.Entry;
036    import java.util.Set;
037    import java.util.concurrent.ConcurrentHashMap;
038    import java.util.concurrent.ConcurrentLinkedQueue;
039    import java.util.concurrent.CopyOnWriteArraySet;
040    import java.util.concurrent.ExecutionException;
041    import java.util.logging.Level;
042    import java.util.logging.Logger;
043    
044    /**
045     * Dispatches events to listeners, and provides ways for listeners to register
046     * themselves.
047     *
048     * <p>The EventBus allows publish-subscribe-style communication between
049     * components without requiring the components to explicitly register with one
050     * another (and thus be aware of each other).  It is designed exclusively to
051     * replace traditional Java in-process event distribution using explicit
052     * registration. It is <em>not</em> a general-purpose publish-subscribe system,
053     * nor is it intended for interprocess communication.
054     *
055     * <h2>Receiving Events</h2>
056     * To receive events, an object should:<ol>
057     * <li>Expose a public method, known as the <i>event handler</i>, which accepts
058     *     a single argument of the type of event desired;</li>
059     * <li>Mark it with a {@link Subscribe} annotation;</li>
060     * <li>Pass itself to an EventBus instance's {@link #register(Object)} method.
061     *     </li>
062     * </ol>
063     *
064     * <h2>Posting Events</h2>
065     * To post an event, simply provide the event object to the
066     * {@link #post(Object)} method.  The EventBus instance will determine the type
067     * of event and route it to all registered listeners.
068     *
069     * <p>Events are routed based on their type &mdash; an event will be delivered
070     * to any handler for any type to which the event is <em>assignable.</em>  This
071     * includes implemented interfaces, all superclasses, and all interfaces
072     * implemented by superclasses.
073     *
074     * <p>When {@code post} is called, all registered handlers for an event are run
075     * in sequence, so handlers should be reasonably quick.  If an event may trigger
076     * an extended process (such as a database load), spawn a thread or queue it for
077     * later.  (For a convenient way to do this, use an {@link AsyncEventBus}.)
078     *
079     * <h2>Handler Methods</h2>
080     * Event handler methods must accept only one argument: the event.
081     *
082     * <p>Handlers should not, in general, throw.  If they do, the EventBus will
083     * catch and log the exception.  This is rarely the right solution for error
084     * handling and should not be relied upon; it is intended solely to help find
085     * problems during development.
086     *
087     * <p>The EventBus guarantees that it will not call a handler method from
088     * multiple threads simultaneously, unless the method explicitly allows it by
089     * bearing the {@link AllowConcurrentEvents} annotation.  If this annotation is
090     * not present, handler methods need not worry about being reentrant, unless
091     * also called from outside the EventBus.
092     *
093     * <h2>Dead Events</h2>
094     * If an event is posted, but no registered handlers can accept it, it is
095     * considered "dead."  To give the system a second chance to handle dead events,
096     * they are wrapped in an instance of {@link DeadEvent} and reposted.
097     *
098     * <p>If a handler for a supertype of all events (such as Object) is registered,
099     * no event will ever be considered dead, and no DeadEvents will be generated.
100     * Accordingly, while DeadEvent extends {@link Object}, a handler registered to
101     * receive any Object will never receive a DeadEvent.
102     *
103     * <p>This class is safe for concurrent use.
104     *
105     * @author Cliff Biffle
106     * @since 10.0
107     */
108    @Beta
109    public class EventBus {
110    
111      /**
112       * All registered event handlers, indexed by event type.
113       */
114      private final SetMultimap<Class<?>, EventHandler> handlersByType =
115          Multimaps.newSetMultimap(new ConcurrentHashMap<Class<?>, Collection<EventHandler>>(),
116              new Supplier<Set<EventHandler>>() {
117                @Override
118                public Set<EventHandler> get() {
119                  return new CopyOnWriteArraySet<EventHandler>();
120                }
121              });
122    
123      /**
124       * Logger for event dispatch failures.  Named by the fully-qualified name of
125       * this class, followed by the identifier provided at construction.
126       */
127      private final Logger logger;
128    
129      /**
130       * Strategy for finding handler methods in registered objects.  Currently,
131       * only the {@link AnnotatedHandlerFinder} is supported, but this is
132       * encapsulated for future expansion.
133       */
134      private final HandlerFindingStrategy finder = new AnnotatedHandlerFinder();
135    
136      /** queues of events for the current thread to dispatch */
137      private final ThreadLocal<ConcurrentLinkedQueue<EventWithHandler>>
138          eventsToDispatch =
139          new ThreadLocal<ConcurrentLinkedQueue<EventWithHandler>>() {
140        @Override protected ConcurrentLinkedQueue<EventWithHandler> initialValue() {
141          return new ConcurrentLinkedQueue<EventWithHandler>();
142        }
143      };
144    
145      /** true if the current thread is currently dispatching an event */
146      private final ThreadLocal<Boolean> isDispatching =
147          new ThreadLocal<Boolean>() {
148        @Override protected Boolean initialValue() {
149          return false;
150        }
151      };
152    
153      /**
154       * A thread-safe cache for flattenHierarch(). The Class class is immutable.
155       */
156      private LoadingCache<Class<?>, Set<Class<?>>> flattenHierarchyCache =
157          CacheBuilder.newBuilder()
158              .weakKeys()
159              .build(new CacheLoader<Class<?>, Set<Class<?>>>() {
160                @Override
161                public Set<Class<?>> load(Class<?> concreteClass) throws Exception {
162                  List<Class<?>> parents = Lists.newLinkedList();
163                  Set<Class<?>> classes = Sets.newHashSet();
164    
165                  parents.add(concreteClass);
166    
167                  while (!parents.isEmpty()) {
168                    Class<?> clazz = parents.remove(0);
169                    classes.add(clazz);
170    
171                    Class<?> parent = clazz.getSuperclass();
172                    if (parent != null) {
173                      parents.add(parent);
174                    }
175    
176                    for (Class<?> iface : clazz.getInterfaces()) {
177                      parents.add(iface);
178                    }
179                  }
180    
181                  return classes;
182                }
183              });
184    
185      /**
186       * Creates a new EventBus named "default".
187       */
188      public EventBus() {
189        this("default");
190      }
191    
192      /**
193       * Creates a new EventBus with the given {@code identifier}.
194       *
195       * @param identifier  a brief name for this bus, for logging purposes.  Should
196       *                    be a valid Java identifier.
197       */
198      public EventBus(String identifier) {
199        logger = Logger.getLogger(EventBus.class.getName() + "." + identifier);
200      }
201    
202      /**
203       * Registers all handler methods on {@code object} to receive events.
204       * Handler methods are selected and classified using this EventBus's
205       * {@link HandlerFindingStrategy}; the default strategy is the
206       * {@link AnnotatedHandlerFinder}.
207       *
208       * @param object  object whose handler methods should be registered.
209       */
210      public void register(Object object) {
211        handlersByType.putAll(finder.findAllHandlers(object));
212      }
213    
214      /**
215       * Unregisters all handler methods on a registered {@code object}.
216       *
217       * @param object  object whose handler methods should be unregistered.
218       * @throws IllegalArgumentException if the object was not previously registered.
219       */
220      public void unregister(Object object) {
221        Multimap<Class<?>, EventHandler> methodsInListener = finder.findAllHandlers(object);
222        for (Entry<Class<?>, Collection<EventHandler>> entry : methodsInListener.asMap().entrySet()) {
223          Set<EventHandler> currentHandlers = getHandlersForEventType(entry.getKey());
224          Collection<EventHandler> eventMethodsInListener = entry.getValue();
225          
226          if (currentHandlers == null || !currentHandlers.containsAll(entry.getValue())) {
227            throw new IllegalArgumentException(
228                "missing event handler for an annotated method. Is " + object + " registered?");
229          }
230          currentHandlers.removeAll(eventMethodsInListener);
231        }
232      }
233    
234      /**
235       * Posts an event to all registered handlers.  This method will return
236       * successfully after the event has been posted to all handlers, and
237       * regardless of any exceptions thrown by handlers.
238       *
239       * <p>If no handlers have been subscribed for {@code event}'s class, and
240       * {@code event} is not already a {@link DeadEvent}, it will be wrapped in a
241       * DeadEvent and reposted.
242       *
243       * @param event  event to post.
244       */
245      public void post(Object event) {
246        Set<Class<?>> dispatchTypes = flattenHierarchy(event.getClass());
247    
248        boolean dispatched = false;
249        for (Class<?> eventType : dispatchTypes) {
250          Set<EventHandler> wrappers = getHandlersForEventType(eventType);
251    
252          if (wrappers != null && !wrappers.isEmpty()) {
253            dispatched = true;
254            for (EventHandler wrapper : wrappers) {
255              enqueueEvent(event, wrapper);
256            }
257          }
258        }
259    
260        if (!dispatched && !(event instanceof DeadEvent)) {
261          post(new DeadEvent(this, event));
262        }
263    
264        dispatchQueuedEvents();
265      }
266    
267      /**
268       * Queue the {@code event} for dispatch during
269       * {@link #dispatchQueuedEvents()}. Events are queued in-order of occurrence
270       * so they can be dispatched in the same order.
271       */
272      protected void enqueueEvent(Object event, EventHandler handler) {
273        eventsToDispatch.get().offer(new EventWithHandler(event, handler));
274      }
275    
276      /**
277       * Drain the queue of events to be dispatched. As the queue is being drained,
278       * new events may be posted to the end of the queue.
279       */
280      protected void dispatchQueuedEvents() {
281        // don't dispatch if we're already dispatching, that would allow reentrancy
282        // and out-of-order events. Instead, leave the events to be dispatched
283        // after the in-progress dispatch is complete.
284        if (isDispatching.get()) {
285          return;
286        }
287    
288        isDispatching.set(true);
289        try {
290          while (true) {
291            EventWithHandler eventWithHandler = eventsToDispatch.get().poll();
292            if (eventWithHandler == null) {
293              break;
294            }
295    
296            dispatch(eventWithHandler.event, eventWithHandler.handler);
297          }
298        } finally {
299          isDispatching.set(false);
300        }
301      }
302    
303      /**
304       * Dispatches {@code event} to the handler in {@code wrapper}.  This method
305       * is an appropriate override point for subclasses that wish to make
306       * event delivery asynchronous.
307       *
308       * @param event  event to dispatch.
309       * @param wrapper  wrapper that will call the handler.
310       */
311      protected void dispatch(Object event, EventHandler wrapper) {
312        try {
313          wrapper.handleEvent(event);
314        } catch (InvocationTargetException e) {
315          logger.log(Level.SEVERE,
316              "Could not dispatch event: " + event + " to handler " + wrapper, e);
317        }
318      }
319    
320      /**
321       * Retrieves a mutable set of the currently registered handlers for
322       * {@code type}.  If no handlers are currently registered for {@code type},
323       * this method may either return {@code null} or an empty set.
324       *
325       * @param type  type of handlers to retrieve.
326       * @return currently registered handlers, or {@code null}.
327       */
328      Set<EventHandler> getHandlersForEventType(Class<?> type) {
329        return handlersByType.get(type);
330      }
331    
332      /**
333       * Creates a new Set for insertion into the handler map.  This is provided
334       * as an override point for subclasses. The returned set should support
335       * concurrent access.
336       *
337       * @return a new, mutable set for handlers.
338       */
339      protected Set<EventHandler> newHandlerSet() {
340        return new CopyOnWriteArraySet<EventHandler>();
341      }
342    
343      /**
344       * Flattens a class's type hierarchy into a set of Class objects.  The set
345       * will include all superclasses (transitively), and all interfaces
346       * implemented by these superclasses.
347       *
348       * @param concreteClass  class whose type hierarchy will be retrieved.
349       * @return {@code clazz}'s complete type hierarchy, flattened and uniqued.
350       */
351      @VisibleForTesting
352      Set<Class<?>> flattenHierarchy(Class<?> concreteClass) {
353        try {
354          return flattenHierarchyCache.get(concreteClass);
355        } catch (ExecutionException e) {
356          throw Throwables.propagate(e.getCause());
357        }
358      }
359    
360      /** simple struct representing an event and it's handler */
361      static class EventWithHandler {
362        final Object event;
363        final EventHandler handler;
364        public EventWithHandler(Object event, EventHandler handler) {
365          this.event = event;
366          this.handler = handler;
367        }
368      }
369    }