一起读RxJava源码(四)——转换操作符

  上一篇我们介绍了RxJava最基本的实现方式以及源码逻辑,但是仅仅会基础实现方式是远远不够的。RxJava有着大量的各式各样的操作符(如上图谱所示)。也正是因为这些操作符,使得RxJava可以很简单且简洁的实现一些复杂的问题。这也是RxJava以至于Rx系列强大的原因之一。

版权声明:本文为博主原创文章,未经博主允许不得转载。

一起读RxJava源码系列:
一起读RxJava源码(一)——简介
一起读RxJava源码(二)——基础知识:观察者模式
一起读RxJava源码(三)——RxJava的基本实现
一起读RxJava源码(四)——转换操作符
一起读RxJava源码(五)——线程调度
一起读RxJava源码(六)——深入浅出:基于波浪事件流和模块化的思路分析RxJava

1. 前言

  上一篇我们介绍的RxJava最基本的实现方式虽然没有展现出RxJava的强大,但是却是一切的基础。没有看过的工友建议先看下,以免这一章节消化不了。

  RxJava的操作符有很多,上面的图谱里就能看出来,具体使用可以参考官网介绍。这里主要介绍的是转换操作符,它作用于一个可观测序列,然后变换它发射的值,最后用一种新的形式返回它们。直接看概念可能会不太好理解,这篇文章以相对简单且常用的转换操作符map为例,进行源码的分析。了解map操作符的逻辑之后可能再来看概念就容易理解得多了。

转换操作符实现的其实是“变换”的原理。所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。

——转换操作符原理

2. map操作符的简单应用

  先举个例子,将一组字符串中的小写字符转换成大写输出,用map可以这样实现:

map实现字符串中小写转大写
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
List<String> froms = new ArrayList<>();
froms.add("hello");
froms.add("rxjava");
froms.add("毁我青春");
Observable.from(froms)
.map(new Func1<String, String>() {
@Override
public String call(String s) {
return s.toUpperCase();
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
//System.out.println("onCompleted");
Log.i("tag", "onCompleted");
}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(String s) {
Log.i("tag", "onNext=" + s);
}
});

输出的结果是这样的:

输出结果
1
2
3
4
onNext=HELLO
onNext=RXJAVA
onNext=毁我青春
onCompleted

  这里Observable.from()这个方法是将传入的数组或Iterable拆分成具体对象后,依次发送出来。和之前的之前的create(OnSubscribe)是等价的。

  可以看到,map()方法将每次接收到的String对象进行了toUpperCase()操作,经过map()方法后,Subscriber里onNext()收到的就是转换成大写的内容。当然,map()方法也可以进行其他类型的转换,比如将int转换成字符串,或是字符串转换成Bitmap对象等,在Func1类构造函数的泛型参数(new Func1<String, String>())就是转换前和转换后的对象类型。

  总结一下map()的用法:它是对事件对象一对一的直接变换,也是RxJava最常用的变换。如下图所示:

3. map操作符的源码解析

为了基于上一章节所讲的内容,我们把上面的例子变得简单一点:

RxJava基本实现+map
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hello rxjava");
subscriber.onCompleted();
}
}).map(new Func1<String, String>() {
@Override
public String call(String s) {
return s.toUpperCase();
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.i("tag", "onCompleted");
}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(String s) {
Log.i("tag", "onNext=" + s);
}
});

  由于上一章节讲解了Observable.create()和.subscribe()的源码,所以这里用了上一章节介绍的RxJava基本实现。from()的源码解析这里就不多赘述了,主要还是理解map的基本的逻辑。

  这里Observable.create()自然不必多说,最终返回了一个新创建的被观察者Observable。下一步会执行Observable.map()。

  map()的源码也在Observable类里:

map()源码
1
2
3
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return unsafeCreate(new OnSubscribeMap<T, R>(this, func));
}

再看下unsafeCreate()的类:

