博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Guava EventBus源码解析
阅读量:6245 次
发布时间:2019-06-22

本文共 9501 字,大约阅读时间需要 31 分钟。

hot3.png

一、EventBus使用场景示例

Guava EventBus是事件发布/订阅框架,采用观察者模式,通过解耦发布者和订阅者简化事件(消息)的传递。这有点像简化版的MQ,除去了Broker,由EventBus托管了订阅&发布。EventBus能使用在异步场景中,例如数据库状态的更新、发送邮件、更新日志等。在我们的系统中,主要用于任务执行完毕之后数据库中对应数据状态的变更。示例代码如下:

public class TaskEventBus {    /**     * 使用同步方式,触发事件执行的线程即是调用线程,例如main()中调用post方法,事件的执行线程即Main     */    //private static EventBus eventBus = new EventBus();    /**     * 使用异步方式,事件执行交由指定线程池完成,例如executor即是事件的具体执行者,会有4个线程负责执行事件     */    private static ExecutorService executor = Executors.newFixedThreadPool(4);    private static EventBus eventBus = new AsyncEventBus("taskEventBus",executor );    /**     * 注册监听器(订阅事件)     * @param listener     */    public static void registerListener(Listener listener) {        eventBus.register(listener);    }    /**     * 触发事件,获取对应监听器执行某些操作     * @param event     */    public static void notifyListener(Event event) {        eventBus.post(event);    }    public static void close() {        executor.shutdown();    }}public class SubStoreTaskSuccessEvent extends SubStoreTaskEvent{    public SubStoreTaskSuccessEvent(SubStoreTask task) {        super(task);    }}public class SubStoreTaskFailEvent extends SubStoreTaskEvent{    public SubStoreTaskFailEvent(SubStoreTask task) {        super(task);    }}public class SubStoreTaskListener implements Listener{    @Subscribe    public void onSuccess(SubStoreTaskSuccessEvent obj) {        obj.getEvent().doSuccess();    }    @Subscribe    public void onFail(SubStoreTaskFailEvent obj) {        obj.getEvent().doFail();    }}@AllArgsConstructorpublic class SubStoreTask {    private String taskName;    public void doFail() {        try {            TimeUnit.MILLISECONDS.sleep(500);        } catch (InterruptedException e) {            e.printStackTrace();        }        System.out.println(Thread.currentThread().getName()+" "+SubStoreTask.class.getName() + taskName + " Fail "+System.currentTimeMillis());    }    public void doSuccess() {        try {            TimeUnit.MILLISECONDS.sleep(500);        } catch (InterruptedException e) {            e.printStackTrace();        }        System.out.println(Thread.currentThread().getName()+" "+SubStoreTask.class.getName() + taskName + " Success "+System.currentTimeMillis());    }}public class TaskInvoker {    public static void main(String[] args) {        TaskEventBus.registerListener(new SubStoreTaskListener());        SubStoreTask task = new SubStoreTask("A");        TaskEventBus.notifyListener(new SubStoreTaskSuccessEvent(task));        TaskEventBus.notifyListener(new SubStoreTaskSuccessEvent(task));        TaskEventBus.notifyListener(new SubStoreTaskFailEvent(task));        TaskEventBus.close();    }}

 

二、EventBus源码分析

2.1 整体结构&概念

EventBus是同步模式,异步模式使用AsyncEventBus。

EventBus主要有以下几个概念:

1.SubscriberRegistry:管理所有的事件订阅者

2.SubscriberExceptionHandler:处理事件时发生异常的处理器,可以由开发者自己实现

3.Executor:事件的执行线程,EventBus采用默认的线程池,AsyncEventBus需要开发者自己指定线程池

4.Dispatcher:订阅&事件 的派发器,处理某一个事件和对应的订阅者,指导订阅者该如何消费事件。具体实现有

ImmediateDispatcher:从名字来看这个事件分发器是立即发布当前的这个事件,实际情况也是如此,当用户调用dispatch(...)方法时,它不用任何队列缓存事件,而是立即向该事件的订阅者发布事件

LegacyAsyncDispatcher:从名字来看这个事件分发器应该是要废弃掉的(官方的说法是:but is otherwise not especially useful.),与PerThreadQueuedDispatcher相比,它是采用了一个全局队列来存储所有{到达的事件:订阅者}对,它不确保投放的顺序和事件到达的顺序是一致的。这也是作者诟病既然不能保证顺序,用队列就是多此一举。

PerThreadQueuedDispatcher::从名字可以看出来这个事件分发器是以线程为单位进行的,它为每个线程申请了一个队列,这个队列存放了所有投递到该线程的事件及其订阅者们,并且按照事件到达的顺序依次进行投放。

2.2 如何注册

使用 eventBus.register(listener); 一行代码就实现订阅者的注册。源码位于SubscriberRegistry.java。通过register实现订阅者的注册。这里使用了MultiMap & CopyOnWriteArraySet 数据结构,前者维护<key,List<value>>,后者在每次添加新的订阅者时都会复制一份,具体用法原理百度之。

void register(Object listener) {   //根据订阅者找到对应的
Multimap
, Subscriber> listenerMethods = findAllSubscribers(listener); //新的订阅者添加到集合中 for (Map.Entry
, Collection
> entry : listenerMethods.asMap().entrySet()) { Class
eventType = entry.getKey(); Collection
eventMethodsInListener = entry.getValue(); CopyOnWriteArraySet
eventSubscribers = subscribers.get(eventType); if (eventSubscribers == null) { CopyOnWriteArraySet
newSet = new CopyOnWriteArraySet
(); eventSubscribers = MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet); } eventSubscribers.addAll(eventMethodsInListener); } }

 这里有个坑:@Subscribe所annotate的method的参数,不能支持泛型。因为在运行的时候,因为Type Erasure导致拿不到"真正"的parameterType,所以代码写成这样是会出问题的

