RxJava2.x 线程切换解析
前言
之所以 RxJava 能在安卓开发中如此受欢迎,其中一个重要的原因就是在于 RxJava 切换线程的便利性特别适用于 Android 中主线程不能执行耗时操作的规定,配合 Retrofit 适用能对于网络 IO 操作提供非常大的便利。
这篇文章的目的就是深入解析RxJava 中关于线程切换的原理。在阅读下文前,你可以先阅读《RxJava2.x 订阅流程解析 》一文作为基础。
线程切换流程
RxJava2.x 中线程切换方法和 RxJava1.x 中没有区别,主要通过 Observable.subscribeOn 和 Observable.observeOn 两个方法实现,前者主要是规定事件源代码执行的线程,而后者则是规定下一个操作符中代码执行的线程,我们已一个简单的流程为例
1 | Observable |
输出结果中,被观察者事件源代码执行在 RxCachedThreadScheduler-1 即 Schedulers.io() 线程中,观察者代码执行在了 RxComputationThreadPool-1 即 Schedulers.computation() 线程中
值得注意的是 subscribeOn 方法只在第一次执行时有效,就是说被观察者只会跑在第一次 subscribeOn 规定的线程上(没有就是当前线程),至于看完后面就能知道,我们先从 observeOn 方法开始
observeOn 流程解析
Observable.observeOn 方法
1 | public final Observable<T> observeOn(Scheduler scheduler) { |
由此可以得知,切换线程的关键就在于 ObservableObserveOn 的 subscribeActual 方法,而且由于 observeOn 是作用于观察者的回调方法,我们可以得知肯定是对 observer 做了某种程度的封装,使其可以切换线程执行回调
ObservableObserveOn.subscribeActual 方法
1 |
|
我们可以看到方法内确实对出传入的 observer 进行了封装,那我们可以深入的去查看 ObserveOnObserver 中继承自 Observer 接口的方法,以 onNext 的实现为例
1 |
|
切换线程交由 worker 去执行,而 worker 是有调度器scheduler.createWorker 方法创建的,其实只是让 worker 去执行一个异步任务(具体流程会在后面解析 Schedulers 中的默认调度器实现时提及),而 ObserveOnObserver 也实现了 runnable 接口,所以后续逻辑需要到 run 方法中去寻找
ObserveOnObserver.run 方法
1 |
|
这流程就比较清晰了,在 ObserveOnObserver 封装的观察者中,会以队列的形式存储上游回调来的数据,再以调度器生成的 worker 做线程切换的动作,在线程上执行时从队列中取数据再交由下游的观察者继续执行回调,从而完成流程执行过程中的线程切换
subscribeOn 流程解析
那回过头来看 subscribeOn 方法,该方法的切换时机肯定是在 subscribeActual 方法执行的时候了
Observable.subscribeOn 方法
1 | public final Observable<T> subscribeOn(Scheduler scheduler) { |
同理可以,具体的流程应该在 ObservableSubscribeOn.subscribeActual 方法
ObservableSubscribeOn.subscribeActual 方法
1 |
|
scheduler.scheduleDirect 方法内部其实也是通过 createWorker 去创建一个 worker 通过 schedule 方法执行线程,而线程的实现则是在 SubscribeTask 中的 run 方法上
SubscribeTask.run 方法
1 |
|
因为订阅的调用是在指定线程上执行的,所以被观察者的代码执行也是在该线程上的,订阅流程中的多次切换其实只有最靠近被观察者的 subscribeOn 会影响被观察者,并不是没有发生线程切换,只是又被切到了其他线程,其实在 onSubscribe 方法里是可以观测到线程的切换的
Scheduler 解析
线程的切换都是通过调度器创建的 worker 的 schedule 方法去实现的,而该方法是一个抽象方法,具体的逻辑由各自的子类实现,其实不难发现其中最重要的一个子类就是 NewThreadWorker,而 NewThreadWorker.schedule 最终调用的则是 NewThreadWorker.scheduleActual
NewThreadWorker.scheduleActual 方法
1 | public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) { |
这个类中的线程池在构造函数中创建
1 | public NewThreadWorker(ThreadFactory threadFactory) { |
可以看出最终线程还是会交由线程池中执行,而线程池的管理则是 Scheduler 最主要的工作了,我们以常用的 io 调度器为例
io 调度器解析
在 Schedulers.io() 方法中经过多次追踪可以发现其实现为 IoScheduler 类,那么可以通过 createWorker 方法作为切入点
IoScheduler.createWorker 方法
1 |
|
我们可以看出,创建的 EventLoopWorker 只是一个壳,而真正的 worker 则是从 CachedWorkerPool 中获取的,而 CachedWorkerPool 实现是一套自己的管理 worker 实例的逻辑,是了实现 io 线程即能动态调整大小,又能实现缓存的效果
CachedWorkerPool 详解
1 | static final class CachedWorkerPool implements Runnable { |
通过这个管理池,我们可以清晰地了解 Schedulers.io() 的运行特性,它的 worker 没有上限,但是会有一个 keepAliveTime 存活时间的限制,当 worker 执行完毕后会被存入到一个缓存队列中,而管理 worker 的对象池会以一个定时任务去清理过期的 worker,未过期的 worker 有机会得到重用。这就实现了一个可以自适应大小的缓存池。
那另外一个 Schedulers.computation() 也类似,只不过它里面负责管理 worker 的 FixedSchedulerPool 已经是固定大小的了(cpu 核心数),逻辑也更简单点,有兴趣的话可以去查看一些其他的调度器实现
Android 主线程调度器
AndroidSchedulers.mainThread() 是 rxandroid 提供的主线程切换调度器,也是我们最常用的调度器之一,理解了它也有助于我们对 Android 开发的线程间通信有更好的认识
AndroidSchedulers.mainThread() 追溯进去发现具体逻辑由 HandlerScheduler 实现
1 | static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper())); |
它会持有一个主线程 handler 的引用
HandlerWorker.schedule 方法
1 |
|
AndroidSchedulers.mainThread() 只是通过 Android 中的 handler 特性将线程切换到了主线程执行
结语
对于 Rxjava 的线程切换也有了较为全面的认知了,接下去要分析的网络三剑客中的 retrofit, retrofit 更是一个设计模式的集大成者。