一起读RxJava源码(五)——线程调度

  经过前面四篇文章,我们已经了解了RxJava相关的一些基础。接下来,我们将进入RxJava最核心的内容—-线程调度。从Rx官网介绍来看,ReactiveX系列实现的核心功能就是异步编程,而程序层面中实现异步的核心是多线程。本篇文章内容稍长,如果稍有困惑可直接跳到文章末尾看流程图,可能会方便理解。

版权声明:本文为博主原创文章,未经博主允许不得转载。

一起读RxJava源码系列:
一起读RxJava源码(一)——简介
一起读RxJava源码(二)——基础知识:观察者模式
一起读RxJava源码(三)——RxJava的基本实现
一起读RxJava源码(四)——转换操作符
一起读RxJava源码(五)——线程调度
一起读RxJava源码(六)——深入浅出:基于波浪事件流和模块化的思路分析RxJava

1. 线程调度的核心:Scheduler

  Scheduler本身就是调度程序、调度器的意思。这里把它称为“核心”是因为RxJava实现线程调度主要方式是将Scheduler作为参数传入相关操作符中实现的,比如一会要讲的SubscribeOnObserveOn。而在Rx官网介绍中,Scheduler本身就能实现多线程的相关功能。下面我们一起来看下。

1.1 Scheduler介绍

  Scheduler是线程控制器,RxJava通过它来指定每一段代码应该运行在什么样的线程。RxJava已经内置了几个Scheduler,它们已经适合大多数的使用场景:

  • Schedulers.immediate():直接在当前线程运行,相当于不指定线程。这是默认的Scheduler。
  • Schedulers.newThread():总是启用新线程,并在新线程执行操作。
  • Schedulers.io():I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的Scheduler。行为模式和newThread()差不多,区别在于io()的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下io()比newThread()更有效率。不要把计算工作放在io()中,可以避免创建不必要的线程。
  • Schedulers.computation():计算所使用的Scheduler。这个计算指的是CPU密集型计算,即不会被I/O等操作限制性能的操作,例如图形的计算。这个Scheduler使用的固定的线程池,大小为CPU核数。不要把I/O操作放在computation()中,否则I/O操作的等待时间会浪费CPU。
  • AndroidSchedulers.mainThread():Android专用,它指定的操作将在Android主线程运行。

  RxJava最常规的线程控制就是通过这几个Scheduler配合subscribeOn()observeOn()两个方法来对线程进行控制。

1.2 单独使用Scheduler

  除了将Scheduler传递给RxJava运算符之外,我们还可以单独使用Scheduler来安排自己的订阅工作。可以使用RxJava的Scheduler.Worker类实现Java中利用Thread实现的多线程功能:

Scheduler实现多线程
1
2
3
4
5
6
7
8
9
10
11
Scheduler.Worker worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {

@Override
public void call() {
yourWork();
}

});
// some time later...
worker.unsubscribe();

  在执行worker.schedule时会在新线程中执行call()中的代码。并可以通过worker.unsubscribe()终止线程。

  之外,还可以实现延迟执行和周期执行:

  • 延时500ms后执行

    1
    someScheduler.schedule(someAction, 500, TimeUnit.MILLISECONDS);
  • 延时500ms后,每隔250ms执行一次

    1
    someScheduler.schedulePeriodically(someAction,500,250,TimeUnit.MILLISECONDS);

2. 线程调度的基本实现

  上一小节中提到过,RxJava实现线程控制主要是通过Scheduler配合subscribeOn()observeOn()两个操作符来实现。下面简单介绍下这两个操作符的功能:

subscribeOn(): 指定subscribe()所发生的线程,一方面是Observable.OnSubscribe被激活时所处的线程,或者叫做事件产生的线程。另一方面是观察者接收前的所处的线程,或者称为事件下发的线程。
observeOn(): 指定Subscriber观察者所运行在的线程。或者叫做事件消费的线程。

  定义如果没看太明白的话可以结合接下来的示例,下面是线程调度最基本的实现方式:

线程调度基本实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hello rxjava");
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.i("tag", "onCompleted");
}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(String s) {
Log.i("tag", "onNext=" + s);
textView.setText(s);
}
});

  跟前几篇文章中基本方式比较,仅仅是多了.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())两句。
  在上面这段代码中,由于在create()后指定了subscribeOn(Schedulers.io()),Observable.OnSubscribe中call回调中的内容会在IO线程执行,即内容为”hello rxjava”的事件会在IO线程发出。而由于在subscribe()之前observeOn(AndroidScheculers.mainThread())的指定,使得Subscriber中给textView赋值的操作(UI的操作)会在主线程中执行。

  通过对上面例子的解释可以看出,RxJava切换线程仅仅通过添加subscribeOn和observeOn操作符就能实现,并且可以通过多次调用observeOn实现多次切换线程。
  能如此方便的切换线程,简直业界良心啊有木有。对于一名android搬砖工而言,可以不用纠结AsyncTask、线程和handler了。使用RxJava仅仅通过两个操作符就能实现线程间的切换、调度、可控,并大大减少代码量,增加可读性……

  当然,到此仅仅是介绍了RxJava实现线程调度的基本操作,下面我们就来分析分析源码,看看是怎么实现的。

3. 线程调度的源码解析——基于事件流

  下面的讲解还是以线程调度的基本实现为例,但是这回我们讲解的顺序不按照绝大部分博客的思路——按代码顺序从上至下的方式进行。因为经过前两篇文章对基础源码的分析,我们发现RxJava在执行到subscribe()才会去产生事件并下发,所以这里我们以事件的产生到接收流程为顺序进行源码逻辑的分析,个人认为会更好理解一点。请再回顾一下刚才线程调度基本实现的代码,找到建立依赖关系的地方。

3.1 observeOn()源码解析(一)

  先来分析下在subscribe()方法之前调用的observeOn()。这里是建立关系的开始,也是事件的开始。
  先来看下observeOn()源码:

observeOn()源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, RxRingBuffer.SIZE);
}
……
public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
return observeOn(scheduler, false, bufferSize);
}
……
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}

  经历了层层调用后我们终于看到了observeOn()核心的东西,它调用了一个lift()方法。在上一篇文章最后我简单的介绍了一下:map原理的本质是lift。来看下lift()的源码:

lift()源码
1
2
3
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
}

  可以看到和map()是几乎一模一样了,所以和map原理一样,lift本质也是创建了一个新的Observable。跟前面的文章一样,这里记为ObservableC……为什么是C,因为B在下一小节。这个ObservableC传入的OnSubscribe是一个OnSubscribeLift。下面就来看下这个OnSubscribeLift:

OnSubscribeLift源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {

final OnSubscribe<T> parent;

final Operator<? extends R, ? super T> operator;

public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
this.parent = parent;
this.operator = operator;
}

@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
parent.call(st);
} catch (Throwable e) {

Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);

o.onError(e);
}
}
}

  所以综上,当程序执行到observeOn(AndroidSchedulers.mainThread()).subscribe(),实际本质是在执行ObservableC.subscribe(new SubscriberD)。以后的逻辑就不多说了,相信看了前面两篇文章到这里基础应该十分扎实了。看下ObservableC——OnSubscribeLift的call()方法,RxJavaHooks.onObservableLift(operator)实际上就是将传入的operator返回了。并且调用了operator.call。这里的operator实际上是lift传入的参数OperatorObserveOn。下面来看下OperatorObserveOn.call:

OperatorObserveOn的call方法
1
2
3
4
5
6
7
8
public Subscriber<? super T> call(Subscriber<? super T> child) { 
...
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);

parent.init();

