RxJava操作符实践:6_辅助操作之10_timeout

一、描述

对原始Observable的一个镜像,如果过了一个指定的时长仍没有发射数据,它会发一个错误通知。

如果原始Observable过了指定的一段时长没有发射任何数据,Timeout操作符会以一个onError通知终止这个Observable。

RxJava中的实现为timeout,但是有好几个变体。

第一个变体接受一个时长参数,每当原始Observable发射了一项数据,timeout就启动一个计时器,如果计时器超过了指定指定的时长而原始Observable没有发射另一项数据,timeout就抛出TimeoutException,以一个错误通知终止Observable。

这个timeout默认在computation调度器上执行,你可以通过参数指定其它的调度器。

二、示意图

timeout

三、示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 1; i < 4; i++) {
try {
Thread.sleep(100 * i);
} catch (InterruptedException e) {
subscriber.onError(e);
}
subscriber.onNext(i);
}
subscriber.onCompleted();
}
})
.timeout(250, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.newThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("onCompleted.");
}

@Override
public void onError(Throwable e) {
System.out.println("onError: " + e.getMessage());
}

@Override
public void onNext(Integer integer) {
System.out.println("onNext: " + integer);
}
});

四、运行结果

1
2
3
onNext: 1
onNext: 2
onError: null
  • 经过100毫秒后,发射1;
  • 再经过200毫秒后,发射2;
  • 然后再经过250毫秒,还没有再发射新的数据项(这一次要300毫秒才发射),就超时了,就报出TimeoutException,以一个错误通知终止Observable。

五、更多

timeout操作符还有以下变体:

  • timeout(long,TimeUnit)
  • timeout()
  • timeout(long,TimeUnit,Observable)
  • timeout(long,TimeUnit,Observable,Scheduler)
  • timeout(Func1)
  • timeout(Func1,Observable)
  • timeout(Func0,Func1)
  • timeout(Func0,Func1,Observable)

详情可查阅下面的参考资料。

六、参考资料

ReactiveX官方文档

ReactiveX文档中文翻译

PS:欢迎关注 SherlockShi 个人博客

感谢你的支持,让我继续努力分享有用的技术和知识点!