unsafeCreate()源码
1
2
3
public static <T> Observable<T> unsafeCreate(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}

  是不是有点眼熟??因为跟Observable.create()的实现是一毛一样的啊!可以现在回到上一章节对比一下。也就是说,map()方法也是直接返回了一个新创建的被观察者Observable,这是一个新的被观察者,我们把它称为ObservableB;一开始通过Observable.create()创建的Observable我们称之为ObservableA。

  我们可以看到在ObservableB中,传入RxJavaHooks.onCreate(f)的参数可以追溯到map()方法里传入的new OnSubscribeMap(this, func)。所以,ObservableB对应的OnSubscribe就是这个OnSubscribeMap。而ObservableA和变换函数Func1则作为构造OnSubscribeMap的参数。

  现在我们现在把视野拉远,从整体再看下这个例子的逻辑:

  也就是说,在执行了map方法后,变成了ObservableB和Subscriber建立订阅关系。我们把最后的观察者也起个名子区分,叫SubscriberC。总结下,就是原本ObservableA.subscribe(SubscriberC)之间加了.map()后,现在变成了ObservableB.subscribe(SubscriberC)。

  根据上一篇文章介绍的RxJava基础流程,在ObservableB.subscribe(SubscriberC)建立订阅关系后,应该执行ObservableB的onSubscribe里的call方法。上面讲到了 ObservableB对应的OnSubscribe是OnSubscribeMap,我们来看一下OnSubscribeMap的源码:

OnSubscribeMap源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {

final Observable<T> source;

final Func1<? super T, ? extends R> transformer;

public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
this.source = source;
this.transformer = transformer;
}

@Override
public void call(final Subscriber<? super R> o) {
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
source.unsafeSubscribe(parent);
}

static final class MapSubscriber<T, R> extends Subscriber<T> {

final Subscriber<? super R> actual;

final Func1<? super T, ? extends R> mapper;

boolean done;

public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}

@Override
public void onNext(T t) {
R result;

try {
result = mapper.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}

actual.onNext(result);
}

@Override
public void onError(Throwable e) {
if (done) {
RxJavaHooks.onError(e);
return;
}
done = true;

actual.onError(e);
}


@Override
public void onCompleted() {
if (done) {
return;
}
actual.onCompleted();
}

@Override
public void setProducer(Producer p) {
actual.setProducer(p);
}
}

}

  OnSubscribeMap实现了OnSubscribe接口,因此OnSubscribeMap就是一个OnSubscribe。我们来看下OnSubscribeMap的call方法。首先通过我们的观察者o(即SubscriberC)和转换函数transformer构造了一个MapSubscriber。然后调用了source的unsafeSubscribe()。
  我们看下这个source是从OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer)这个构造器里传过来的。往上一级看,这个构造器是在ObservableA.map()方法里调用的(具体源码请往上翻到map()源码),这里source传入的就是ObservableA。

  所以这里小结下,OnSubscribeMap的call方法里本质是调用了ObservableA的unsafeSubscribe()。我们继续深入,看下unsafeSubscribe()源码:

unsafeSubscribe()源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
try {

subscriber.onStart();

RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {

Exceptions.throwIfFatal(e);

try {
subscriber.onError(RxJavaHooks.onObservableError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);

RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);

RxJavaHooks.onObservableError(r);

throw r; // NOPMD
}
return Subscriptions.unsubscribed();
}
}

  有没有很眼熟??可以返回上一篇文章里.subscribe()源码解析看一下,是的,你应该没有记错,这段代码跟.subscribe()的源码基本完全一样。核心的那句又出来了:

1
RxJavaHooks.onObservableStart(observable,observable.onSubscribe).call(subscriber);

  也就是说,调用了ObservableA的onSubscribe里的call方法,也就是执行了ObservableA.subscribe(MapSubscriber)。而MapSubscriber其实是在OnSubscribeMap.call()中new出来的。可以理解成是属于B的。执行到这里B不再只是SubscriberC的被观察者了,他同时也是ObservableA的观察者,记为 SubscriberB。B = (ObservableB+SubscriberB)。

  所以到这里,我们可以这样理解:

