001 /* 002 * Copyright (C) 2007 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017 package com.google.common.eventbus; 018 019 import com.google.common.annotations.Beta; 020 021 import java.util.concurrent.ConcurrentLinkedQueue; 022 import java.util.concurrent.Executor; 023 024 /** 025 * An {@link EventBus} that takes the Executor of your choice and uses it to 026 * dispatch events, allowing dispatch to occur asynchronously. 027 * 028 * @author Cliff Biffle 029 * @since 10.0 030 */ 031 @Beta 032 public class AsyncEventBus extends EventBus { 033 private final Executor executor; 034 035 /** the queue of events is shared across all threads */ 036 private final ConcurrentLinkedQueue<EventWithHandler> eventsToDispatch = 037 new ConcurrentLinkedQueue<EventWithHandler>(); 038 039 /** 040 * Creates a new AsyncEventBus that will use {@code executor} to dispatch 041 * events. Assigns {@code identifier} as the bus's name for logging purposes. 042 * 043 * @param identifier short name for the bus, for logging purposes. 044 * @param executor Executor to use to dispatch events. It is the caller's 045 * responsibility to shut down the executor after the last event has 046 * been posted to this event bus. 047 */ 048 public AsyncEventBus(String identifier, Executor executor) { 049 super(identifier); 050 this.executor = executor; 051 } 052 053 /** 054 * Creates a new AsyncEventBus that will use {@code executor} to dispatch 055 * events. 056 * 057 * @param executor Executor to use to dispatch events. It is the caller's 058 * responsibility to shut down the executor after the last event has 059 * been posted to this event bus. 060 */ 061 public AsyncEventBus(Executor executor) { 062 this.executor = executor; 063 } 064 065 @Override 066 protected void enqueueEvent(Object event, EventHandler handler) { 067 eventsToDispatch.offer(new EventWithHandler(event, handler)); 068 } 069 070 /** 071 * Dispatch {@code events} in the order they were posted, regardless of 072 * the posting thread. 073 */ 074 @Override 075 protected void dispatchQueuedEvents() { 076 while (true) { 077 EventWithHandler eventWithHandler = eventsToDispatch.poll(); 078 if (eventWithHandler == null) { 079 break; 080 } 081 082 dispatch(eventWithHandler.event, eventWithHandler.handler); 083 } 084 } 085 086 /** 087 * Calls the {@link #executor} to dispatch {@code event} to {@code handler}. 088 */ 089 @Override 090 protected void dispatch(final Object event, final EventHandler handler) { 091 executor.execute(new Runnable() { 092 @Override 093 @SuppressWarnings("synthetic-access") 094 public void run() { 095 AsyncEventBus.super.dispatch(event, handler); 096 } 097 }); 098 } 099 100 }