RxJava2.x 订阅流程解析

RxJava2.x 订阅流程解析

前言

RxJava + Retrofit + okhttp 的网络请求框架已经成为 Android 开发的绝对主流框架,其带来的简洁、方便的调用形式早已征服了每个 Android 开发者,那么我们完全有必要去深入研究其实现原理。

文章的初衷就是希望能给你带来 RxJava2.x 的订阅流程更深入的解析。

相关概念

  • ReactiveX 是一个基于一系列可观察的异步和基础事件(composing asynchronous and event-based programs)编程组成的一个库,而 RxJava 则是这个库基于 Java VM 实现的,当然还有 RxJs,RxKotlin 等基于其他语言实现的库。RxJava2 则是为了遵循 Reactive-Streams specification 规范进行了完全的重写,而且无法与 RxJava 共存,但其实现原理都是相同的,所以本文将通过 RxJava2 的源码来进行深入解读。
  • 观察者模式 是一种运用十分广泛的设计模式,它有个两个概念观察者及被观察者,观察对象通过注册或者订阅被观察对象建立两者的联系,那么被观察的对象发生的特定行为会及时通知观察对象进行相应。在 Android 开发中 View 的 OnClickListener 就是典型的观察者模式,被观察者目标view 的点击事件触发会触发观察者 OnClickListener 的 onClick 方法响应。

订阅流程

已 RxJava2.x 为例,下面是一个简单的 RxJava 订阅流程,我们忽略 RxJava2.x 因背压处理的新观察者模型,就以 RxJava1.x 的观察者模型 Observable ( 被观察者 ) / Observer ( 观察者 ) 为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}) // 观察者订阅被观察者
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onNext(Integer integer) {
System.out.println("Observer.onNext: " + integer);
}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {
System.out.println("Observer.onComplete");
}
});

这是最简单的 RxJava2.x 订阅流程,Observable.create() 方法创建了一个被观察者对象,由 ObservableOnSubscribe 对象作为事件源,而 Observer 作为观察者通过 subscribe() 方法订阅注册到被观察者,此时事件源发出的事件将会被 Observer 接受输出。

订阅流程解析

我们知道订阅是由 Observable.subscribe 方法触发

Observable.subscribe 方法

1
2
3
4
5
6
public final void subscribe(Observer<? super T> observer) {
...
// 核心的订阅方法
subscribeActual(observer);
...
}

去除一些检验包装代码后,订阅事件的核心是由 Observable.subscribeActual 方法完成的,而这个方法是一个抽象方法,具体逻辑则是由子类实现,那我们就进入该流程中的 Observable.create 方法创建出的子类 ObservableCreate 中的 subscribeActual 具体实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 将 observer 封装成 CreateEmitter(一个继承了ObservableEmitter接口的类)
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 所以 observer.onSubscribe 方法是最先调用的
observer.onSubscribe(parent);
try {
// 触发了事件源对应的方法,同时方法里面会有对应观察者方法的调用,比如 emitter.onNext(1)
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}

通过子类实现的订阅方法完成了如下逻辑,当被观察者被订阅时,会触发被观察者内部封装事件源的相关方法,然后再触发观察者的监听方法

RxJava 可以通过插入操作符对事件流进行操作,那么有操作符的订阅流程是怎么样的呢

1
2
3
4
5
6
7
8
... 被观察者
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
return integer + 1;
}
}) // 返回的是一个 Observable 对象
... 观察者

Observable.map 方法

1
2
3
4
5
6
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
// 非空检验,RxJava2 中不允许空值
ObjectHelper.requireNonNull(mapper, "mapper is null");
// RxJavaPlugins.onAssembly 只是一个预埋的钩子函数,可以自定义一些额外的类似于监控的操作
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

最简单的操作符 map 返回的则是一个 ObservableMap 对象,我们需要去查找 ObservableMap.subscribeActual 方法

1
2
3
4
@Override
public void subscribeActual(v<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}

其中 source 是在构造函数中传入的 this 对象,即上一个 Observable 对象,在 map 操作中会生成一个新的 Observable,在 subscribeActual 实现方法中会对上一个 Observable 进行订阅。同时,会对 Observer 也会进行包装,为的是真正的数据转化工作

用图的方式更直白一些

订阅流程就比较直观地展示,只有在 Observable 被订阅时才会向上订阅一直到事件源,同时会根据操作符类型对 Observer 做一些转换封装,事件源调用后,事件会通过 Observer 层层调用最终调用到最初订阅时传入的 Observer

结语

虽然只是通过一个简单的流程分析了订阅流程,但其他的模型也是类似的,只不过可能其他类型的 Observable 可能实现了更为复杂的控制流程,譬如说实现一些缓存队列来保证数据的缓存。最终通过操作符的转换形成一条调用链,当被订阅时会沿着链子向上调用,然后事件向下回调传递

接下去,我会分析 RxJava2.x 另外一个重要特性线程切换,并简单讲述一些系统提供的 Schedulers 特性和实现原理