除了观察者模式原理,响应式编程的另一个重要内容就是数据的处理。RxJava提供了类似java8中stream流式的数据处理方式,可以对被观察对象进行处理。
RxJava数据处理实例
@Test
public void test6() {
Observable.just(-1, -2, -2, -3, -4, 5, 5, 6, 7, 8) // 事件发生
// .observeOn(Schedulers.newThread()) // 指定事件发生的线程
// .subscribeOn(Schedulers.io()) // 指定事件回调的线程
.distinct() // 去掉重复的:[-1, -2, -3, -4, 5, 6, 7, 8]
.filter(i -> i % 2 != 0) // 去掉偶数:[-1, -3, 5, 7]
.map(Math::abs) // 取绝对值:[1, 3, 5, 7]
.buffer(2, 2)// 分成多个buffer组:
// count为每个buffer中元素的个数,
// skip为下一个分组相对偏移的元素个数
// 结果为:[1, 3], [5, 7]
// 可以调整为 buffer(2, 1) 试试效果
.subscribe(
System.out::println, // 事件成功的处理 (onNext)
e -> System.out.println(e.getMessage()) // 事件失败的处理 (onError)
);
}
数据处理详解
- distinct 去掉重复的
- filter 过滤器,接收一个Predicate类型的实例,用来判断是否过滤
- map 映射器,接受一个Function,作为映射的映射关系(转换关系)
- buffer 分组,接收两个整型参数count和skip,count表示每组的元素个数、skip表示每次分组起始元素的偏移量。
最终subscribe方法出现的数据就是经过以上几步处理过的数据。