一、EventBus使用场景示例
Guava EventBus是事件发布/订阅框架,采用观察者模式,通过解耦发布者和订阅者简化事件(消息)的传递。这有点像简化版的MQ,除去了Broker,由EventBus托管了订阅&发布。EventBus能使用在异步场景中,例如数据库状态的更新、发送邮件、更新日志等。在我们的系统中,主要用于任务执行完毕之后数据库中对应数据状态的变更。示例代码如下:
public class TaskEventBus { /** * 使用同步方式,触发事件执行的线程即是调用线程,例如main()中调用post方法,事件的执行线程即Main */ //private static EventBus eventBus = new EventBus(); /** * 使用异步方式,事件执行交由指定线程池完成,例如executor即是事件的具体执行者,会有4个线程负责执行事件 */ private static ExecutorService executor = Executors.newFixedThreadPool(4); private static EventBus eventBus = new AsyncEventBus("taskEventBus",executor ); /** * 注册监听器(订阅事件) * @param listener */ public static void registerListener(Listener listener) { eventBus.register(listener); } /** * 触发事件,获取对应监听器执行某些操作 * @param event */ public static void notifyListener(Event event) { eventBus.post(event); } public static void close() { executor.shutdown(); }}public class SubStoreTaskSuccessEvent extends SubStoreTaskEvent{ public SubStoreTaskSuccessEvent(SubStoreTask task) { super(task); }}public class SubStoreTaskFailEvent extends SubStoreTaskEvent{ public SubStoreTaskFailEvent(SubStoreTask task) { super(task); }}public class SubStoreTaskListener implements Listener{ @Subscribe public void onSuccess(SubStoreTaskSuccessEvent obj) { obj.getEvent().doSuccess(); } @Subscribe public void onFail(SubStoreTaskFailEvent obj) { obj.getEvent().doFail(); }}@AllArgsConstructorpublic class SubStoreTask { private String taskName; public void doFail() { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+" "+SubStoreTask.class.getName() + taskName + " Fail "+System.currentTimeMillis()); } public void doSuccess() { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+" "+SubStoreTask.class.getName() + taskName + " Success "+System.currentTimeMillis()); }}public class TaskInvoker { public static void main(String[] args) { TaskEventBus.registerListener(new SubStoreTaskListener()); SubStoreTask task = new SubStoreTask("A"); TaskEventBus.notifyListener(new SubStoreTaskSuccessEvent(task)); TaskEventBus.notifyListener(new SubStoreTaskSuccessEvent(task)); TaskEventBus.notifyListener(new SubStoreTaskFailEvent(task)); TaskEventBus.close(); }}
二、EventBus源码分析
2.1 整体结构&概念
EventBus是同步模式,异步模式使用AsyncEventBus。
EventBus主要有以下几个概念:
1.SubscriberRegistry:管理所有的事件订阅者
2.SubscriberExceptionHandler:处理事件时发生异常的处理器,可以由开发者自己实现
3.Executor:事件的执行线程,EventBus采用默认的线程池,AsyncEventBus需要开发者自己指定线程池
4.Dispatcher:订阅&事件 的派发器,处理某一个事件和对应的订阅者,指导订阅者该如何消费事件。具体实现有
ImmediateDispatcher:从名字来看这个事件分发器是立即发布当前的这个事件,实际情况也是如此,当用户调用dispatch(...)
方法时,它不用任何队列缓存事件,而是立即向该事件的订阅者发布事件。
LegacyAsyncDispatcher:从名字来看这个事件分发器应该是要废弃掉的(官方的说法是:but is otherwise not especially useful.),与PerThreadQueuedDispatcher相比,它是采用了一个全局队列来存储所有{到达的事件:订阅者}对,它不确保投放的顺序和事件到达的顺序是一致的。这也是作者诟病既然不能保证顺序,用队列就是多此一举。
PerThreadQueuedDispatcher::从名字可以看出来这个事件分发器是以线程为单位进行的,它为每个线程申请了一个队列,这个队列存放了所有投递到该线程的事件及其订阅者们,并且按照事件到达的顺序依次进行投放。
2.2 如何注册
使用 eventBus.register(listener); 一行代码就实现订阅者的注册。源码位于SubscriberRegistry.java。通过register实现订阅者的注册。这里使用了MultiMap & CopyOnWriteArraySet 数据结构,前者维护<key,List<value>>,后者在每次添加新的订阅者时都会复制一份,具体用法原理百度之。
void register(Object listener) { //根据订阅者找到对应的Multimap , Subscriber> listenerMethods = findAllSubscribers(listener); //新的订阅者添加到集合中 for (Map.Entry , Collection > entry : listenerMethods.asMap().entrySet()) { Class eventType = entry.getKey(); Collection eventMethodsInListener = entry.getValue(); CopyOnWriteArraySet eventSubscribers = subscribers.get(eventType); if (eventSubscribers == null) { CopyOnWriteArraySet newSet = new CopyOnWriteArraySet (); eventSubscribers = MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet); } eventSubscribers.addAll(eventMethodsInListener); } }
这里有个坑:@Subscribe
所annotate的method
的参数,不能支持泛型。因为在运行的时候,因为Type Erasure
导致拿不到"真正"的parameterType,所以代码写成这样是会出问题的
public class SubStoreTaskListener implements Listener{ //解析时SubStoreTaskEvent会泛型参数,实际是Object类型,导致注册上的eventType是Object.class,不是SubStoreTaskEvent.class @Override @Subscribe public void onSuccess(SubStoreTaskEvent obj) { obj.getEvent().doSuccess(); } @Override @Subscribe public void onFail(SubStoreTaskEvent obj) { obj.getEvent().doFail(); }}
//根据订阅者找到其实现了@SubScribe的方法,建立 <事件类型,订阅者> 的映射关系private Multimap, Subscriber> findAllSubscribers(Object listener) { Multimap 事件类型,订阅者>, Subscriber> methodsInListener = HashMultimap.create(); Class clazz = listener.getClass(); for (Method method : getAnnotatedMethods(clazz)) { Class [] parameterTypes = method.getParameterTypes(); Class eventType = parameterTypes[0]; methodsInListener.put(eventType, Subscriber.create(bus, listener, method)); } return methodsInListener; } private static ImmutableList getAnnotatedMethods(Class clazz) { return subscriberMethodsCache.getUnchecked(clazz); } //这个执行结果会放入到subscriberMethodsCache本地缓存中,下次同样的Class不用再反射解析,直接从缓存中拿 private static ImmutableList getAnnotatedMethodsNotCached(Class clazz) { Set > supertypes = TypeToken.of(clazz).getTypes().rawTypes(); Map identifiers = Maps.newHashMap(); for (Class supertype : supertypes) { for (Method method : supertype.getDeclaredMethods()) { //遍历订阅者的每个方法,检查方法上是否有@SubScribe注解 if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) { Class [] parameterTypes = method.getParameterTypes(); //注解方法只能有一个参数 checkArgument( parameterTypes.length == 1, "Method %s has @Subscribe annotation but has %s parameters." + "Subscriber methods must have exactly 1 parameter.", method, parameterTypes.length); MethodIdentifier ident = new MethodIdentifier(method); if (!identifiers.containsKey(ident)) { identifiers.put(ident, method); } } } } return ImmutableList.copyOf(identifiers.values()); }
如何消费
使用eventBus.post(event);将事件发布,通知订阅者执行处理逻辑。
public void post(Object event) { //SubscriberRegistry根据事件取出对应的订阅者 IteratoreventSubscribers = subscribers.getSubscribers(event); if (eventSubscribers.hasNext()) { //交给对应的Dispatcher处理 dispatcher.dispatch(event, eventSubscribers); } else if (!(event instanceof DeadEvent)) { // the event had no subscribers and was not itself a DeadEvent post(new DeadEvent(this, event)); } }
因为Dispatcher有三种实现,在AsyncEventBus中使用LegacyAsyncDispatcher,在EventBus中使用PerThreadQueuedDispatcher(关键使用ThreadLocal<Queue> ThreadLocal<Boolean>)
在PerThreadQueuedDispatcher分发模型下,它会为每个线程申请一个事件队列,该线程所发布的事件都会存入此队列下,因此保证了线程内的事件发布顺序一致性,而且在分发过程中,按照广度优先原则:事件A的所有订阅者都处理完毕时,再发布下一个事件来进行发布。如下图所示:
[PerThreadQueuedDispatcher] /** * Per-thread queue of events to dispatch. */ private final ThreadLocal> queue = new ThreadLocal >() { @Override protected Queue initialValue() { return Queues.newArrayDeque(); } }; /** * Per-thread dispatch state, used to avoid reentrant event dispatching. * 为了防止线程重复发布事件,即多次递归调用了post(...)方法 *在第一次进入的时候,将dispatching变量设置true,下次就不会再重入进来。 *该模型保证了同一线程下事件是按序发布的,而且当一个事件的订阅者都接收到消息时,才会发布下一个事件。 */ private final ThreadLocal dispatching = new ThreadLocal () { @Override protected Boolean initialValue() { return false; } }; @Override void dispatch(Object event, Iterator subscribers) { checkNotNull(event); checkNotNull(subscribers); Queue queueForThread = queue.get(); queueForThread.offer(new Event(event, subscribers)); //防止多次重入分发逻辑,线程第一次进入时将此变量设置为true,下次无法再次进入。 if (!dispatching.get()) { dispatching.set(true); try { Event nextEvent; while ((nextEvent = queueForThread.poll()) != null) { while (nextEvent.subscribers.hasNext()) { nextEvent.subscribers.next().dispatchEvent(nextEvent.event); } } } finally { dispatching.remove(); queue.remove(); } } } private static final class Event { private final Object event; private final Iterator subscribers;
LegacyAsyncDispatcher分发模型下,因为采用全局的ConcurrentLinkedQueue,多个线程会同时写入该队列,事件的发布顺序无法保障和事件的到达顺序一致,而且也无法保证事件是按照广度优先的策略发布的,即A事件的所有订阅者都收到消息时才发布下一个事件。
[LegacyAsyncDispatcher] private final ConcurrentLinkedQueuequeue = Queues.newConcurrentLinkedQueue(); @Override void dispatch(Object event, Iterator subscribers) { checkNotNull(event); while (subscribers.hasNext()) { queue.add(new EventWithSubscriber(event, subscribers.next())); } EventWithSubscriber e; while ((e = queue.poll()) != null) { e.subscriber.dispatchEvent(e.event); } } private static final class EventWithSubscriber { private final Object event; private final Subscriber subscriber; private EventWithSubscriber(Object event, Subscriber subscriber) { this.event = event; this.subscriber = subscriber; } } }