——RxJava的基本实现/updown.jpeg)
前两篇我们介绍了RxJava的一些基本概念,以及通过源码了解了观察者模式这一基础知识。从这一章开始,我们将依次由浅入深的了解RxJava的主要操作以及源码的实现。
版权声明:本文为博主原创文章,未经博主允许不得转载。
一起读RxJava源码系列:
一起读RxJava源码(一)——简介
一起读RxJava源码(二)——基础知识:观察者模式
一起读RxJava源码(三)——RxJava的基本实现
一起读RxJava源码(四)——转换操作符
一起读RxJava源码(五)——线程调度
一起读RxJava源码(六)——深入浅出:基于波浪事件流和模块化的思路分析RxJava
1. 前言
在上一章节最开始我们提到过:RxJava的异步实现,是通过一种扩展的观察者模式来实现的。上一章节介绍了基础的观察者模式,这一章节我们一起来看一下RxJava是如何实现扩展的观察者模式的。
——RxJava的基本实现/jihui.jpg)
2. 扩展的观察者模式
2.1 RxJava的四个基本概念
Observable(可观察者,即被观察者)
Observer/Subscriber(观察者)
Subscribe(订阅)
事件(相当于观察者模式中被观察者用来通知观察者的notifyObservers()方法,类似分发的事件)
2.2 工作原理
Observable和Observer/Subscriber通过subscribe()方法实现订阅关系,从而Observable可以在需要的时候发出事件来通知Observer/Subscriber。
特别注意的是,与传统观察者模式不同,RxJava的Observer/Subscriber回调方法除了普通事件onNext()(相当于update)之外,还定义了两个特殊的事件:onCompleted()和onError()。
onCompleted():事件队列完结。RxJava不仅把每个事件单独处理,还会把它们看做一个队列。RxJava规定,当不会再有新的onNext()发出时,需要触发onCompleted()方法作为标志。
onError():事件队列异常。在事件处理过程中出异常时,onError()会被触发,同时队列自动终止,不允许再有事件发出。
在一个正确运行的事件序列中,onCompleted()和onError()有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted()和onError()二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。
RxJava的扩展的观察者模式大致如下图:
——RxJava的基本实现/RxJava观察者.jpg)
3. RxJava的基本实现
使用RxJava的基本实现操作拢共分三步:
第一步:初始化 Observable
第二步:初始化 Observer
第三步:建立订阅关系
基本调用完整起来是这样的:
请务必记住这个基本实现,在后面的源码讲解中还会回到这个基本实现代码上来。我会用“RxJava基本实现”加粗的方式表示需要回到这里看下面的基本实现代码,以便更好的理解源码。
1 | Observable.create(new Observable.OnSubscribe<String>() { |
和上一小节讲得工作原理一样,首先调用Observable.create()创建一个被观察者Observable,同时创建一个OnSubscribe作为create()方法的入参;接着创建一个观察者Subscriber,然后通过subseribe()实现二者的订阅关系。
下面就是对这三步进行源码的分析,看看他们都干了什么。
3.1 Observable.create()源码解析
在上面的基本调用示例中,Observable.create()
使用是这样的:1
2
3
4
5
6
7Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("垃圾软件,毁我青春");
subscriber.onCompleted();
}
})
看下Observable.create()
这个方法的源码1
2
3public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
在执行Observable.create()
的时候,直接返回了一个新创建的被观察者Observable。同时将RxJavaHooks.onCreate(f)
作为构造函数的参数。进一步看一下Observable的构造函数:1
2
3protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
可以发现这里构造函数只是将传入的RxJavaHooks.onCreate(f)
参数赋值给了Observable类里的onSubscribe引用,构造函数就结束了。我们从RxJava基本实现代码中看到RxJavaHooks.onCreate(f)
里的f是Observable.create
里那个新创建的onSubscribe。那么下一步看看传入的参数RxJavaHooks.onCreate(f)做了什么。1
2
3
4
5
6
7public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate;
if (f != null) {
return f.call(onSubscribe);
}
return onSubscribe;
}
由于我们并没调用RxJavaHooks.initCreate()
,所以上面代码中的onObservableCreate
为null;于是代码会跳过if语句里的内容,继续执行下面的内容。因此RxJavaHooks.onCreate(f)
最终返回的就是f,也就是我们在Observable.create()
的时候new出来的OnSubscribe。
在这个基本的例子里,似乎RxJavaHooks.onCreate(f)
有点多余,因为对传入的f直接返回了,还多了一层调用。不过,因为这个例子里我们的onObservableCreate
为null,如果不为null,执行OnSubscribe的call回调(f.call),其实是直接通知事件了,因为在OnSubscribe的call方法里一般会执行subscriber.onNext()
,后面我们会看到类似的效果。
3.2 subscribe()源码解析
在示例代码RxJava基本实现中,执行完Observable.create()
方法之后,返回了一个Observable,下一步会执行.subscribe()方法(Observable.subscribe()
),相当于如下代码(这里的Observable是Observable.create()
返回的)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16Observable.subscribe(new Subscriber() {
@Override
public void onCompleted() {
Log.d("tag", "onCompleted:");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Object o) {
Log.d("tag", "onNext:" + ((String)o).toString());
}
});
我们可以看到,在.subscribe()方法里传入个一个新创建的观察者Subscriber()。Subscriber()是实现Observer接口的一个抽象类,在初始化Subscriber()的时候实现了Observer接口的三个回调方法:onNext(),onCompleted(),onError()。
下面来看下.subscribe()里做了些什么。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
……
// new Subscriber so onStart it
subscriber.onStart();
if (!(subscriber instanceof SafeSubscriber)) {
subscriber = new SafeSubscriber<T>(subscriber);
}
try {
// allow the hook to intercept and/or decorate
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
…… }
return Subscriptions.unsubscribed();
}
}
看上去有点唬人,我里删减了下。最核心的其实是这句1
RxJavaHooks.onObservableStart(observable,observable.onSubscribe).call(subscriber);
这里RxJavaHooks和之前提到的一样,返回的是第二个入参observable.onSubscribe
,也就是RxJava基本实现中observable.create()
里初始化的onSubscribe。 所以这段代码可以看成:1
onSubscribe.call(subscriber)
这里可以看到,onSubscribe里的call被调用了,这意味着事件发送的逻辑开始运行。因此在RxJava中,Observable并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候。
我们继续看onSubscribe.call(subscriber)
的执行情况。在RxJava基本实现中onSubscribe在它的call方法实现里执行了如下两行代码:1
2subscriber.onNext("垃圾软件,毁我青春");
subscriber.onCompleted();
于是,接下来会将参数传给subscribe()方法里Subscriber对象的onNext(),onNext()接收参数并输出日志。然后执行subscriber的onCompleted(),原理同上。
最后,RxJava基本实现就完成了,结果如下图:
4. 总结
简单总结一下RxJava基本实现的调用逻辑:
- 初始化被观察者Observable
- 初始化观察者Subscriber
- 通过subscribe()建立订阅关系,并立即发送事件
- 执行onSubscribe里的call回调,发送事件
- 观察者Subscriber的相应回调接收事件参数,执行各自逻辑
至此,我们把RxJava的基本实现顺着源码的逻辑捋了一篇,这里用张磊大神的一张图来说明具体调用的逻辑:
我们发现RxJava首先是实现了一个订阅的形式,将观察者和被观察者解耦,这里和普通观察者还是很像的。同时,我们也发现Rxjava一但执行了订阅方法subscribe()就会立刻发送事件,它没有像普通观察者内部Vector的存储机制,也就无法解决一对多依赖的问题。
下一章节,我们会一起学习RxJava基本操作符–主要是变换操作符的一些源码和逻辑。在此之前,建议先把基础的逻辑看懂。加油!