第一步:ObservableB.subscribe(SubscriberC)
第二步:调用ObservableB的call方法
第三步:在ObservableB的call方法里创建了SubscriberB ,并调用了ObservableA.subscribe(SubscriberB)

  下一步跟之前以及RxJava基本调用逻辑一样,执行ObservableA的onSubscribe里的call方法。就是执行基本源码的这两句:

ObservableA的onSubscribe
1
2
subscriber.onNext("hello rxjava");
subscriber.onCompleted();

  也是根据上一章讲的RxJava基本调用(所以基本调用的逻辑很重要啊),这里事件开始下发了。下一步会执行ObservableA.subscribe(SubscriberB)中SubscriberB的onNext()和onCompleted(),也就是MapSubscriber。

  MapSubscriber的源码在刚才的OnSubscribeMap里,它的onNext()里主要执行就下面两句:

MapSubscriber的onNext()
1
2
result = mapper.call(t);
actual.onNext(result);

  这里mapper就是变换函数,通过MapSubscriber构造器向上找,发现就是OnSubscribeMap的 transformer,这个transformer也是OnSubscribeMap构造器传入的。再往上看,豁然开朗:
就是map()里传的Func1,mapper.call也就是执行我们.map()里自己写的回调:

mapper
1
2
3
4
5
6
.map(new Func1<String, String>() {
@Override
public String call(String s) {
return s.toUpperCase();
}
})

  这里其实就是实现我们自己定义的变换了。我们定义的变换函数将String中的所有小写转换成了大写。下一步会执行actual.onNext(result),这里就不再一层层的找了,这个actual其实就是o(即SubscriberC)。在SubscriberC的 onNext中我们输出了接收到的result,也就是变换过后的String。

  同样在调用MapSubscriber.onCompleted()时会执行subscriberOne.onCompleted()。这样就完成了一整套的调用流程。

  终于到最后总结了,想把转换说明白还是挺费劲的。前面没看懂的话可以看下下面总结出的调用逻辑(整合了基本调用的逻辑):

第一步:创建ObservableA
第二步:在调用.map()时创建ObservableB
第三步:执行ObservableB.subscribe(SubscriberC)
第四步:调用ObservableB的call方法
第五步:在ObservableB的call方法里创建了SubscriberB,并调用了ObservableA.subscribe(SubscriberB)
第六步:调用ObservableA里onSubscribe里的call,事件从A开始下发
第七步:SubscriberB的onNext接收事件,并调用map()里的转换方法,事件继续下发
第八步:SubscriberC的onNext接收转换后的事件。

  至此,从被观察者A到观察者C的事件流就完成了。中间的转换可以简单理解成经历了一个B,B对A和C分别建立了观察和被观察的关系。在B中实现了事件流的转换。从A和C的一层订阅变成了ABC的两层订阅。
  看下扔物线大神的这张图可能会更好理解我所说的两层订阅

  张磊大神的这张图则可以很清楚的理解调用逻辑:

4. 总结

  到这里,map()转换原理就讲解完了,其实map()的原理本质是lift()。只不过map()这个操作符举例比较容易,逻辑也相对简单。有兴趣的工友可以看下扔物线大神的《给 Android 开发者的 RxJava 详解》里对lift()原理的解释,你会发现和map()实现是一样的。
  下一章节,会基于这个变换原理,来讲解线程调度的源码分析,调用逻辑和map很相似。建议大家先把转换原理搞明白。大家加油!

Powered by Hexo and Hexo-theme-hiker

Copyright © 2018 - 2023 TEN-Z'S BLOG All Rights Reserved.

访客数 : | 访问量 :