demo
这里以rxjava1.2.0为例子进行分析,首先写一个最简单的demo之后我们逐步debug走一下它的运行流程。
1 | import rx.Observable; |
这里observable有三个方法
- onNext():发送一条消息
- onError():发送一条错误信息
- onCompleted():告知observer消息发送完毕
TODO: 实际debug过程中是执行了onError之后还是走到了onComplete的,但是observer里没有相关输出,应该是哪里阻断了。
消息发送过程中onError或onComplete触发后本次消息发送流程结束,后边的消息不会再发送,包括后边所有的onNext、onError和onCompleted。
demo中如果想触发onCompleted需要注释掉onError。
流程分析
入口
observable.subscribe(observer);触发了observable发送消息,上边的代码都是在构造observer和observable。
subscribe()
observable.subscribe方法有五个重载方法,但最后都可以归结到一个方法,具体如下:
无参数
1
2
3
4
5
6public final Subscription subscribe() {
Action1<T> onNext = Actions.empty();
Action1<Throwable> onError = InternalObservableUtils.ERROR_NOT_IMPLEMENTED;
Action0 onCompleted = Actions.empty();
return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));
}传入onNext
1
2
3
4
5
6
7
8
9public final Subscription subscribe(final Action1<? super T> onNext) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}
Action1<Throwable> onError = InternalObservableUtils.ERROR_NOT_IMPLEMENTED;
Action0 onCompleted = Actions.empty();
return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));
}传入onNext、onError
1
2
3
4
5
6
7
8
9
10
11public final Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}
if (onError == null) {
throw new IllegalArgumentException("onError can not be null");
}
Action0 onCompleted = Actions.empty();
return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));
}传入onNext、onError、onCompleted
1
2
3
4
5
6
7
8
9
10
11
12
13public final Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onCompleted) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}
if (onError == null) {
throw new IllegalArgumentException("onError can not be null");
}
if (onCompleted == null) {
throw new IllegalArgumentException("onComplete can not be null");
}
return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));
}传入Observer
1
2
3
4
5
6
7
8
9public final Subscription subscribe(final Observer<? super T> observer) {
if (observer instanceof Subscriber) {
return subscribe((Subscriber<? super T>)observer);
}
if (observer == null) {
throw new NullPointerException("observer is null");
}
return subscribe(new ObserverSubscriber<T>(observer));
}
可以看到上边的五个方法是对不同的入参封装、补充了onNext、onError、onCompleted三个方法,最终都是调用下边的这个方法
1 | public final Subscription subscribe(final Observer<? super T> observer) { |
这个方法中将observer封装成了subscriber,subscriber中相对于observer补充了一些方法:
onStart(): 在事件还未发送之前被调用,可以做一些准备动作。如果准备工作的线程不是在subscribe所发生的线程,可以用doOnSebscribe()方法代替onStart()unsubscribe(): 用于取消订阅,方法调用后Subscriber不再接收事件,在这个方法调用前需要判断isUnsubscribed()。在subscribe()之后,Observable会持有Subscriber的引用,有内存泄漏的风险所以不再使用Subscriber的时候需要调用unsubscribe()来解除引用关系,避免内存泄漏
TODO: subscriber&observer
之后调用static的subscribe方法并通过this引用当前observable传给static方法
1 | public final Subscription subscribe(Subscriber<? super T> subscriber) { |
最后调用到核心的subscribe方法,如下:
1 | static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { |
这里去除掉检查异常部分主要完成了三个工作:
调取
subscriber.onStart();这个方法是subscriber比observer多的方法,在核心方法开始之前触发。这里demo中用的是observer,这个方法中并没有任何操作。subscriber包装成SafeSubscriber
1
2
3
4if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
}触发核心方法
1
2RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);的前半部分RxJavaHooks.onObservableStart(observable, observable.onSubscribe)返回的就是经过封装和处理的业务中的observable创建时传入的onSubscribe,后边的call方法即为调用业务逻辑中的相关call方法。
return RxJavaHooks.onObservableReturn(subscriber);返回了一个Subscribtion,用于取消订阅主题,并不在核心的调用逻辑中。
RxJavaHooks
通过上边的分析我们可以看到核心的方法是通过RxJavaHooks触发的,下边我们会分析具体的触发流程。
首先我们要明白这里传入的observable就是经过多层转化后业务代码中的observable
我们先来看下observable.onSubscribe,这里只有一个赋值的地方就是
1 | protected Observable(OnSubscribe<T> f) { |
而这个方法也只有一个调用方法
1 | public static <T> Observable<T> create(OnSubscribe<T> f) { |
这里和业务代码联系起来可以发现,onSubscribe就是业务代码中调用Observable.create()方法时传入的onSubscribe。
到这里通过属性名大概也知道为什么在subscribe时会调用call方法中的操作了,核心是调用了onSubscribe,当然是在subscribe时候调用23333。
TODO: 所以消息每次在subscribe时候推送过来,是pull的还是push的…毕竟也没看到可以往里边补充信息
我们可以看到onObservableStart()方法返回的也是一个onSubscribe,后边的call()方法调用具体的业务逻辑。
1 | public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) { |
这里Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> f是一个入参Observable, Observable.OnSubscribe,出参Observable.OnSubscribe的变换方法,整个方法的作用在于,如果这里挂了onObservableStart方法,那么在执行onSubscribe的call方法之前需要先对其进行加工变换。
稍微扩展看一下这个钩子,RxJavaHooks类中静态方法调用了init方法,而init方法中定义了各个钩子
1 | static { |
这里onObservableStart钩子最终执行就是原样返回了onsubScribe给外层,所以最终在上文所述的核心代码处调用了onSubscribe.call(),从而调用了业务代码,在observable中调用了onNext、onError、onCompleted发送消息。
onNext、onError、onCompleted
在observable执行subscriber.onNext("observable send message1");发送消息的过程中可以通过debug跟进去看到他调用了SafeSubscriber的onNext()方法
1 |
|
而这里的 调用了ObserverSubscriber.OnNext()
1 |
|
从而最终调用到了业务代码中Observer的相关逻辑。
调用逻辑理清了,那么我们来反推下是什么时候把observer封装进去的。
上文我们提到了在observer的核心subscribe方法中做了三件事,其中第二件事就是将传入的observer封装成一个SafeSubscriber,也就是执行了如下方法
1 | public SafeSubscriber(Subscriber<? super T> actual) { |
而传入SafeSubscriber的observer之前已经经历过一层封装,可以查阅上文重载的subscribe方法中的第五个,传入Observer。可以看到最后封装了一层return subscribe(new ObserverSubscriber<T>(observer));
因此这里的调用逻辑从外到里就是SafeSubscriber->ObserverSubscriber->业务observer中的onNext方法。