——深入浅出:基于波浪事件流和模块化的思路分析RxJava/zhuye.jpg)
前面Rx系列的五篇文章从源码的角度分析了RxJava几个主要功能的具体实现逻辑。在学习了RxJava源码之后的一段时间里,我阅读了Rx相关更多的文章,以及通过面试、技术分享等经历,一直在思考一个问题,就是如何总结RxJava的思想,其实就是如何讲清楚RxJava。尤其对于面试这种情况来说,很难在短时间内细抠源码,如何让面试官知道你学习过源码而不是只有使用的经验。以及在分享的时候如何能让听者快速理解。这篇文章就是我近期对这类问题的一个思考总结,基于波浪事件流和模块化的思路来分析RxJava。
版权声明:本文为博主原创文章,未经博主允许不得转载。
一起读RxJava源码系列:
一起读RxJava源码(一)——简介
一起读RxJava源码(二)——基础知识:观察者模式
一起读RxJava源码(三)——RxJava的基本实现
一起读RxJava源码(四)——转换操作符
一起读RxJava源码(五)——线程调度
一起读RxJava源码(六)——深入浅出:基于波浪事件流和模块化的思路分析RxJava
1.前言
再来说下为什么总结这篇文章。我在看各种源码并做技术分享的时候,一直有种体会,就是自己看源码或者是跟着别人讲解的博客看源码的时候,感觉理解还是比较容易的,但是一旦跟别人分享的时候,就感觉自己没讲清楚。所以我一直在反思,为什么会出现这种情况,如何做到所谓的“深入浅出”。总结起来有以下两点:
第一是表达能力。客观来说程序员这个行业对表达能力的要求不是很高,我认识的很多程序员朋友都是十分内向、不善于表达的。我之前的技术leader肖老板经常跟我们讲,程序员应该多去做技术分享,多去自信的表达。这个问题只能说多去练习吧,用不到像发布会似的那种口才,但是一定要自信,强烈的自信才能游刃有余。
第二也是最重要的一点,就是对源码思想的理解。代码这种东西表达的其实是抽象的逻辑,而听你讲解的人可能大多数都是没看过源码甚至没涉猎你所讲技术点的人,因此对于讲源码这种事来说,最重要的是把抽象的代码中的逻辑思想 讲清楚。如果一次技术分享就是把前五章的源码堆上去分析的话,我觉得大部分听者都会对你投来疑惑的点头和礼貌的微笑的。
思想的理解是没有标准的答案的,或者说,从不同的层面和维度能总结出不同的东西。这个问题只能是说多去看源码,看别人的文章,多去分析和总结。我对RxJava思想的理解可能也不如别人深刻,只能说不断学习,不断进步吧。
2.基于波浪事件流的思路
先来说下什么是“事件流”。其实在“一起读RxJava源码(五)——线程调度”中,我分析源码的方法就是按照从事件的产生到接收的流程为顺序进行的,在最后总结出的流程图(如下)也能看到RxJava代码执行时事件传递的流程。因此“事件流”其实就是事件传递的流程,“事件”也就是RxJava扩展的观察者模式中的事件(详见第三章)。概括来说,事件在被观察者的call()方法中发送,在观察者的onNext()方法中接收,通过subscribe()方法链接。
我总结了一种更直观的方式总结“事件流”,就是:在RxJava代码调用层看RxJava代码的执行顺序。这句话可能有点拗口,但其实并不难理解,就是表面意思。结合RxJava具体代码来看我觉得会更清楚。这里以第五章RxJava线程调度的代码为例:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hello rxjava");
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.i("tag", "onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.i("tag", "onNext=" + s);
textView.setText(s);
}
});
我们前五章的内容其实是深入每个方法,研究方法内部的调用逻辑。现在把所有调用的方法都当做“黑盒”,不深入进去。站在这个代码调用层从整体来看下:
Rx代码执行时先“向下”执行至.subscribe方法,每次创建一个新的被观察者;
而后从.subscribe开始执行“触发事件”流程,此时会“向上”执行至OnSubscribe的call()方法;
然后在call()方法中(subscriber.onNext)发送事件,再次“向下”执行事件下发流程,执行至.subscribe的onNext()接收事件,整个Rx调用结束。
从上可见,如果在RxJava代码调用层来看RxJava代码的执行顺序的话,Rx代码其实是经历了“向下”、“向上”、再次“向下”三个流程,从Observable.create开始最后执行至onNext结束。现在可以回到前几章,再看下转换操作符和线程调度的实现代码,尤其是上面的那个示意图。从那张图中也很明显的可以看出这三个流程。
这三个流程总结起来就是:
1、创建对象流程(被观察者):第一次“向下”的流程
2、事件触发流程:“向上”的流程
3、事件下发流程:第二次“向下”的流程
下面是三个流程的示意图,应该可以加深理解:
这三个流程连起来冲图上看就是个波浪的形状,因此我就简单叫做波浪形事件流。RxJava的各操作符都可以抽象成这三个流的逻辑,也是因为如此,才能把各个操作符无缝链接起来,形成传说中的链式调用。
3.基于模块化的思路
这里的“模块化”并不是架构中的那种模块化,这里指的是由于RxJava每个操作符是基于波浪事件流实现的,都可以抽象成这三个流的逻辑,因此可以向拼图一样,将各模块拼起来分析代码调用的逻辑。给张图就明白了:
——深入浅出:基于波浪事件流和模块化的思路分析RxJava/插件化.jpg)
从RxJava源码的角度来看,RxJava每个操作符都可以形成这三条流的本质其实在前面两章讲过了,就是每一个操作符都是返回一个新的被观察者Observable;并且在相应“上行”流中会创建新的观察者Subscriber,并与上一级被观察者Observable建立依赖关系。整体上看,就是每个操作符都是在目标观察者和被观察者之间加了一层。相当于从A和C的一层订阅变成了ABC的两层订阅。这样使得无论加入多少个操作符,都是按照波浪形事件流的顺序执行的。
4.利用两种思路分析复杂问题
前面分析了这么多,来看下用基于波浪事件流和模块化的思路分析RxJava的优点在哪。下面这个例子也不算很复杂,只是把之前讲过的操作符重新组合了一下,尝试判断下1~6号位置执行的线程。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext(..1.);
subscriber.onCompleted();
}
})
.map(..2.)
.subscribeOn(Schedulers.newThread()...8)
.map(..3.)
.subscribeOn(Schedulers.newThread()...7)
.map(..4.)
.observeOn(Schedulers.newThread()...9)
.map(..5.)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber() {
@Override
public void onCompleted() {
...
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Object o) {
..6.
}
});
源码逻辑好的同学应该能很快看出来,1~4执行在8的线程,5执行在9的线程,而6执行在main线程。7号的线程其实是没有用的,为什么呢?在大部分博客中会有这样一段话:多次调用subscribeOn()只有第一次的有效, 其余的会被忽略;而每调用一次observeOn() , 线程就会切换一次。但是绝大部分博客并没有分析为什么。如果利用波浪事件流和模块化的思路来看,就十分好理解了,一张图就能说明白:
——深入浅出:基于波浪事件流和模块化的思路分析RxJava/复杂事例.jpg)
如上图所见,7的线程是存在的,只不过7的线程只是切换了8的线程,并没有做任何操作而已。为了验证上面的结论的正确性,我做了一点复杂的操作:在subscribeOn之后输出线程名,以及每个操作符执行目标代码时也输出线程名,验证我们上述结论。具体的代码这里写不下了,我传到了我的github上,有兴趣的同学可以看下:https://github.com/ten-z/MyRxJavaDemo。
我这里用了两种方式实现:
一种是继承Observable类,自己实现了一个在事件触发流程中调用方法的操作符myOperator,在调用方法中输出线程名。将此操作符放在subscribeOn之前调用,就可以打印出此subscribeOn切换的线程。结果如下:
第二种是利用AOP思想,使用AspectJ实现代码注入。在subscribeOn()执行切换线程后,调用SubscribeOnSubscriber.call之前切入,输出日志。结果图如下,可以看出跟我们用波浪事件流和模块化的思路推出的结果是一致的:
当然,上面只是举个例子。基于波浪事件流和模块化的思路的主要目的是帮助工程师快速理解RxJava。当然,在看源码时最根本还是要看懂源码,甚至总结出自己的思想才可能对RxJava有更深的理解。
5.总结
RxJava系列兜兜转转写了有六个专题了,主要是我自己对Rx的一些思考。当然细心的同学应该发现,我这几篇文章举例都是基于RxJava1.0的,但实际上RxJava1.0在18年3月已经停止更新了,2.0目前是Rx的主力,但是2.0和1.0主要的思想还是一样的。由于我本人看源码的时间比较久远了,那时候还是1.0的天下,因此这系列文章我还是以最熟悉的1.0的使用来举例的,主要还是为了总结一个思想。我相信无论Rx出到几,基本思想都不会有太大变化。以后有机会,我会再总结更新版本的Rx间的差异。大家加油!