return parent;
}

  也是跟map一样,初始化了一个SubscriberC——ObserveOnSubscriber并调用init进行了初始化。下一步,回到OnSubscribeLift.Call中继续执行parent.call(st),也是就是调用了前一级Observable对应OnSubscriber.call(st)。注意这里和map有一点形式上的不一样,说是形式上是因为map这里是这样的:source.unsafeSubscribe(parent),形式上是将上一级的Observable和本级的MapSubscriber建立了观察依赖,但实际起作用的其实是建立依赖后调用上一级的Observable的OnSubscriber.call,这里和observeOn()其实本质又一样了
因此,这里也可以看成执行了下面一句:

1
ObservableB.subscribe(SubscriberC)

于是,根据基本调用逻辑,事件又往上一级ObservableB去了。

3.2 subscribeOn()源码解析(一)

  在线程调度的基本实现的例子中,在observeOn()方法之前调用的就是subscribeOn(Schedulers.io()),所以这一小节,我们来看下subscribeOn()的源码。

subscribeOn()源码1
1
2
3
public final Observable<T> subscribeOn(Scheduler scheduler) {
return subscribeOn(scheduler, !(this.onSubscribe instanceof OnSubscribeCreate));
}

  subscribeOn()调用了一个两个参数的subscribeOn方法,我们继续:

subscribeOn()源码2
1
2
3
4
5
6
public final Observable<T> subscribeOn(Scheduler scheduler, boolean requestOn) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return unsafeCreate(new OperatorSubscribeOn<T>(this, scheduler, requestOn));
}

  重点在最后一句,如果你看了上一篇map()的源码解析,应该有印象:unsafeCreate()方法会创建一个新的被观察者Observable。由此可以预见,subscribeOn()的执行流程应该和map()差不多。这里会创建一个新的被观察者,记为ObservableB。传入的对应的OnSubscribe就是OperatorSubscribeOn。

OperatorSubscribeOn源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

final Scheduler scheduler;
final Observable<T> source;
final boolean requestOn;

public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler, boolean requestOn) {
this.scheduler = scheduler;
this.source = source;
this.requestOn = requestOn;
}

@Override
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();

SubscribeOnSubscriber<T> parent = new SubscribeOnSubscriber<T>(subscriber, requestOn, inner, source);
subscriber.add(parent);
subscriber.add(inner);

inner.schedule(parent);
}

static final class SubscribeOnSubscriber<T> extends Subscriber<T> implements Action0 {

final Subscriber<? super T> actual;
final boolean requestOn;
final Worker worker;
Observable<T> source;
Thread t;

SubscribeOnSubscriber(Subscriber<? super T> actual, boolean requestOn, Worker worker, Observable<T> source) {
this.actual = actual;
this.requestOn = requestOn;
this.worker = worker;
this.source = source;
}

@Override
public void onNext(T t) {
actual.onNext(t);
}

@Override
public void onError(Throwable e) {
try {
actual.onError(e);
} finally {
worker.unsubscribe();
}
}

@Override
public void onCompleted() {
try {
actual.onCompleted();
} finally {
worker.unsubscribe();
}
}

@Override
public void call() {
Observable<T> src = source;
source = null;
t = Thread.currentThread();
src.unsafeSubscribe(this);
}

@Override
public void setProducer(final Producer p) {
actual.setProducer(new Producer() {
@Override
public void request(final long n) {
if (t == Thread.currentThread() || !requestOn) {
p.request(n);
} else {
worker.schedule(new Action0() {
@Override
public void call() {
p.request(n);
}
});
}
}
});
}
}
}

  通过上一小节我们知道事件经过observeOn后,会先调用ObservableB的OnSubscribe的call方法,这里即为OperatorSubscribeOn的call方法。

