——线程调度/fangqi.jpg)
经过前面四篇文章,我们已经了解了RxJava相关的一些基础。接下来,我们将进入RxJava最核心的内容—-线程调度。从Rx官网介绍来看,ReactiveX系列实现的核心功能就是异步编程,而程序层面中实现异步的核心是多线程。本篇文章内容稍长,如果稍有困惑可直接跳到文章末尾看流程图,可能会方便理解。
版权声明:本文为博主原创文章,未经博主允许不得转载。
一起读RxJava源码系列:
一起读RxJava源码(一)——简介
一起读RxJava源码(二)——基础知识:观察者模式
一起读RxJava源码(三)——RxJava的基本实现
一起读RxJava源码(四)——转换操作符
一起读RxJava源码(五)——线程调度
一起读RxJava源码(六)——深入浅出:基于波浪事件流和模块化的思路分析RxJava
1. 线程调度的核心:Scheduler
Scheduler本身就是调度程序、调度器的意思。这里把它称为“核心”是因为RxJava实现线程调度主要方式是将Scheduler作为参数传入相关操作符中实现的,比如一会要讲的SubscribeOn
和ObserveOn
。而在Rx官网介绍中,Scheduler本身就能实现多线程的相关功能。下面我们一起来看下。
1.1 Scheduler介绍
Scheduler是线程控制器,RxJava通过它来指定每一段代码应该运行在什么样的线程。RxJava已经内置了几个Scheduler,它们已经适合大多数的使用场景:
- Schedulers.immediate():直接在当前线程运行,相当于不指定线程。这是默认的Scheduler。
- Schedulers.newThread():总是启用新线程,并在新线程执行操作。
- Schedulers.io():I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的Scheduler。行为模式和newThread()差不多,区别在于io()的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下io()比newThread()更有效率。不要把计算工作放在io()中,可以避免创建不必要的线程。
- Schedulers.computation():计算所使用的Scheduler。这个计算指的是CPU密集型计算,即不会被I/O等操作限制性能的操作,例如图形的计算。这个Scheduler使用的固定的线程池,大小为CPU核数。不要把I/O操作放在computation()中,否则I/O操作的等待时间会浪费CPU。
- AndroidSchedulers.mainThread():Android专用,它指定的操作将在Android主线程运行。
RxJava最常规的线程控制就是通过这几个Scheduler配合subscribeOn()
和observeOn()
两个方法来对线程进行控制。
1.2 单独使用Scheduler
除了将Scheduler传递给RxJava运算符之外,我们还可以单独使用Scheduler来安排自己的订阅工作。可以使用RxJava的Scheduler.Worker类实现Java中利用Thread实现的多线程功能:1
2
3
4
5
6
7
8
9
10
11Scheduler.Worker worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {
@Override
public void call() {
yourWork();
}
});
// some time later...
worker.unsubscribe();
在执行worker.schedule时会在新线程中执行call()中的代码。并可以通过worker.unsubscribe()
终止线程。
之外,还可以实现延迟执行和周期执行:
延时500ms后执行
1 someScheduler.schedule(someAction, 500, TimeUnit.MILLISECONDS);延时500ms后,每隔250ms执行一次
1 someScheduler.schedulePeriodically(someAction,500,250,TimeUnit.MILLISECONDS);
2. 线程调度的基本实现
上一小节中提到过,RxJava实现线程控制主要是通过Scheduler配合subscribeOn()
和observeOn()
两个操作符来实现。下面简单介绍下这两个操作符的功能:
subscribeOn(): 指定subscribe()所发生的线程,一方面是Observable.OnSubscribe被激活时所处的线程,或者叫做事件产生的线程。另一方面是观察者接收前的所处的线程,或者称为事件下发的线程。
observeOn(): 指定Subscriber观察者所运行在的线程。或者叫做事件消费的线程。
定义如果没看太明白的话可以结合接下来的示例,下面是线程调度最基本的实现方式:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hello rxjava");
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.i("tag", "onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.i("tag", "onNext=" + s);
textView.setText(s);
}
});
跟前几篇文章中基本方式比较,仅仅是多了.subscribeOn(Schedulers.io())
和.observeOn(AndroidSchedulers.mainThread())
两句。
在上面这段代码中,由于在create()后指定了subscribeOn(Schedulers.io()),Observable.OnSubscribe中call回调中的内容会在IO线程执行,即内容为”hello rxjava”的事件会在IO线程发出。而由于在subscribe()之前observeOn(AndroidScheculers.mainThread())的指定,使得Subscriber中给textView赋值的操作(UI的操作)会在主线程中执行。
通过对上面例子的解释可以看出,RxJava切换线程仅仅通过添加subscribeOn和observeOn操作符就能实现,并且可以通过多次调用observeOn实现多次切换线程。
能如此方便的切换线程,简直业界良心啊有木有。对于一名android搬砖工而言,可以不用纠结AsyncTask、线程和handler了。使用RxJava仅仅通过两个操作符就能实现线程间的切换、调度、可控,并大大减少代码量,增加可读性……
当然,到此仅仅是介绍了RxJava实现线程调度的基本操作,下面我们就来分析分析源码,看看是怎么实现的。
3. 线程调度的源码解析——基于事件流
下面的讲解还是以线程调度的基本实现为例,但是这回我们讲解的顺序不按照绝大部分博客的思路——按代码顺序从上至下的方式进行。因为经过前两篇文章对基础源码的分析,我们发现RxJava在执行到subscribe()才会去产生事件并下发,所以这里我们以事件的产生到接收流程为顺序进行源码逻辑的分析,个人认为会更好理解一点。请再回顾一下刚才线程调度基本实现的代码,找到建立依赖关系的地方。
3.1 observeOn()源码解析(一)
先来分析下在subscribe()方法之前调用的observeOn()。这里是建立关系的开始,也是事件的开始。
先来看下observeOn()源码:1
2
3
4
5
6
7
8
9
10
11
12
13
14public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, RxRingBuffer.SIZE);
}
……
public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
return observeOn(scheduler, false, bufferSize);
}
……
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}
经历了层层调用后我们终于看到了observeOn()核心的东西,它调用了一个lift()方法。在上一篇文章最后我简单的介绍了一下:map原理的本质是lift。来看下lift()的源码:1
2
3public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
可以看到和map()是几乎一模一样了,所以和map原理一样,lift本质也是创建了一个新的Observable。跟前面的文章一样,这里记为ObservableC……为什么是C,因为B在下一小节。这个ObservableC传入的OnSubscribe是一个OnSubscribeLift。下面就来看下这个OnSubscribeLift:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {
final OnSubscribe<T> parent;
final Operator<? extends R, ? super T> operator;
public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
this.parent = parent;
this.operator = operator;
}
@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
parent.call(st);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
o.onError(e);
}
}
}
所以综上,当程序执行到observeOn(AndroidSchedulers.mainThread()).subscribe()
,实际本质是在执行ObservableC.subscribe(new SubscriberD)。以后的逻辑就不多说了,相信看了前面两篇文章到这里基础应该十分扎实了。看下ObservableC——OnSubscribeLift的call()方法,RxJavaHooks.onObservableLift(operator)
实际上就是将传入的operator返回了。并且调用了operator.call。这里的operator实际上是lift传入的参数OperatorObserveOn。下面来看下OperatorObserveOn.call:1
2
3
4
5
6
7
8public Subscriber<? super T> call(Subscriber<? super T> child) {
...
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
return parent;
}
也是跟map一样,初始化了一个SubscriberC——ObserveOnSubscriber并调用init进行了初始化。下一步,回到OnSubscribeLift.Call中继续执行parent.call(st),也是就是调用了前一级Observable对应OnSubscriber.call(st)。注意这里和map有一点形式上的不一样,说是形式上是因为map这里是这样的:source.unsafeSubscribe(parent),形式上是将上一级的Observable和本级的MapSubscriber建立了观察依赖,但实际起作用的其实是建立依赖后调用上一级的Observable的OnSubscriber.call,这里和observeOn()其实本质又一样了。
因此,这里也可以看成执行了下面一句:1
ObservableB.subscribe(SubscriberC)
于是,根据基本调用逻辑,事件又往上一级ObservableB去了。
3.2 subscribeOn()源码解析(一)
在线程调度的基本实现的例子中,在observeOn()方法之前调用的就是subscribeOn(Schedulers.io()),所以这一小节,我们来看下subscribeOn()的源码。1
2
3public final Observable<T> subscribeOn(Scheduler scheduler) {
return subscribeOn(scheduler, !(this.onSubscribe instanceof OnSubscribeCreate));
}
subscribeOn()调用了一个两个参数的subscribeOn方法,我们继续:1
2
3
4
5
6public final Observable<T> subscribeOn(Scheduler scheduler, boolean requestOn) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return unsafeCreate(new OperatorSubscribeOn<T>(this, scheduler, requestOn));
}
重点在最后一句,如果你看了上一篇map()的源码解析,应该有印象:unsafeCreate()方法会创建一个新的被观察者Observable。由此可以预见,subscribeOn()的执行流程应该和map()差不多。这里会创建一个新的被观察者,记为ObservableB。传入的对应的OnSubscribe就是OperatorSubscribeOn。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
final Scheduler scheduler;
final Observable<T> source;
final boolean requestOn;
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler, boolean requestOn) {
this.scheduler = scheduler;
this.source = source;
this.requestOn = requestOn;
}
@Override
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
SubscribeOnSubscriber<T> parent = new SubscribeOnSubscriber<T>(subscriber, requestOn, inner, source);
subscriber.add(parent);
subscriber.add(inner);
inner.schedule(parent);
}
static final class SubscribeOnSubscriber<T> extends Subscriber<T> implements Action0 {
final Subscriber<? super T> actual;
final boolean requestOn;
final Worker worker;
Observable<T> source;
Thread t;
SubscribeOnSubscriber(Subscriber<? super T> actual, boolean requestOn, Worker worker, Observable<T> source) {
this.actual = actual;
this.requestOn = requestOn;
this.worker = worker;
this.source = source;
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable e) {
try {
actual.onError(e);
} finally {
worker.unsubscribe();
}
}
@Override
public void onCompleted() {
try {
actual.onCompleted();
} finally {
worker.unsubscribe();
}
}
@Override
public void call() {
Observable<T> src = source;
source = null;
t = Thread.currentThread();
src.unsafeSubscribe(this);
}
@Override
public void setProducer(final Producer p) {
actual.setProducer(new Producer() {
@Override
public void request(final long n) {
if (t == Thread.currentThread() || !requestOn) {
p.request(n);
} else {
worker.schedule(new Action0() {
@Override
public void call() {
p.request(n);
}
});
}
}
});
}
}
}
通过上一小节我们知道事件经过observeOn后,会先调用ObservableB的OnSubscribe的call方法,这里即为OperatorSubscribeOn的call方法。
先简单过一下OperatorSubscribeOn的call中的代码的大概流程:首先通过scheduler.createWorker()构建了一个Worker;然后用传进来的subscriber构造了一个新的SubscribeOnSubscriber,记为subscriberB,并将subscriberB丢到Worker.schedule()创建的新线程来处理;新线程中,Worker.schedule(subscriberB)会执行subscriberB中的call方法,在call中会用上一级Observable去订阅观察者subscriberB,即ObservableA.subscribe(subscriberB)。
提炼一下:切换线程,添加订阅ObservableA.subscribe(subscriberB)。
下面是详细的解析:
第一步:
final Worker inner = scheduler.createWorker();
从上面的流程来看,这个Worker就是线程调度的关键,从前面1.2中的例子也能看出来,Worker本身就是用来创建新线程的。而在这个例子中,创建Worker的scheduler往上回溯去查找发现就是subscribeOn(Schedulers.io())中的Schedulers.io(),就是通过它创建了我们前面提到的Worker。所以下面来看看Schedulers.io()的实现:1
2
3public static Scheduler io() {
return RxJavaHooks.onIOScheduler(getInstance().ioScheduler);
}
RxJavaHooks.onIOScheduler和绝大多数RxJavaHooks里的方法一样,把传入的scheduler返回了。这里的scheduler传入了一个getInstance().ioScheduler
,看来是一个单例类,去它的私有构造器中找到ioScheduler的初始化:1
2
3
4
5
6
7
8
9
10
11private Schedulers() {
...
Scheduler io = hook.getIOScheduler();
if (io != null) {
ioScheduler = io;
}
else {
ioScheduler = RxJavaSchedulersHook.createIoScheduler();
}
...
}
继续看RxJavaSchedulersHook.createIoScheduler():1
2
3
4
5
6
7
8
9
10public static Scheduler createIoScheduler() {
return createIoScheduler(new RxThreadFactory("RxIoScheduler-"));
}
public static Scheduler createIoScheduler(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory == null");
}
return new CachedThreadScheduler(threadFactory);
}
可以看到返回了一个CachedThreadScheduler。也就是说subscribeOn(Schedulers.io())中传入的scheduler实际是这个CachedThreadScheduler。Worker也是CachedThreadScheduler的createWorker()方法创建的:1
2
3
4@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
这里返回了EventLoopWorker,OperatorSubscribeOn的call中第一句话的inner引用持有的Worker。用了这么长的篇幅终于把第一句解析完了,可见理解线程调度还是有一定难度的。下面的工作应该就是根据Worker创建线程了。
第二步:
SubscribeOnSubscriber
继续到OperatorSubscribeOn源码解析上来。第二步就是普通的基本调用的套路:创建出subscriberB。具体就不解释了,可以参考map的讲解来看。
第三步:
inner.schedule(parent);
第一步我们知道inner引用的对象实际是EventLoopWorker,来看下EventLoopWorker的schedule方法:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22@Override
public Subscription schedule(Action0 action) {
return schedule(action, 0, null);
}
@Override
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
...
ScheduledAction s = threadWorker.scheduleActual(new Action0() {
@Override
public void call() {
if (isUnsubscribed()) {
return;
}
action.call();
}
}, delayTime, unit);
innerSubscription.add(s);
s.addParent(innerSubscription);
return s;
}
}
看到它调用了ThreadWorker的scheduleActual()方法。这里的ThreadWorker引用的对象是NewThreadWorker,下面是它的scheduleActual():1
2
3
4
5
6
7
8
9
10
11
12
13public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
ScheduledAction run = new ScheduledAction(decoratedAction);
Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run);
} else {
f = executor.schedule(run, delayTime, unit);
}
run.add(f);
return run;
}
scheduleActual()中的ScheduledAction实现了Runnable接口,通过线程池executor切换了线程。顺便一提,可以在NewThreadWorker构造器中找到executor的初始化:1
2
3
4
5
6
7
8
9public NewThreadWorker(ThreadFactory threadFactory) {
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
boolean cancelSupported = tryEnableCancelPolicy(exec);
if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) {
registerExecutor((ScheduledThreadPoolExecutor)exec);
}
executor = exec;
}
可以看到这里是一个容量为1的newScheduledThreadPool。
这里多说一句,这里只是说明NewThreadWorker是容量为1的newScheduledThreadPool。Schedulers.io()——也就是CachedThreadScheduler(threadFactory),这个类维护了一个AtomicReference
pool集合,集合中的每个元素都是一个NewThreadWorker。可以理解为线程池的缓存。
再回到EventLoopWorker的schedule方法中,我们知道threadWorker.scheduleActual启动了一个新的线程,线程会执行scheduleActual第一个参数——Action0的call方法。而这个call方法中执行了一行:action.call();
往上层代码中查找发现这里的action就是subscriberB——SubscribeOnSubscriber。因此在SubscribeOnSubscriber代码中可以看到除了OnNext等方法之外还有一个call方法,而正常的subscriber是不应该有call方法的。这个call方法也是因为SubscribeOnSubscriber实现了Action0接口复写的。所以在action.call()中调用的就是SubscribeOnSubscriber里的call。具体代码请自行往上翻……
在这个call中执行了src.unsafeSubscribe(this)
。是的,就是ObservableA.subscribe(subscriberB)。
而ObservableA其实就是Observable.create(),也就是我们自己定义的被观察者。根据基本调用,会执行ObservableA中OnSubscribe的call方法,接着执行subscriberB.onNext(“hello rxjava”)。
所以,subscribeOn是先切换了线程,再去启动事件的,并且从启动事件开始,直到再次切换线程以前,都是在这个新线程中执行的。
下一步,来看下subscriberB——SubscribeOnSubscriber如何接收的。
3.3 subscribeOn()源码解析(二)
来看下subscribeOn()中创建的SubscribeOnSubscriber中接收事件的onNext方法:1
2
3
4@Override
public void onNext(T t) {
actual.onNext(t);
}
——线程调度/jingdai.jpeg)
好惊喜有木有,就TM一句啊……这里actual是OperatorSubscribeOn中call方法带入的,也就是subscriberC——在ObservableB.subscribe(SubscriberC)的时候传入的。
所以这里事件没有操作直接传往SubscriberC的onNext了……这甩锅甩的够快的……
下面该进入SubscriberC的onNext了。
3.3 observeOn()源码解析(二)
SubscriberC是在observeOn()中初始化的ObserveOnSubscriber,来看下源码:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
……
@Override
public void onNext(final T t) {
if (isUnsubscribed() || finished) {
return;
}
if (!queue.offer(NotificationLite.next(t))) {
onError(new MissingBackpressureException());
return;
}
schedule();
}
protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(this);
}
}
// only execute this from schedule()
@Override
public void call() {
long missed = 1L;
long currentEmission = emitted;
final Queue<Object> q = this.queue;
final Subscriber<? super T> localChild = this.child;
for (;;) {
long requestAmount = requested.get();
while (requestAmount != currentEmission) {
boolean done = finished;
Object v = q.poll();
boolean empty = v == null;
if (checkTerminated(done, empty, localChild, q)) {
return;
}
if (empty) {
break;
}
localChild.onNext(NotificationLite.<T>getValue(v));
currentEmission++;
if (currentEmission == limit) {
requestAmount = BackpressureUtils.produced(requested, currentEmission);
request(currentEmission);
currentEmission = 0L;
}
}
if (requestAmount == currentEmission) {
if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
return;
}
}
emitted = currentEmission;
missed = counter.addAndGet(-missed);
if (missed == 0L) {
break;
}
}
}
}
在ObserveOnSubscriber中执行了schedule(),而schedule()中执行了recursiveScheduler.schedule(this)
。
来看下recursiveScheduler,在ObserveOnSubscriber构造器中有这么一句:this.recursiveScheduler = scheduler.createWorker();
看到熟悉的createWorker()以及recursiveScheduler.schedule就知道这里要切换线程了。我们先等等,先看看这个scheduler引用的是啥。一层一层往上查找,发现就是observeOn(AndroidSchedulers.mainThread())
传的AndroidSchedulers.mainThread()
。再来看下这个方法:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15private AndroidSchedulers() {
RxAndroidSchedulersHook hook = RxAndroidPlugins.getInstance().getSchedulersHook();
Scheduler main = hook.getMainThreadScheduler();
if (main != null) {
mainThreadScheduler = main;
} else {
mainThreadScheduler = new LooperScheduler(Looper.getMainLooper());
}
}
/** A {@link Scheduler} which executes actions on the Android UI thread. */
public static Scheduler mainThread() {
return getInstance().mainThreadScheduler;
}
所以执行scheduler.createWorker()的是LooperScheduler,而这个LooperScheduler(Looper.getMainLooper())传的参数是不是很熟悉?Looper.getMainLooper()获取主线程的Looper。
这里回到ObserveOnSubscriber中来,recursiveScheduler.schedule(this)切换到了主线程并执行this——也就是ObserveOnSubscriber的call方法。
call方法中有两句localChild.onNext(NotificationLite.<T>getValue(v));
和checkTerminated(done, empty, localChild, q)
:就是调用SubscriberD的onNext()、onCompleted()、onError()。
也就是说,observeOn是在之前的线程中接受事件,然后切换线程,这个例子是切换到主线程,然后在新线程(主线程)中下发事件给下一级的观察者。
到这里,线程调度的主要代码就介绍完了,下面看下整体流程。
4. 线程调度整体流程
请回忆线程调度基本实现代码,然后再来看下面的流程。
第一步:Observable.create,创建被观察者,记为ObservableA
第二步:.subscribeOn(Schedulers.io()),创建新被观察者,记为ObservableB
第三步:.observeOn(AndroidSchedulers.mainThread()),创建新被观察者,记为ObservableC
第四步:.subscribe(new Subscriber()),创建观察者,记为SubscriberD,并建立依赖关系:ObservableC.subscribe(SubscriberD) 第五步:创建新观察者ObserveOnSubscriber,记为SubscriberC。并建立新依赖关系ObservableB.subscribe(SubscriberC)
第六步:通过制定Scheduler创建Worker;创建新观察者SubscribeOnSubscriber,记为SubscriberB;worker.schedule()切换新线程;在新线程中执行ObservableA.subscribe(SubscriberB)第七步:ObservableA中OnSubscribe的call开始发送事件”hello rxjava”
第八步:SubscriberB接到事件后,直接甩给SubscriberC
第九步:SubscriberC接收事件;通过Worker切换到主线程;在主线程中将事件传递给SubscriberD
第十步:SubscriberD接收事件,整体流程结束
根据流程我们可以简单总结为:subscribeOn在事件上行时切换线程,而observeOn是在事件下发时切换线程。
自己画了张图,加深理解:
5. 总结
线程调度到这里就告一段落了,我讲的也是寥寥数语,只是把大体流程介绍了一遍。如果想仔细搞懂每一步的细节,还需要在多线程以及handler机制上下功夫。
我们经过这几篇的介绍,RxJava源码主要的逻辑基本介绍完了。下一章会介绍一些较为轻松的RxJava的具体应用,敬请期待!