EventBus源码学习随笔

EventBus是什么?

简介

EventBus是Android和Java的开源库,它使用发布者/订阅者模式进行松散耦合。EventBus使用了总线控制,能够用几行代码解耦类及简化代码,消除依赖关系,加速应用程序开发。

下图为官方示例图:

官方示例图

官网网址:EventBus官网

Github地址:Github

特点

  • 简化组件之间的通信
  • 发送者和接收者高度解耦
  • 性能高效,社区活跃
  • 库文件很小(<50KB)
  • 具有简便配置线程、优先级等高级特性

EventBus怎么用?

1、gradle引入

1
implementation 'org.greenrobot:eventbus:3.1.1'

2、注册与取消注册

1
2
3
4
5
//注册
EventBus.getDefault().register(this);

//取消注册
EventBus.getDefault().unregister(this);

3、事件定义与发送

1
2
3
4
5
//定义
public class MessageEvent {
}
//发送
EventBus.getDefault().post(new MessageEvent());

4、事件接收

1
2
3
4
@Subscribe(threadMode = ThreadMode.POSTING)
public void onEventMainThread(MessageEvent event) {
// doSomeThing();
};

EventBus核心执行流程是怎样?

EventBus的使用包含注册、取消注册、事件定义发送及事件接收。当用户进行注册时,会通过反射获取实例中定义事件方法集合,然后将事件方法集合及订阅者加入到Map中,当执行post时,会根据事件类型,从集合中获取对应的订阅集合,通过配置的threadMode,使用对应的Poster调用订阅者的事件,最后通过反射method的invoke执行事件。

关键类功能说明

说明
EventBus 总线调度类,getDefault()为单例模式。内部持有订阅者及事件集合。还有各事件的发送器。eventTypesCache(缓存所有粘性事件的集合)、subscriptionsByEventType(key为事件,value为订阅者集合的Map)、typesBySubscriber(key为订阅者,value为事件集合的Map)、currentPostingThreadState(ThreadLocal,当前线程的事件集合)、mainThreadPoster(主线程的post)、backgroundPoster(后台线程的post)、asyncPoster(异步线程的post)、subscriberMethodFinder(获取订阅者中的事件)
SubscriberMethod 订阅方法的类,包含Method、ThreadMode、priority等配置属性
Subscription 订阅者类,包含subscriber的Object实例、subscriberMethod
PostingThreadState 存储当前线程的事件集合
SubscriberMethodFinder 用于获取订阅中的定义的事件接收方法
PendingPost subscription与event的包装类,内部维护一个pendingPostPool,当池中有PendingPost实例,会进行复用
PendingPostQueue 内部维护了一个head、tail的PendingPost实例,提供enqueue及poll操作
HandlerPoster 用于处理主线程的事件执行
BackgroundPoster 用于处理后台线程的事件执行
AsyncPoster 用于执行异步线程的事件执行

代码执行流程

注册-register

注册主要是通过反射获取订阅者中定义的事件方法集合,将订阅者和事件集合加入对应的Map,然后会判断是否支持粘性事件,将之前发送的粘性事件缓存发送给订阅者。

1、 源码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public void register(Object subscriber) {
//获取注册的Object的Class
Class<?> subscriberClass = subscriber.getClass();
//通过subscriberMethodFinder获取该Object中所有的订阅方法(SubscriberMethod集合)
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
//遍历事件集合
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}

private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
//生成Subscription对象(SubscriberMethod的包装类)
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
//获取subscriptionsByEventType集合(key为事情,value为订阅集合)
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}

int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
//获取typesBySubscriber集合(key为订阅者,value为事件集合)
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
//将所有的事件,根据订阅者key加入到事件集合中
subscribedEvents.add(eventType);

