RxJava操作符实践:6_辅助操作之4_serialize

一、描述

强制一个Observable连续调用并保证行为正确。

一个Observable可以异步调用它的观察者的方法,可能是从不同的线程调用。这可能会让Observable行为不正确,它可能会在某一个onNext调用之前尝试调用onCompleted或onError方法,或者从两个不同的线程同时调用onNext方法。使用Serialize操作符,你可以纠正这个Observable的行为,保证它的行为是正确的且是同步的。

RxJava中的实现是serialize,它默认不在任何特定的调度器上执行。

二、示意图

serialize

三、示例代码

如果你无法确保自定义的操作符符合 Rx 的约定,例如从多个源异步获取数据,则可以使用 serialize 操作函数。 serialize 可以把一个不符合约定的 Observable 转换为一个符合约定的 Observable。

1. 首先

创建一个不符合约定的 Observable,并且订阅到该 Observable上:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onCompleted();
subscriber.onNext(3);
subscriber.onCompleted();
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
System.out.println("onUnsubscribe.");
}
})
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("onCompleted.");
}

@Override
public void onError(Throwable e) {
System.out.println("onError: " + e.getMessage());
}

@Override
public void onNext(Integer integer) {
System.out.println("onNext: " + integer);
}
});

2. 运行结果

1
2
3
4
onNext: 1
onNext: 2
onCompleted.
onUnsubscribe.

先不管上面的 Observable 发射的数据,订阅结束的情况看起来符合 Rx 约定。 这是由于 subscribe 认为当前数据流结束的时候会主动结束这个 Subscription。但这并不意味着总是这样的。 还有一个函数为 unsafeSubscribe ,该函数不会自动取消订阅。

3. 其次

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onCompleted();
subscriber.onNext(3);
subscriber.onCompleted();
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
System.out.println("onUnsubscribe.");
}
})
.unsafeSubscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("onCompleted.");
}

@Override
public void onError(Throwable e) {
System.out.println("onError: " + e.getMessage());
}

@Override
public void onNext(Integer integer) {
System.out.println("onNext: " + integer);
}
});

4. 运行结果

1
2
3
4
5
onNext: 1
onNext: 2
onCompleted.
onNext: 3
onCompleted.

上面的示例最后就没有打印 Unsubscribed 字符串。

unsafeSubscribe 也不能很好的处理错误情况。所以该函数几乎没用。在文档中说:该函数应该仅仅在自定义操作函数中处理嵌套订阅的情况。 为了避免这种操作函数接受到不合法的数据流,我们可以在其上应用 serialize 操作函数。

5. 使用serialize

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onCompleted();
subscriber.onNext(3);
subscriber.onCompleted();
}
})
.serialize()
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
System.out.println("onUnsubscribe.");
}
})
.unsafeSubscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("onCompleted.");
}

@Override
public void onError(Throwable e) {
System.out.println("onError: " + e.getMessage());
}

@Override
public void onNext(Integer integer) {
System.out.println("onNext: " + integer);
}
});

四、运行结果

1
2
3
onNext: 1
onNext: 2
onCompleted.

尽管上面的代码中没有调用unsubscribe, 但是数据流事件依然符合约定,最后也收到了完成事件。

五、参考资料

ReactiveX官方文档

ReactiveX文档中文翻译

RxJava 教程第三部分:驯服数据流之自定义操作函数

PS:欢迎关注 SherlockShi 个人博客

感谢你的支持,让我继续努力分享有用的技术和知识点!