一、描述
强制一个Observable连续调用并保证行为正确。
一个Observable可以异步调用它的观察者的方法,可能是从不同的线程调用。这可能会让Observable行为不正确,它可能会在某一个onNext调用之前尝试调用onCompleted或onError方法,或者从两个不同的线程同时调用onNext方法。使用Serialize操作符,你可以纠正这个Observable的行为,保证它的行为是正确的且是同步的。
RxJava中的实现是serialize,它默认不在任何特定的调度器上执行。
二、示意图
三、示例代码
如果你无法确保自定义的操作符符合 Rx 的约定,例如从多个源异步获取数据,则可以使用 serialize 操作函数。 serialize 可以把一个不符合约定的 Observable 转换为一个符合约定的 Observable。
1. 首先
创建一个不符合约定的 Observable,并且订阅到该 Observable上:
1 | Observable.create(new Observable.OnSubscribe<Integer>() { |
2. 运行结果
1 | onNext: 1 |
先不管上面的 Observable 发射的数据,订阅结束的情况看起来符合 Rx 约定。 这是由于 subscribe 认为当前数据流结束的时候会主动结束这个 Subscription。但这并不意味着总是这样的。 还有一个函数为 unsafeSubscribe ,该函数不会自动取消订阅。
3. 其次
1 | Observable.create(new Observable.OnSubscribe<Integer>() { |
4. 运行结果
1 | onNext: 1 |
上面的示例最后就没有打印 Unsubscribed 字符串。
unsafeSubscribe 也不能很好的处理错误情况。所以该函数几乎没用。在文档中说:该函数应该仅仅在自定义操作函数中处理嵌套订阅的情况。 为了避免这种操作函数接受到不合法的数据流,我们可以在其上应用 serialize 操作函数。
5. 使用serialize
1 | Observable.create(new Observable.OnSubscribe<Integer>() { |
四、运行结果
1 | onNext: 1 |
尽管上面的代码中没有调用unsubscribe, 但是数据流事件依然符合约定,最后也收到了完成事件。
五、参考资料
PS:欢迎关注 SherlockShi 个人博客