//事件方法是否支持(粘性事情)
if (subscriberMethod.sticky) {
//事件方法是否支持(继承事件)
if (eventInheritance) {

//遍历stickyEvents,找出所有继承于事件的父事件
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
//给订阅者发送之前缓存的粘性事件
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
//给订阅者发送之前缓存的粘性事件
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}

2、 流程图

image

发送-post

1、 源码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
 public void post(Object event) {
//获取当前线程的PostingThreadState
PostingThreadState postingState = currentPostingThreadState.get();
//获取事件队列postingState.eventQueue
List<Object> eventQueue = postingState.eventQueue;
//添加事件
eventQueue.add(event);
//是否正在发送
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
//循环获取eventQueue中的事件
try {
while (!eventQueue.isEmpty()) {
//获取集合数据并移除,然后发送事件
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}

private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
//事件是否支持继承
if (eventInheritance) {
//找出所有继承事件
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
//遍历集合
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
//发送
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
//发送
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
//如果没有订阅者会发送一个NoSubscriberEvent事件
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
//根据事件获取所有订阅者subscriptions
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
//遍历所有订阅者
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
//发送事件
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
//根据对应的threadMode,使用对应的post进行事件的处理
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}

void invokeSubscriber(Subscription subscription, Object event) {
try {
//最后通过反射执行事件方法
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}

2、 流程图
image

取消注册-unregister

1、 源码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public synchronized void unregister(Object subscriber) {
//根据subscriber从typesBySubscriber获取事件集合
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
//遍历订阅者的事件
for (Class<?> eventType : subscribedTypes) {
unsubscribeByEventType(subscriber, eventType);
}
//typesBySubscriber移除subscriber
typesBySubscriber.remove(subscriber);
} else {
logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}

private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
//根据eventType从subscriptionsByEventType获取订阅集合
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
if (subscription.subscriber == subscriber) {
//遍历集合移除当前的subscriber
subscription.active = false;
subscriptions.remove(i);
i--;
size--;
}
}
}
}

2、 流程图

image

EventBus如何识别类中定义的接收方法?

EventBus之所以如此流行,一个很重要的地方就是使用非常简便。我们只需要在类中定义好方法接收对应的事件、配置好相关的标注就可以。那么怎么获取到订阅者的事件方法集合,就是EventBus设计的一个精髓的地方。通过上面的注册方法,我们知道主要是通过下面的方法来获取,下面我们主要分析一下具体的实现。

1
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);

接着看findSubscriberMethods的事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
//定义了缓存Map,避免每次都执行反射获取,提高性能
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
if (ignoreGeneratedIndex) {
//通过反射获取
subscriberMethods = findUsingReflection(subscriberClass);
} else {
//通过索引获取
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}

最后会执行indUsingReflection去获取,具体实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
//FindState 用来做订阅方法的校验和保存
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
//通过反射来获得订阅方法信息
findUsingReflectionInSingleClass(findState);
//查找父类的订阅方法
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}

关键的实现在findUsingReflectionInSingleClass方法中,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// //通过反射得到方法数组
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
//遍历Method
for (Method method : methods) {
int modifiers = method.getModifiers();
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
////保证必须只有一个事件参数
if (parameterTypes.length == 1) {
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
//得到注解
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
//校验是否添加该方法
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
//实例化SubscriberMethod对象并添加
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}

总结一下,EventBus是通过反射getDeclaredMethods()获取类的方法集合,然后遍历方法集合,将符合事件定义的方法(public、只有一个事件参数、有Subcribe的注解等)加入到集合中。从而达到识别订阅者中定义的事件方法。

EventBus中的线程调度机制是怎么样的?

我们知道,EventBus可以通过定义threadMode来指定事件回调的执行线程。主要的配置如下:

  • ThreadMode: POSTING:默认的,在同一个线程中执行。

  • ThreadMode: MAIN :主线程执行

  • ThreadMode: MAIN_ORDERED: 主线程执行,不过需要排队,如果前一个也是main_ordered 需要等前一个执行完成后才执行,在主线程中执行,可以处理更新ui的操作。

  • ThreadMode: BACKGROUND :后台进程,处理如保存到数据库等操作。

  • ThreadMode: ASYNC :异步执行,另起线程操作。

通过上面的流程分析在post的过程中,最后都是通过postToSubscription执行,在这里面判断了threadMode的类型,具体的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
//根据对应的threadMode,使用对应的post进行事件的处理
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}