先简单过一下OperatorSubscribeOn的call中的代码的大概流程:首先通过scheduler.createWorker()构建了一个Worker;然后用传进来的subscriber构造了一个新的SubscribeOnSubscriber,记为subscriberB,并将subscriberB丢到Worker.schedule()创建的新线程来处理;新线程中,Worker.schedule(subscriberB)会执行subscriberB中的call方法,在call中会用上一级Observable去订阅观察者subscriberB,即ObservableA.subscribe(subscriberB)。
提炼一下:切换线程,添加订阅ObservableA.subscribe(subscriberB)。

下面是详细的解析:
第一步:
final Worker inner = scheduler.createWorker();
  从上面的流程来看,这个Worker就是线程调度的关键,从前面1.2中的例子也能看出来,Worker本身就是用来创建新线程的。而在这个例子中,创建Worker的scheduler往上回溯去查找发现就是subscribeOn(Schedulers.io())中的Schedulers.io(),就是通过它创建了我们前面提到的Worker。所以下面来看看Schedulers.io()的实现:

Schedulers.io()源码
1
2
3
public static Scheduler io() {
return RxJavaHooks.onIOScheduler(getInstance().ioScheduler);
}

  RxJavaHooks.onIOScheduler和绝大多数RxJavaHooks里的方法一样,把传入的scheduler返回了。这里的scheduler传入了一个getInstance().ioScheduler,看来是一个单例类,去它的私有构造器中找到ioScheduler的初始化:

ioScheduler的初始化
1
2
3
4
5
6
7
8
9
10
11
private Schedulers() { 
...
Scheduler io = hook.getIOScheduler();
if (io != null) {
ioScheduler = io;
}
else {
ioScheduler = RxJavaSchedulersHook.createIoScheduler();
}
...
}

继续看RxJavaSchedulersHook.createIoScheduler():

createIoScheduler()源码
1
2
3
4
5
6
7
8
9
10
public static Scheduler createIoScheduler() {
return createIoScheduler(new RxThreadFactory("RxIoScheduler-"));
}

public static Scheduler createIoScheduler(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory == null");
}
return new CachedThreadScheduler(threadFactory);
}

  可以看到返回了一个CachedThreadScheduler。也就是说subscribeOn(Schedulers.io())中传入的scheduler实际是这个CachedThreadScheduler。Worker也是CachedThreadScheduler的createWorker()方法创建的:

createWorker()源码
1
2
3
4
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}

  这里返回了EventLoopWorker,OperatorSubscribeOn的call中第一句话的inner引用持有的Worker。用了这么长的篇幅终于把第一句解析完了,可见理解线程调度还是有一定难度的。下面的工作应该就是根据Worker创建线程了。

第二步:
SubscribeOnSubscriber parent = new SubscribeOnSubscriber(subscriber, requestOn, inner, source);
  继续到OperatorSubscribeOn源码解析上来。第二步就是普通的基本调用的套路:创建出subscriberB。具体就不解释了,可以参考map的讲解来看。

第三步:
inner.schedule(parent);
  第一步我们知道inner引用的对象实际是EventLoopWorker,来看下EventLoopWorker的schedule方法:

EventLoopWorker的schedule源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
public Subscription schedule(Action0 action) {
return schedule(action, 0, null);
}

@Override
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
...
ScheduledAction s = threadWorker.scheduleActual(new Action0() {
@Override
public void call() {
if (isUnsubscribed()) {
return;
}
action.call();
}
}, delayTime, unit);
innerSubscription.add(s);
s.addParent(innerSubscription);
return s;
}
}

  看到它调用了ThreadWorker的scheduleActual()方法。这里的ThreadWorker引用的对象是NewThreadWorker,下面是它的scheduleActual():

NewThreadWorker的scheduleActual()源码
1
2
3
4
5
6
7
8
9
10
11
12
13
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
ScheduledAction run = new ScheduledAction(decoratedAction);
Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run);
} else {
f = executor.schedule(run, delayTime, unit);
}
run.add(f);

return run;
}

  scheduleActual()中的ScheduledAction实现了Runnable接口,通过线程池executor切换了线程。顺便一提,可以在NewThreadWorker构造器中找到executor的初始化:

NewThreadWorker源码
1
2
3
4
5
6
7
8
9
public NewThreadWorker(ThreadFactory threadFactory) {
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);

boolean cancelSupported = tryEnableCancelPolicy(exec);
if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) {
registerExecutor((ScheduledThreadPoolExecutor)exec);
}
executor = exec;
}

  可以看到这里是一个容量为1的newScheduledThreadPool。

这里多说一句,这里只是说明NewThreadWorker是容量为1的newScheduledThreadPool。Schedulers.io()——也就是CachedThreadScheduler(threadFactory),这个类维护了一个AtomicReference pool集合,集合中的每个元素都是一个NewThreadWorker。可以理解为线程池的缓存。

  再回到EventLoopWorker的schedule方法中,我们知道threadWorker.scheduleActual启动了一个新的线程,线程会执行scheduleActual第一个参数——Action0的call方法。而这个call方法中执行了一行:action.call();

  往上层代码中查找发现这里的action就是subscriberB——SubscribeOnSubscriber。因此在SubscribeOnSubscriber代码中可以看到除了OnNext等方法之外还有一个call方法,而正常的subscriber是不应该有call方法的。这个call方法也是因为SubscribeOnSubscriber实现了Action0接口复写的。所以在action.call()中调用的就是SubscribeOnSubscriber里的call。具体代码请自行往上翻……
  在这个call中执行了src.unsafeSubscribe(this)。是的,就是ObservableA.subscribe(subscriberB)。

  而ObservableA其实就是Observable.create(),也就是我们自己定义的被观察者。根据基本调用,会执行ObservableA中OnSubscribe的call方法,接着执行subscriberB.onNext(“hello rxjava”)。

所以,subscribeOn是先切换了线程,再去启动事件的,并且从启动事件开始,直到再次切换线程以前,都是在这个新线程中执行的。

  下一步,来看下subscriberB——SubscribeOnSubscriber如何接收的。

3.3 subscribeOn()源码解析(二)

  来看下subscribeOn()中创建的SubscribeOnSubscriber中接收事件的onNext方法:

SubscribeOnSubscriber的onNext源码
1
2
3
4
@Override
public void onNext(T t) {
actual.onNext(t);
}

  好惊喜有木有,就TM一句啊……这里actual是OperatorSubscribeOn中call方法带入的,也就是subscriberC——在ObservableB.subscribe(SubscriberC)的时候传入的。
  所以这里事件没有操作直接传往SubscriberC的onNext了……这甩锅甩的够快的……

  下面该进入SubscriberC的onNext了。

3.3 observeOn()源码解析(二)

  SubscriberC是在observeOn()中初始化的ObserveOnSubscriber,来看下源码:

ObserveOnSubscriber源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
……
@Override
public void onNext(final T t) {
if (isUnsubscribed() || finished) {
return;
}
if (!queue.offer(NotificationLite.next(t))) {
onError(new MissingBackpressureException());
return;
}
schedule();
}

protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(this);
}
}

// only execute this from schedule()
@Override
public void call() {
long missed = 1L;
long currentEmission = emitted;

final Queue<Object> q = this.queue;
final Subscriber<? super T> localChild = this.child;

for (;;) {
long requestAmount = requested.get();

while (requestAmount != currentEmission) {
boolean done = finished;
Object v = q.poll();
boolean empty = v == null;

if (checkTerminated(done, empty, localChild, q)) {
return;
}

if (empty) {
break;
}

localChild.onNext(NotificationLite.<T>getValue(v));

currentEmission++;
if (currentEmission == limit) {
requestAmount = BackpressureUtils.produced(requested, currentEmission);
request(currentEmission);
currentEmission = 0L;
}
}

if (requestAmount == currentEmission) {
if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
return;
}
}

emitted = currentEmission;
missed = counter.addAndGet(-missed);
if (missed == 0L) {
break;
}
}
}

}

  在ObserveOnSubscriber中执行了schedule(),而schedule()中执行了recursiveScheduler.schedule(this)
  来看下recursiveScheduler,在ObserveOnSubscriber构造器中有这么一句:this.recursiveScheduler = scheduler.createWorker();
  看到熟悉的createWorker()以及recursiveScheduler.schedule就知道这里要切换线程了。我们先等等,先看看这个scheduler引用的是啥。一层一层往上查找,发现就是observeOn(AndroidSchedulers.mainThread())传的AndroidSchedulers.mainThread()。再来看下这个方法:

