——转换操作符/caozuofu.jpg)
上一篇我们介绍了RxJava最基本的实现方式以及源码逻辑,但是仅仅会基础实现方式是远远不够的。RxJava有着大量的各式各样的操作符(如上图谱所示)。也正是因为这些操作符,使得RxJava可以很简单且简洁的实现一些复杂的问题。这也是RxJava以至于Rx系列强大的原因之一。
版权声明:本文为博主原创文章,未经博主允许不得转载。
一起读RxJava源码系列:
一起读RxJava源码(一)——简介
一起读RxJava源码(二)——基础知识:观察者模式
一起读RxJava源码(三)——RxJava的基本实现
一起读RxJava源码(四)——转换操作符
一起读RxJava源码(五)——线程调度
一起读RxJava源码(六)——深入浅出:基于波浪事件流和模块化的思路分析RxJava
1. 前言
上一篇我们介绍的RxJava最基本的实现方式虽然没有展现出RxJava的强大,但是却是一切的基础。没有看过的工友建议先看下,以免这一章节消化不了。
——转换操作符/nan.jpg)
RxJava的操作符有很多,上面的图谱里就能看出来,具体使用可以参考官网介绍。这里主要介绍的是转换操作符,它作用于一个可观测序列,然后变换它发射的值,最后用一种新的形式返回它们。直接看概念可能会不太好理解,这篇文章以相对简单且常用的转换操作符map为例,进行源码的分析。了解map操作符的逻辑之后可能再来看概念就容易理解得多了。
转换操作符实现的其实是“变换”的原理。所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。
2. map操作符的简单应用
先举个例子,将一组字符串中的小写字符转换成大写输出,用map可以这样实现:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28List<String> froms = new ArrayList<>();
froms.add("hello");
froms.add("rxjava");
froms.add("毁我青春");
Observable.from(froms)
.map(new Func1<String, String>() {
@Override
public String call(String s) {
return s.toUpperCase();
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
//System.out.println("onCompleted");
Log.i("tag", "onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.i("tag", "onNext=" + s);
}
});
输出的结果是这样的:1
2
3
4onNext=HELLO
onNext=RXJAVA
onNext=毁我青春
onCompleted
这里Observable.from()这个方法是将传入的数组或Iterable拆分成具体对象后,依次发送出来。和之前的之前的create(OnSubscribe)是等价的。
可以看到,map()方法将每次接收到的String对象进行了toUpperCase()操作,经过map()方法后,Subscriber里onNext()收到的就是转换成大写的内容。当然,map()方法也可以进行其他类型的转换,比如将int转换成字符串,或是字符串转换成Bitmap对象等,在Func1类构造函数的泛型参数(new Func1<String, String>())
就是转换前和转换后的对象类型。
总结一下map()的用法:它是对事件对象一对一的直接变换,也是RxJava最常用的变换。如下图所示:
——转换操作符/zhuanhuan.png)
3. map操作符的源码解析
为了基于上一章节所讲的内容,我们把上面的例子变得简单一点:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hello rxjava");
subscriber.onCompleted();
}
}).map(new Func1<String, String>() {
@Override
public String call(String s) {
return s.toUpperCase();
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.i("tag", "onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.i("tag", "onNext=" + s);
}
});
由于上一章节讲解了Observable.create()和.subscribe()的源码,所以这里用了上一章节介绍的RxJava基本实现。from()的源码解析这里就不多赘述了,主要还是理解map的基本的逻辑。
——转换操作符/biehuang.jpg)
这里Observable.create()自然不必多说,最终返回了一个新创建的被观察者Observable。下一步会执行Observable.map()。
map()的源码也在Observable类里:1
2
3public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return unsafeCreate(new OnSubscribeMap<T, R>(this, func));
}
再看下unsafeCreate()的类:1
2
3public static <T> Observable<T> unsafeCreate(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
是不是有点眼熟??因为跟Observable.create()的实现是一毛一样的啊!可以现在回到上一章节对比一下。也就是说,map()方法也是直接返回了一个新创建的被观察者Observable,这是一个新的被观察者,我们把它称为ObservableB;一开始通过Observable.create()创建的Observable我们称之为ObservableA。
我们可以看到在ObservableB中,传入RxJavaHooks.onCreate(f)的参数可以追溯到map()方法里传入的new OnSubscribeMap
现在我们现在把视野拉远,从整体再看下这个例子的逻辑:
也就是说,在执行了map方法后,变成了ObservableB和Subscriber建立订阅关系。我们把最后的观察者也起个名子区分,叫SubscriberC。总结下,就是原本ObservableA.subscribe(SubscriberC)之间加了.map()后,现在变成了ObservableB.subscribe(SubscriberC)。
根据上一篇文章介绍的RxJava基础流程,在ObservableB.subscribe(SubscriberC)建立订阅关系后,应该执行ObservableB的onSubscribe里的call方法。上面讲到了 ObservableB对应的OnSubscribe是OnSubscribeMap,我们来看一下OnSubscribeMap的源码:
1 | public final class OnSubscribeMap<T, R> implements OnSubscribe<R> { |
OnSubscribeMap实现了OnSubscribe接口,因此OnSubscribeMap就是一个OnSubscribe。我们来看下OnSubscribeMap的call方法。首先通过我们的观察者o(即SubscriberC)和转换函数transformer构造了一个MapSubscriber。然后调用了source的unsafeSubscribe()。
我们看下这个source是从OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer)
这个构造器里传过来的。往上一级看,这个构造器是在ObservableA.map()方法里调用的(具体源码请往上翻到map()源码),这里source传入的就是ObservableA。
所以这里小结下,OnSubscribeMap的call方法里本质是调用了ObservableA的unsafeSubscribe()。我们继续深入,看下unsafeSubscribe()源码:
1 | public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) { |
有没有很眼熟??可以返回上一篇文章里.subscribe()源码解析看一下,是的,你应该没有记错,这段代码跟.subscribe()的源码基本完全一样。核心的那句又出来了:1
RxJavaHooks.onObservableStart(observable,observable.onSubscribe).call(subscriber);
也就是说,调用了ObservableA的onSubscribe里的call方法,也就是执行了ObservableA.subscribe(MapSubscriber)。而MapSubscriber其实是在OnSubscribeMap.call()中new出来的。可以理解成是属于B的。执行到这里B不再只是SubscriberC的被观察者了,他同时也是ObservableA的观察者,记为 SubscriberB。B = (ObservableB+SubscriberB)。
所以到这里,我们可以这样理解:
第一步:ObservableB.subscribe(SubscriberC)
第二步:调用ObservableB的call方法
第三步:在ObservableB的call方法里创建了SubscriberB ,并调用了ObservableA.subscribe(SubscriberB)
下一步跟之前以及RxJava基本调用逻辑一样,执行ObservableA的onSubscribe里的call方法。就是执行基本源码的这两句:1
2subscriber.onNext("hello rxjava");
subscriber.onCompleted();
也是根据上一章讲的RxJava基本调用(所以基本调用的逻辑很重要啊),这里事件开始下发了。下一步会执行ObservableA.subscribe(SubscriberB)
中SubscriberB的onNext()和onCompleted(),也就是MapSubscriber。
MapSubscriber的源码在刚才的OnSubscribeMap里,它的onNext()里主要执行就下面两句:1
2result = mapper.call(t);
actual.onNext(result);
这里mapper就是变换函数,通过MapSubscriber构造器向上找,发现就是OnSubscribeMap的 transformer,这个transformer也是OnSubscribeMap构造器传入的。再往上看,豁然开朗:
就是map()里传的Func1,mapper.call也就是执行我们.map()里自己写的回调:1
2
3
4
5
6.map(new Func1<String, String>() {
@Override
public String call(String s) {
return s.toUpperCase();
}
})
这里其实就是实现我们自己定义的变换了。我们定义的变换函数将String中的所有小写转换成了大写。下一步会执行actual.onNext(result),这里就不再一层层的找了,这个actual其实就是o(即SubscriberC)。在SubscriberC的 onNext中我们输出了接收到的result,也就是变换过后的String。
同样在调用MapSubscriber.onCompleted()时会执行subscriberOne.onCompleted()。这样就完成了一整套的调用流程。
终于到最后总结了,想把转换说明白还是挺费劲的。前面没看懂的话可以看下下面总结出的调用逻辑(整合了基本调用的逻辑):
第一步:创建ObservableA
第二步:在调用.map()时创建ObservableB
第三步:执行ObservableB.subscribe(SubscriberC)
第四步:调用ObservableB的call方法
第五步:在ObservableB的call方法里创建了SubscriberB,并调用了ObservableA.subscribe(SubscriberB)
第六步:调用ObservableA里onSubscribe里的call,事件从A开始下发
第七步:SubscriberB的onNext接收事件,并调用map()里的转换方法,事件继续下发
第八步:SubscriberC的onNext接收转换后的事件。
至此,从被观察者A到观察者C的事件流就完成了。中间的转换可以简单理解成经历了一个B,B对A和C分别建立了观察和被观察的关系。在B中实现了事件流的转换。从A和C的一层订阅变成了ABC的两层订阅。
看下扔物线大神的这张图可能会更好理解我所说的两层订阅:
张磊大神的这张图则可以很清楚的理解调用逻辑:
4. 总结
到这里,map()转换原理就讲解完了,其实map()的原理本质是lift()。只不过map()这个操作符举例比较容易,逻辑也相对简单。有兴趣的工友可以看下扔物线大神的《给 Android 开发者的 RxJava 详解》里对lift()原理的解释,你会发现和map()实现是一样的。
下一章节,会基于这个变换原理,来讲解线程调度的源码分析,调用逻辑和map很相似。建议大家先把转换原理搞明白。大家加油!