RxJava操作符实践:9_反压操作之1_onBackpressureBuffer

一、描述

onBackpressureBuffer 会缓存所有当前无法消费的数据,直到 Observer 可以处理为止。

你可以指定缓冲的数量,如果缓冲满了则会导致数据流失败。

二、示意图

onBackpressureBuffer

三、示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Observable.interval(1, TimeUnit.MILLISECONDS)
.onBackpressureBuffer(1000)
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
System.out.println("onCompleted.");
}

@Override
public void onError(Throwable e) {
System.out.println("onError: " + e.getMessage());
}

@Override
public void onNext(Long aLong) {
System.out.println("onNext: " + aLong);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

四、运行结果

1
2
3
4
5
6
7
8
9
10
11
12
onNext: 0
onNext: 1
onNext: 2
onNext: 3
onNext: 4
onNext: 5
onNext: 6
onNext: 7
onNext: 8
onNext: 9
onNext: 10
onError: Overflowed buffer of 1000

由运行结果可以看出,生产者比消费者快 100 倍。使用 1000个缓冲来处理这种消费者比较慢的情况。当消费者消费 11个数据的时候,缓冲区满了,生产者生产了 1100个数据,数据流就抛出异常了。

五、参考资料

ReactiveX官方文档

ReactiveX文档中文翻译

RxJava 教程第四部分:并发 之数据流发射太快如何办

RxJava中backpressure这个概念的理解

PS:欢迎关注 SherlockShi 个人博客

感谢你的支持,让我继续努力分享有用的技术和知识点!