同上篇一样,我们这里先给出一个demo,后文基于这个demo进行分析。
demo
1 | package article; |
运行结果如下:
1 | observable 1 begin |
首先可以看到这里共用了3s左右,小于串行的5s,说明两个任务是在不同线程运行的了。
下边说明一些问题:
切换线程的两个方法
observeOn()、subscribeOn()是observable的实例方法而非类方法,这里的考虑在于方便后续多次切换线程时的链式调用,但同时也导致了一个问题就是没办法在Observable.create()之前指定scheduler,所以这里先通过just()方法创建了一个observable,又通过他的实例方法进一步做了耗时操作。just()方法本身底层就是create一个observable并发出相关消息,这个在其他的文章中再做探讨。
main方法中的
Thread.sleep(3000);主要是为了等新启动的两个线程输出结果后主线程再结束,这里如果不等的话会出现主线程已经结束了,子线程还没完成任务,这时候在控制台就看不到完整的输出。这里main方法中的
Thread.sleep(3000);可以换一种写法1
2
3
4
5
6
7Long start = System.currentTimeMillis();
Observable.merge(observable1, observable2).toBlocking().subscribe(onNext -> System.out.format("\nobserver receive onNext:\n\tmessage: %s\n", onNext),
onError -> {
System.out.format("\nobserver receive onError:\n\tmessage: %s\n\tstackTrace: \n", onError.getMessage());
onError.printStackTrace();
}, () -> System.out.format("\nobserver receive onCompleted\n"));
Long end = System.currentTimeMillis();首先调用
merge()方法将两个Observable合并成一个,这个过程中并不影响他们observeOn()方法在新启动线程上完成的操作。merge()方法本身并不在这里讨论,会再其他文章中再做说明。toBlocking()将一个普通的Observable转化成BlockingObservable,内部使用CountDownLatch实现阻塞操作。
observeOn切换线程
这里observeOn(Schedulers.newThread())方法分两部分
Schedulers.newThread()
1 | public static Scheduler newThread() { |
这里进入Schedulers类中可以看到
1 | public final class Schedulers { |
这里分三种线程调度器,计算型、io型和启动新线程类型,这里对scheduler的细节暂时不做讨论,这里引用的是newThreadScheduler
observeOn
点击进入observeOn()方法可以看到该方法也有四个重载方法,这里调用的是
1 | public final Observable<T> observeOn(Scheduler scheduler) { |
其他两个分别是带有delayError、bufferSize参数的,最终都调用了
1 | public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { |
当涉及到delayError时会调用lift()方法进行变换操作,当前是一个单纯发送消息的Observer,走了instanceof ScalarSynchronousObservable分支。
TODO: 后续看多种subject过程中应该会涉及到这里的lift()方法,后续补充。
当前分支调用方法核心如下
1 | public Observable<T> scalarScheduleOn(final Scheduler scheduler) { |
这里最后调用的create()方法就是基本调用流程中介绍业务代码中的Observable.create()方法,使用最终包装好的ScalarAsyncOnSubscribe创建Observable,回归到基本流程上。
而其中的包装过程分为两部分:
创建
onSchedule,上述代码的call()方法(下称做封装call方法)中可以看到其将业务中的onSubscribe.call()方法(下称作业务call方法)封装在了onSchedule.call中,而封装的call方法通过scheduler.createWorker()这个线程中调用。而具体执行线程根据策略判断,这里我们scheduler选用的newThread策略,所以call方法会被封装到一个新线程中去调用。new ScalarAsyncOnSubscribe<T>(t, onSchedule)将onSchedule再次封装成一个onObservable
因此这里再调用通过如下逻辑onSchedule->根据scheduler.createWorker()策略选择调用线程->调用worker中的call()方法->调用onObservable.call()中的业务代码->触发observer.call()。
TODO: 这里调用过程中走的肯定和单纯的observable不一样,后续还需要细化
subscribeOn
subscribeOn()方法和observeOn()方法类似,都用于控制线程,不同的是observeOn()控制的是observable的工作线程,而subscribeOn()是控制的observer的工作线程。
同时,两个方法控制的范围不同observeOn()控制调用链接在该方法后的方法,而subscribeOn()控制其之前的一段调用方法。
observeOn:
subscribeOn:
根据上边的特性,observeOn()两个连续调用以后边的为准,subscribeOn()两个连续调用以前边的为准。
