响应式编程1基础教程(RxJava2.x示例)

标签:

本文出自jvm123.com-java技术分享站:http://jvm123.com/2019/09/observable-programming.html

响应式编程最基础的原理就是观察者模式,以下是对RxJava实现的观察者模式的介绍:

RxJava观察者模式组件:

RxJava的观察者模式有以下概念:

1 Observable<T> 观察者模式框架类

Observable<T>为一个观察者模式框架类,也称作一个可以观察的对象。可以使用ObservableEmitter<T>事件发生器来实例化,实例化之后,就是一个可以观察的对象,再调用 subscribe 方法来传入观察者,即完成一个观察者模式。只要事件发生器中有了新的事件,观察者observer就可以接收到,并执行自定义的代码。

2 ObservableEmitter<T> 事件发生器

ObservableEmitter<T> 是一个事件发生器,继承了Emitter<T>,所以可以发生以下事件:

public interface Emitter<T> {
    /**
     * Signal a normal value.
     * @param value the value to signal, not null
     */
    void onNext(@NonNull T value);
    /**
     * Signal a Throwable exception.
     * @param error the Throwable to signal, not null
     */
    void onError(@NonNull Throwable error);
    /**
     * Signal a completion.
     */
    void onComplete();
}

对于不同的事件(onNext、onError、onComplete),Observer<T> 观察者中有不同的观察(监听)方法。

3 Observer<T> 观察者

Observer<T> 观察者,可以理解为一个监听器,是对观察到的事件进行处理响应的对象。对应事件的发生,可以实现以下方法:

public interface Observer<T> {

    /**
     * @since 2.0
     */
    void onSubscribe(@NonNull Disposable d);

    void onNext(@NonNull T t);

    void onError(@NonNull Throwable e);

    void onComplete();

}

观察者模式示例:

所以一个完整的观察者模式框架应该如下:

    // RxJava 观察者模式
    @Test
    public void test11() {
        // 创建Observable框架 (使用MyPublisher)
        Observable<Integer> intObservable = Observable.create(new MyPublisher());
        // 初始化一个 Observer (观察者)
        Observer<Integer> observer = new MyObserver();
        // 订阅到这个observer
        intObservable.subscribe(observer);
    }
/**
 * 观察者模式中被观察的对象(即事件的发布者)
 * 在RxJava 1.x 称为发布者Publisher
 * ObservableEmitter 为事件的发生器
 * @author yawn < jvm123.com >
 * 2019/9/23 10:36
 */
public class MyPublisher implements ObservableOnSubscribe<Integer> {

    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(11);
//        emitter.onNext(0);
        emitter.onNext(22);
        emitter.onComplete();
    }
}
/**
 * @author yawn < jvm123.com >
 * 2019/9/23 10:34
 */
public class MyObserver implements Observer<Integer> {

    @Override
    public void onSubscribe(Disposable disposable) {
        System.out.println("--- onSubscribe ---");
        System.out.println(disposable.toString());
//        disposable.dispose();
    }
    @Override
    public void onNext(Integer integer) {
        System.out.println("--- onNext ---");
        System.out.println("param:" + integer + " result:" + (1/integer));
    }
    @Override
    public void onError(Throwable throwable) {
        System.out.println("--- onError ---");
        System.out.println(throwable.toString());
    }
    @Override
    public void onComplete() {
        System.out.println("--- onComplete ---");
    }
}

Observer观察者的说明

  1. public void onSubscribe(Disposable disposable) 方法是在Observer观察者成功订阅之后调用,其中 Disposable 可以用来取消订阅,使用其 dispose 方法即可。
  2. 如果事件发生器调用了 onError 方法,即Observer执行了onError方法之后,就不会再执行后面的onNext事件或onComplete事件。

观察者模式总结:

实现Rxjava的观察者模式可分为以下几步:

  1. 使用事件发布者实例化一个可观察对象Observable;
  2. 实例化一个观察者(监听者)Observer;
  3. 调用subscribe方法使 观察者Observer 监听 被观察对象Observable 的事件。

在实例化 可观察对象Observable 的时候, Observable 还提供了其他简单的方式以供测试,如下:

Observable.just(1, 2, 3, 5) // 依次事件发生,依次传递1、2、3、5,最多传递10个参数
Observable.concat(Observable.just(1, 2, 3), Observable.just(7, 8, 9)) // 连接两个Observable

以上示例代码综合起来链式写法如下:

    // RxJava 观察者模式
    @Test
    public void test1() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                int i = 1;
                System.out.println("i=" + i);
                e.onNext(i++);
                System.out.println("i=" + i);
                e.onNext(i++);
                System.out.println("i=" + i);
                e.onNext(i++);
                System.out.println("i=" + i);
                e.onNext(i++);
                System.out.println("i=" + i);
                e.onNext(i++);

                e.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            public void onSubscribe(Disposable disposable) {
                System.out.println("--- onSubscribe ---");
                System.out.println(disposable.toString());
            }
            public void onNext(Integer integer) {
                System.out.println("--- onNext ---");
                System.out.println(integer);
            }
            public void onError(Throwable throwable) {
                System.out.println("--- onError ---");
                System.out.println(throwable.toString());
            }
            public void onComplete() {
                System.out.println("--- onComplete ---");
            }
        });
    }

发表评论