001/* 002 * Copyright (C) 2007 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.google.common.eventbus; 018 019import static com.google.common.base.Preconditions.checkNotNull; 020 021import com.google.common.annotations.Beta; 022import com.google.common.annotations.VisibleForTesting; 023import com.google.common.base.Throwables; 024import com.google.common.cache.CacheBuilder; 025import com.google.common.cache.CacheLoader; 026import com.google.common.cache.LoadingCache; 027import com.google.common.collect.HashMultimap; 028import com.google.common.collect.Multimap; 029import com.google.common.collect.SetMultimap; 030import com.google.common.reflect.TypeToken; 031import com.google.common.util.concurrent.UncheckedExecutionException; 032 033import java.lang.reflect.InvocationTargetException; 034import java.util.Collection; 035import java.util.LinkedList; 036import java.util.Map.Entry; 037import java.util.Queue; 038import java.util.Set; 039import java.util.concurrent.locks.ReadWriteLock; 040import java.util.concurrent.locks.ReentrantReadWriteLock; 041import java.util.logging.Level; 042import java.util.logging.Logger; 043 044/** 045 * Dispatches events to listeners, and provides ways for listeners to register 046 * themselves. 047 * 048 * <p>The EventBus allows publish-subscribe-style communication between 049 * components without requiring the components to explicitly register with one 050 * another (and thus be aware of each other). It is designed exclusively to 051 * replace traditional Java in-process event distribution using explicit 052 * registration. It is <em>not</em> a general-purpose publish-subscribe system, 053 * nor is it intended for interprocess communication. 054 * 055 * <h2>Receiving Events</h2> 056 * <p>To receive events, an object should: 057 * <ol> 058 * <li>Expose a public method, known as the <i>event subscriber</i>, which accepts 059 * a single argument of the type of event desired;</li> 060 * <li>Mark it with a {@link Subscribe} annotation;</li> 061 * <li>Pass itself to an EventBus instance's {@link #register(Object)} method. 062 * </li> 063 * </ol> 064 * 065 * <h2>Posting Events</h2> 066 * <p>To post an event, simply provide the event object to the 067 * {@link #post(Object)} method. The EventBus instance will determine the type 068 * of event and route it to all registered listeners. 069 * 070 * <p>Events are routed based on their type — an event will be delivered 071 * to any subscriber for any type to which the event is <em>assignable.</em> This 072 * includes implemented interfaces, all superclasses, and all interfaces 073 * implemented by superclasses. 074 * 075 * <p>When {@code post} is called, all registered subscribers for an event are run 076 * in sequence, so subscribers should be reasonably quick. If an event may trigger 077 * an extended process (such as a database load), spawn a thread or queue it for 078 * later. (For a convenient way to do this, use an {@link AsyncEventBus}.) 079 * 080 * <h2>Subscriber Methods</h2> 081 * <p>Event subscriber methods must accept only one argument: the event. 082 * 083 * <p>Subscribers should not, in general, throw. If they do, the EventBus will 084 * catch and log the exception. This is rarely the right solution for error 085 * handling and should not be relied upon; it is intended solely to help find 086 * problems during development. 087 * 088 * <p>The EventBus guarantees that it will not call a subscriber method from 089 * multiple threads simultaneously, unless the method explicitly allows it by 090 * bearing the {@link AllowConcurrentEvents} annotation. If this annotation is 091 * not present, subscriber methods need not worry about being reentrant, unless 092 * also called from outside the EventBus. 093 * 094 * <h2>Dead Events</h2> 095 * <p>If an event is posted, but no registered subscribers can accept it, it is 096 * considered "dead." To give the system a second chance to handle dead events, 097 * they are wrapped in an instance of {@link DeadEvent} and reposted. 098 * 099 * <p>If a subscriber for a supertype of all events (such as Object) is registered, 100 * no event will ever be considered dead, and no DeadEvents will be generated. 101 * Accordingly, while DeadEvent extends {@link Object}, a subscriber registered to 102 * receive any Object will never receive a DeadEvent. 103 * 104 * <p>This class is safe for concurrent use. 105 * 106 * <p>See the Guava User Guide article on <a href= 107 * "http://code.google.com/p/guava-libraries/wiki/EventBusExplained"> 108 * {@code EventBus}</a>. 109 * 110 * @author Cliff Biffle 111 * @since 10.0 112 */ 113@Beta 114public class EventBus { 115 116 /** 117 * A thread-safe cache for flattenHierarchy(). The Class class is immutable. This cache is shared 118 * across all EventBus instances, which greatly improves performance if multiple such instances 119 * are created and objects of the same class are posted on all of them. 120 */ 121 private static final LoadingCache<Class<?>, Set<Class<?>>> flattenHierarchyCache = 122 CacheBuilder.newBuilder() 123 .weakKeys() 124 .build(new CacheLoader<Class<?>, Set<Class<?>>>() { 125 @SuppressWarnings({"unchecked", "rawtypes"}) // safe cast 126 @Override 127 public Set<Class<?>> load(Class<?> concreteClass) { 128 return (Set) TypeToken.of(concreteClass).getTypes().rawTypes(); 129 } 130 }); 131 132 /** 133 * All registered event subscribers, indexed by event type. 134 * 135 * <p>This SetMultimap is NOT safe for concurrent use; all access should be 136 * made after acquiring a read or write lock via {@link #subscribersByTypeLock}. 137 */ 138 private final SetMultimap<Class<?>, EventSubscriber> subscribersByType = 139 HashMultimap.create(); 140 private final ReadWriteLock subscribersByTypeLock = new ReentrantReadWriteLock(); 141 142 /** 143 * Strategy for finding subscriber methods in registered objects. Currently, 144 * only the {@link AnnotatedSubscriberFinder} is supported, but this is 145 * encapsulated for future expansion. 146 */ 147 private final SubscriberFindingStrategy finder = new AnnotatedSubscriberFinder(); 148 149 /** queues of events for the current thread to dispatch */ 150 private final ThreadLocal<Queue<EventWithSubscriber>> eventsToDispatch = 151 new ThreadLocal<Queue<EventWithSubscriber>>() { 152 @Override protected Queue<EventWithSubscriber> initialValue() { 153 return new LinkedList<EventWithSubscriber>(); 154 } 155 }; 156 157 /** true if the current thread is currently dispatching an event */ 158 private final ThreadLocal<Boolean> isDispatching = 159 new ThreadLocal<Boolean>() { 160 @Override protected Boolean initialValue() { 161 return false; 162 } 163 }; 164 165 private SubscriberExceptionHandler subscriberExceptionHandler; 166 167 /** 168 * Creates a new EventBus named "default". 169 */ 170 public EventBus() { 171 this("default"); 172 } 173 174 /** 175 * Creates a new EventBus with the given {@code identifier}. 176 * 177 * @param identifier a brief name for this bus, for logging purposes. Should 178 * be a valid Java identifier. 179 */ 180 public EventBus(String identifier) { 181 this(new LoggingSubscriberExceptionHandler(identifier)); 182 } 183 184 /** 185 * Creates a new EventBus with the given {@link SubscriberExceptionHandler}. 186 * 187 * @param subscriberExceptionHandler Handler for subscriber exceptions. 188 * @since 16.0 189 */ 190 public EventBus(SubscriberExceptionHandler subscriberExceptionHandler) { 191 this.subscriberExceptionHandler = checkNotNull(subscriberExceptionHandler); 192 } 193 194 /** 195 * Registers all subscriber methods on {@code object} to receive events. 196 * Subscriber methods are selected and classified using this EventBus's 197 * {@link SubscriberFindingStrategy}; the default strategy is the 198 * {@link AnnotatedSubscriberFinder}. 199 * 200 * @param object object whose subscriber methods should be registered. 201 */ 202 public void register(Object object) { 203 Multimap<Class<?>, EventSubscriber> methodsInListener = 204 finder.findAllSubscribers(object); 205 subscribersByTypeLock.writeLock().lock(); 206 try { 207 subscribersByType.putAll(methodsInListener); 208 } finally { 209 subscribersByTypeLock.writeLock().unlock(); 210 } 211 } 212 213 /** 214 * Unregisters all subscriber methods on a registered {@code object}. 215 * 216 * @param object object whose subscriber methods should be unregistered. 217 * @throws IllegalArgumentException if the object was not previously registered. 218 */ 219 public void unregister(Object object) { 220 Multimap<Class<?>, EventSubscriber> methodsInListener = finder.findAllSubscribers(object); 221 for (Entry<Class<?>, Collection<EventSubscriber>> entry : 222 methodsInListener.asMap().entrySet()) { 223 Class<?> eventType = entry.getKey(); 224 Collection<EventSubscriber> eventMethodsInListener = entry.getValue(); 225 226 subscribersByTypeLock.writeLock().lock(); 227 try { 228 Set<EventSubscriber> currentSubscribers = subscribersByType.get(eventType); 229 if (!currentSubscribers.containsAll(eventMethodsInListener)) { 230 throw new IllegalArgumentException( 231 "missing event subscriber for an annotated method. Is " + object + " registered?"); 232 } 233 currentSubscribers.removeAll(eventMethodsInListener); 234 } finally { 235 subscribersByTypeLock.writeLock().unlock(); 236 } 237 } 238 } 239 240 /** 241 * Posts an event to all registered subscribers. This method will return 242 * successfully after the event has been posted to all subscribers, and 243 * regardless of any exceptions thrown by subscribers. 244 * 245 * <p>If no subscribers have been subscribed for {@code event}'s class, and 246 * {@code event} is not already a {@link DeadEvent}, it will be wrapped in a 247 * DeadEvent and reposted. 248 * 249 * @param event event to post. 250 */ 251 public void post(Object event) { 252 Set<Class<?>> dispatchTypes = flattenHierarchy(event.getClass()); 253 254 boolean dispatched = false; 255 for (Class<?> eventType : dispatchTypes) { 256 subscribersByTypeLock.readLock().lock(); 257 try { 258 Set<EventSubscriber> wrappers = subscribersByType.get(eventType); 259 260 if (!wrappers.isEmpty()) { 261 dispatched = true; 262 for (EventSubscriber wrapper : wrappers) { 263 enqueueEvent(event, wrapper); 264 } 265 } 266 } finally { 267 subscribersByTypeLock.readLock().unlock(); 268 } 269 } 270 271 if (!dispatched && !(event instanceof DeadEvent)) { 272 post(new DeadEvent(this, event)); 273 } 274 275 dispatchQueuedEvents(); 276 } 277 278 /** 279 * Queue the {@code event} for dispatch during 280 * {@link #dispatchQueuedEvents()}. Events are queued in-order of occurrence 281 * so they can be dispatched in the same order. 282 */ 283 void enqueueEvent(Object event, EventSubscriber subscriber) { 284 eventsToDispatch.get().offer(new EventWithSubscriber(event, subscriber)); 285 } 286 287 /** 288 * Drain the queue of events to be dispatched. As the queue is being drained, 289 * new events may be posted to the end of the queue. 290 */ 291 void dispatchQueuedEvents() { 292 // don't dispatch if we're already dispatching, that would allow reentrancy 293 // and out-of-order events. Instead, leave the events to be dispatched 294 // after the in-progress dispatch is complete. 295 if (isDispatching.get()) { 296 return; 297 } 298 299 isDispatching.set(true); 300 try { 301 Queue<EventWithSubscriber> events = eventsToDispatch.get(); 302 EventWithSubscriber eventWithSubscriber; 303 while ((eventWithSubscriber = events.poll()) != null) { 304 dispatch(eventWithSubscriber.event, eventWithSubscriber.subscriber); 305 } 306 } finally { 307 isDispatching.remove(); 308 eventsToDispatch.remove(); 309 } 310 } 311 312 /** 313 * Dispatches {@code event} to the subscriber in {@code wrapper}. This method 314 * is an appropriate override point for subclasses that wish to make 315 * event delivery asynchronous. 316 * 317 * @param event event to dispatch. 318 * @param wrapper wrapper that will call the subscriber. 319 */ 320 void dispatch(Object event, EventSubscriber wrapper) { 321 try { 322 wrapper.handleEvent(event); 323 } catch (InvocationTargetException e) { 324 try { 325 subscriberExceptionHandler.handleException( 326 e.getCause(), 327 new SubscriberExceptionContext( 328 this, 329 event, 330 wrapper.getSubscriber(), 331 wrapper.getMethod())); 332 } catch (Throwable t) { 333 // If the exception handler throws, log it. There isn't much else to do! 334 Logger.getLogger(EventBus.class.getName()).log(Level.SEVERE, 335 String.format( 336 "Exception %s thrown while handling exception: %s", t, 337 e.getCause()), 338 t); 339 } 340 } 341 } 342 343 /** 344 * Flattens a class's type hierarchy into a set of Class objects. The set 345 * will include all superclasses (transitively), and all interfaces 346 * implemented by these superclasses. 347 * 348 * @param concreteClass class whose type hierarchy will be retrieved. 349 * @return {@code clazz}'s complete type hierarchy, flattened and uniqued. 350 */ 351 @VisibleForTesting 352 Set<Class<?>> flattenHierarchy(Class<?> concreteClass) { 353 try { 354 return flattenHierarchyCache.getUnchecked(concreteClass); 355 } catch (UncheckedExecutionException e) { 356 throw Throwables.propagate(e.getCause()); 357 } 358 } 359 360 /** 361 * Simple logging handler for subscriber exceptions. 362 */ 363 private static final class LoggingSubscriberExceptionHandler 364 implements SubscriberExceptionHandler { 365 366 /** 367 * Logger for event dispatch failures. Named by the fully-qualified name of 368 * this class, followed by the identifier provided at construction. 369 */ 370 private final Logger logger; 371 372 /** 373 * @param identifier a brief name for this bus, for logging purposes. Should 374 * be a valid Java identifier. 375 */ 376 public LoggingSubscriberExceptionHandler(String identifier) { 377 logger = Logger.getLogger( 378 EventBus.class.getName() + "." + checkNotNull(identifier)); 379 } 380 381 @Override 382 public void handleException(Throwable exception, 383 SubscriberExceptionContext context) { 384 logger.log(Level.SEVERE, "Could not dispatch event: " 385 + context.getSubscriber() + " to " + context.getSubscriberMethod(), 386 exception.getCause()); 387 } 388 } 389 390 /** simple struct representing an event and it's subscriber */ 391 static class EventWithSubscriber { 392 final Object event; 393 final EventSubscriber subscriber; 394 public EventWithSubscriber(Object event, EventSubscriber subscriber) { 395 this.event = checkNotNull(event); 396 this.subscriber = checkNotNull(subscriber); 397 } 398 } 399}