RxJava操作符实践:2_变换操作之5_buffer

一、描述

定期收集Observable的数据放进一个数据包裹,然后发射这些数据包裹,而不是一次发射一个值。

Buffer操作符将一个Observable变换为另一个,原来的Observable正常发射数据,变换产生的Observable发射这些数据的缓存集合。Buffer操作符在很多语言特定的实现中有很多种变体,它们在如何缓存这个问题上存在区别。

注意:如果原来的Observable发射了一个onError通知,Buffer会立即传递这个通知,而不是首先发射缓存的数据,即使在这之前缓存中包含了原始Observable发射的数据。

Window操作符与Buffer类似,但是它在发射之前把收集到的数据放进单独的Observable,而不是放进一个数据结构。

二、示意图

buffer

三、示例代码

buffer操作符有两个参数,分别为count和skip,count参数指定buffer操作符的大小,skip参数用来指定每次发射一个集合需要跳过几个数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
.buffer(3, 2)
.subscribe(new Subscriber<List<Integer>>() {
@Override
public void onCompleted() {
System.out.println("onCompleted.");
}

@Override
public void onError(Throwable e) {
System.out.println("onError: " + e.getMessage());
}

@Override
public void onNext(List<Integer> integers) {
System.out.println("onNext: " + integers);
}
});

四、运行结果

1
2
3
4
5
onNext: [1, 2, 3]
onNext: [3, 4, 5]
onNext: [5, 6, 7]
onNext: [7, 8]
onCompleted.

项目代码已上传到Github:https://github.com/SherlockShi/RxJavaBestPractise

五、更多

在RxJava中有许多Buffer的变体:

  • buffer(count)
  • buffer(count, skip)
  • buffer(bufferClosingSelector)
  • buffer(boundary)
  • buffer(bufferOpenings, bufferClosingSelector)
  • buffer(timespan, unit[, scheduler])
  • buffer(timespan, unit, count[, scheduler])
  • buffer(timespan, timeshift, unit[, scheduler])
  • buffer-backpressure

详情可查阅下面的参考资料。

六、参考资料

ReactiveX官方文档

ReactiveX文档中文翻译

http://www.jianshu.com/p/8b9e987e6789

PS:欢迎关注 SherlockShi 个人博客

感谢你的支持,让我继续努力分享有用的技术和知识点!