1、POSTING模式,直接执行invokeSubscriber。

2、MAIN模式,如果判断当前线程是主线程则执行invokeSubscriber,否则会使用mainThreadPoster执行enqueue方法。mainThreadPoster为HandlerPoster的实例,继承了Handler,是使用了MainLooper进行创建,也就是其的handleMessage在主线程中执行。
我们看具体的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public void enqueue(Subscription subscription, Object event) {
//构建PendingPost对象
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
//加入队列
queue.enqueue(pendingPost);
//如果没有激活hander
if (!handlerActive) {
handlerActive = true;
//发送消息
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}

@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
//循环获取队列中的所有pendingPost
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
pendingPost = queue.poll();
if (pendingPost == null) {
//如果已没有数据,更新 handlerActive
handlerActive = false;
return;
}
}
}
//在主线程中实现事件的方法
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}

HandlerPoster会通过主线程的Handler去执行队列中的所有事件方法。

3、MAIN_ORDERED模式 优先主线程队列执行

1
2
3
4
5
6
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}

4、BACKGROUND模式 如果再主线程则执行后台线程执行,否则使用当前线程

1
2
3
4
5
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}

这里我们主要看backgroundPoster的实现,BackgroundPoster继承了Runnable,实现线程池执行run方法,通过executorRunning控制不会每次都启动一个新任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
//加入队列
queue.enqueue(pendingPost);
//变量标识,不要每次都执行
if (!executorRunning) {
executorRunning = true;
//线程池执行
eventBus.getExecutorService().execute(this);
}
}
}

@Override
public void run() {
try {
try {
while (true) {
//循环间隔1s获取事件
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
//如果已没有任务,更新变量
executorRunning = false;
return;
}
}
}
//在异步线程执行了事件方法
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
executorRunning = false;
}
}

backgroundPoster会开启一个线程去执行当前所有队列中的事件方法。

5、ASYNC 模式 主要使用了AsyncPoster,也继承了run接口。实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
eventBus.getExecutorService().execute(this);
}

@Override
public void run() {
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
eventBus.invokeSubscriber(pendingPost);
}

AsyncPoster每一个事件都会开启一个异步任务,通过线程池去执行。

总结一下
EventBus通过配置threadMode,控制事件在不同的线程中去执行。总归有5种模式,分别为POSTING、MAIN、MAIN_ORDERED、BACKGROUND、ASYNC。主要是通过HandlerPoster、backgroundPoster、AsyncPoster来实现线程的切换。

  • HandlerPoster会通过主线程的Handler开启循环去执行队列中的所有事件方法。
  • backgroundPoster会开启一个线程循环执行当前所有队列中的事件方法。
  • AsyncPoster每一个事件都会开启一个异步任务,通过线程池去执行。

EventBus的事件发送和接收的原理是什么?

EventBus主要是通过观察者模式来实现事件的发送和接收。使用register后,会将订阅者及定义的事件接收方法加入到Map中,当在任意地方执行post时,会对事件类型进行匹配,找出所有的订阅者,根据配置的threadMode,使用不同的poster通过反射去执行事件方法。当使用unregister后,会将订阅者在Map中移除,进行取消注册。

EventBus中代码运用了那些设计模式,有什么巧妙的设计?

1、单例模式

EventBus.getDefault()使用了懒汉的单例模式。

2、外观模式

EventBus对外提供了统一的调度,屏蔽了内部的实现。

3、建造者模式

Event对象的创建使用EventBusBuilder进行创建,将复杂对象的创建和表示分离,调用者不需要知道复杂的创建过程,使用Build的相关方法进行配置创建对象。

4、策略模式

根据threadMode的设置,使用不同Poster的实现策略来执行事件方法

总结

1、框架的设计不在复杂而在精巧

2、使用反射和标注可以简化很多实现

3、EventBus的使用要注意避免大量的滥用,将导致逻辑的分散,出现问题后很难定位