public class SubStoreTaskListener implements Listener
{ //解析时SubStoreTaskEvent会泛型参数,实际是Object类型,导致注册上的eventType是Object.class,不是SubStoreTaskEvent.class @Override @Subscribe public void onSuccess(SubStoreTaskEvent obj) { obj.getEvent().doSuccess(); } @Override @Subscribe public void onFail(SubStoreTaskEvent obj) { obj.getEvent().doFail(); }}

 

//根据订阅者找到其实现了@SubScribe的方法,建立
<事件类型,订阅者>
的映射关系private Multimap
, Subscriber> findAllSubscribers(Object listener) { Multimap
, Subscriber> methodsInListener = HashMultimap.create(); Class
clazz = listener.getClass(); for (Method method : getAnnotatedMethods(clazz)) { Class
[] parameterTypes = method.getParameterTypes(); Class
eventType = parameterTypes[0]; methodsInListener.put(eventType, Subscriber.create(bus, listener, method)); } return methodsInListener; } private static ImmutableList
getAnnotatedMethods(Class
clazz) { return subscriberMethodsCache.getUnchecked(clazz); } //这个执行结果会放入到subscriberMethodsCache本地缓存中,下次同样的Class不用再反射解析,直接从缓存中拿 private static ImmutableList
getAnnotatedMethodsNotCached(Class
clazz) { Set
> supertypes = TypeToken.of(clazz).getTypes().rawTypes(); Map
identifiers = Maps.newHashMap(); for (Class
supertype : supertypes) { for (Method method : supertype.getDeclaredMethods()) { //遍历订阅者的每个方法,检查方法上是否有@SubScribe注解 if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) { Class
[] parameterTypes = method.getParameterTypes(); //注解方法只能有一个参数 checkArgument( parameterTypes.length == 1, "Method %s has @Subscribe annotation but has %s parameters." + "Subscriber methods must have exactly 1 parameter.", method, parameterTypes.length); MethodIdentifier ident = new MethodIdentifier(method); if (!identifiers.containsKey(ident)) { identifiers.put(ident, method); } } } } return ImmutableList.copyOf(identifiers.values()); }

如何消费

使用eventBus.post(event);将事件发布,通知订阅者执行处理逻辑。

public void post(Object event) {    //SubscriberRegistry根据事件取出对应的订阅者    Iterator
eventSubscribers = subscribers.getSubscribers(event); if (eventSubscribers.hasNext()) { //交给对应的Dispatcher处理 dispatcher.dispatch(event, eventSubscribers); } else if (!(event instanceof DeadEvent)) { // the event had no subscribers and was not itself a DeadEvent post(new DeadEvent(this, event)); } }

因为Dispatcher有三种实现,在AsyncEventBus中使用LegacyAsyncDispatcher,在EventBus中使用PerThreadQueuedDispatcher(关键使用ThreadLocal<Queue> ThreadLocal<Boolean>)

在PerThreadQueuedDispatcher分发模型下,它会为每个线程申请一个事件队列,该线程所发布的事件都会存入此队列下,因此保证了线程内的事件发布顺序一致性,而且在分发过程中,按照广度优先原则:事件A的所有订阅者都处理完毕时,再发布下一个事件来进行发布。如下图所示:

[PerThreadQueuedDispatcher] /**     * Per-thread queue of events to dispatch.     */    private final ThreadLocal
> queue = new ThreadLocal
>() { @Override protected Queue
initialValue() { return Queues.newArrayDeque(); } }; /** * Per-thread dispatch state, used to avoid reentrant event dispatching. * 为了防止线程重复发布事件,即多次递归调用了post(...)方法 *在第一次进入的时候,将dispatching变量设置true,下次就不会再重入进来。 *该模型保证了同一线程下事件是按序发布的,而且当一个事件的订阅者都接收到消息时,才会发布下一个事件。 */ private final ThreadLocal
dispatching = new ThreadLocal
() { @Override protected Boolean initialValue() { return false; } }; @Override void dispatch(Object event, Iterator
subscribers) { checkNotNull(event); checkNotNull(subscribers); Queue
queueForThread = queue.get(); queueForThread.offer(new Event(event, subscribers)); //防止多次重入分发逻辑,线程第一次进入时将此变量设置为true,下次无法再次进入。 if (!dispatching.get()) { dispatching.set(true); try { Event nextEvent; while ((nextEvent = queueForThread.poll()) != null) { while (nextEvent.subscribers.hasNext()) { nextEvent.subscribers.next().dispatchEvent(nextEvent.event); } } } finally { dispatching.remove(); queue.remove(); } } } private static final class Event { private final Object event; private final Iterator
subscribers;

LegacyAsyncDispatcher分发模型下,因为采用全局的ConcurrentLinkedQueue,多个线程会同时写入该队列,事件的发布顺序无法保障和事件的到达顺序一致,而且也无法保证事件是按照广度优先的策略发布的,即A事件的所有订阅者都收到消息时才发布下一个事件。 

[LegacyAsyncDispatcher] private final ConcurrentLinkedQueue
queue = Queues.newConcurrentLinkedQueue(); @Override void dispatch(Object event, Iterator
subscribers) { checkNotNull(event); while (subscribers.hasNext()) { queue.add(new EventWithSubscriber(event, subscribers.next())); } EventWithSubscriber e; while ((e = queue.poll()) != null) { e.subscriber.dispatchEvent(e.event); } } private static final class EventWithSubscriber { private final Object event; private final Subscriber subscriber; private EventWithSubscriber(Object event, Subscriber subscriber) { this.event = event; this.subscriber = subscriber; } } }

 

 

 

转载于:https://my.oschina.net/u/2302503/blog/1834785

你可能感兴趣的文章
用图帮你了解https的原理
查看>>
区块链如何改变AI
查看>>
HTML5/JavaScript UI控件Wijmo Enterprise 2018v2发布
查看>>
工业仪表盘控件Iocomp ActiveX常见问题(2):Visual Basic中的错误
查看>>
Docker下使用selenium+testng实现web自动化
查看>>
当执行npm时遇到的问题
查看>>
JAVA程序员面试30问(附带答案)
查看>>
Java性能调优攻略全分享,七步搞定!(附学习资料分享)
查看>>
企业级 SpringBoot 教程 (六)springboot整合mybatis
查看>>
程序员写了一段注释, 第二天惨被公司开除, 公司巧妙回怼
查看>>
8.eclipse 安装 lombook插件
查看>>
Maven项目中使用本地JAR包方案4
查看>>
如何利用XMind创建概念图
查看>>
ldap接触(3)之LDAP特定错误以及错误一览表
查看>>
Zookeeper的功能以及工作原理
查看>>
朝花夕拾之Oracle11g 表分区
查看>>
本分类说明 -- django
查看>>
Android Binder IPC分析
查看>>
mysql分隔字符串,并将分隔字符串作为新列
查看>>
图学java基础篇之集合
查看>>