ObserveOnSubscriber源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private AndroidSchedulers() {
RxAndroidSchedulersHook hook = RxAndroidPlugins.getInstance().getSchedulersHook();

Scheduler main = hook.getMainThreadScheduler();
if (main != null) {
mainThreadScheduler = main;
} else {
mainThreadScheduler = new LooperScheduler(Looper.getMainLooper());
}
}

/** A {@link Scheduler} which executes actions on the Android UI thread. */
public static Scheduler mainThread() {
return getInstance().mainThreadScheduler;
}

  所以执行scheduler.createWorker()的是LooperScheduler,而这个LooperScheduler(Looper.getMainLooper())传的参数是不是很熟悉?Looper.getMainLooper()获取主线程的Looper。

  这里回到ObserveOnSubscriber中来,recursiveScheduler.schedule(this)切换到了主线程并执行this——也就是ObserveOnSubscriber的call方法。
call方法中有两句localChild.onNext(NotificationLite.<T>getValue(v));checkTerminated(done, empty, localChild, q):就是调用SubscriberD的onNext()、onCompleted()、onError()。

也就是说,observeOn是在之前的线程中接受事件,然后切换线程,这个例子是切换到主线程,然后在新线程(主线程)中下发事件给下一级的观察者。

  到这里,线程调度的主要代码就介绍完了,下面看下整体流程。

4. 线程调度整体流程

  请回忆线程调度基本实现代码,然后再来看下面的流程。

第一步:Observable.create,创建被观察者,记为ObservableA
第二步:.subscribeOn(Schedulers.io()),创建新被观察者,记为ObservableB
第三步:.observeOn(AndroidSchedulers.mainThread()),创建新被观察者,记为ObservableC
第四步:.subscribe(new Subscriber()),创建观察者,记为SubscriberD,并建立依赖关系:ObservableC.subscribe(SubscriberD)

第五步:创建新观察者ObserveOnSubscriber,记为SubscriberC。并建立新依赖关系ObservableB.subscribe(SubscriberC)
第六步:通过制定Scheduler创建Worker;创建新观察者SubscribeOnSubscriber,记为SubscriberB;worker.schedule()切换新线程;在新线程中执行ObservableA.subscribe(SubscriberB)

第七步:ObservableA中OnSubscribe的call开始发送事件”hello rxjava”

第八步:SubscriberB接到事件后,直接甩给SubscriberC
第九步:SubscriberC接收事件;通过Worker切换到主线程;在主线程中将事件传递给SubscriberD
第十步:SubscriberD接收事件,整体流程结束

根据流程我们可以简单总结为:subscribeOn在事件上行时切换线程,而observeOn是在事件下发时切换线程。
自己画了张图,加深理解:

5. 总结

  线程调度到这里就告一段落了,我讲的也是寥寥数语,只是把大体流程介绍了一遍。如果想仔细搞懂每一步的细节,还需要在多线程以及handler机制上下功夫。
  我们经过这几篇的介绍,RxJava源码主要的逻辑基本介绍完了。下一章会介绍一些较为轻松的RxJava的具体应用,敬请期待!

Powered by Hexo and Hexo-theme-hiker

Copyright © 2018 - 2023 TEN-Z'S BLOG All Rights Reserved.

访客数 : | 访问量 :