<
RXJava基础总结
>
上一篇

Socket传输音视频流
下一篇

OpenGL.ES学习总结

记录初学RXJava时的总结(其实后面会发现这些只是API的使用,还是要理解原理以及为什么!

RXJava

RXJava的异步实现————观察者模式 Observable (可观察者,即被观察者)、 Observer / Subscriber(观察者) 、 subscribe (订阅)、事件 Observable ————>onEvent(params)————>Observer Observable发出事件通知Observer

Observable.from(folders)
    .flatMap(new Func1<File, Observable<File>>() {
        @Override
        public Observable<File> call(File file) {
            return Observable.from(file.listFiles());
        }
    })
    .filter(new Func1<File, Boolean>() {
        @Override
        public Boolean call(File file) {
            return file.getName().endsWith(".png");
        }
    })
    .map(new Func1<File, Bitmap>() {
        @Override
        public Bitmap call(File file) {
            return getBitmapFromFile(file);
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) {
            imageCollectorView.addImage(bitmap);
        }
    });

观察者模式:

A对象(观察者)对B对象(被观察者)的某种变化高度敏感,需要在B变化一瞬间做出反应(警察抓小偷) 但观察者不需要时刻盯着被观察者(A不需要每过2ms就检查一次B的状态)而是采用注册或订阅的方式,告诉被观察者:我需要你的某某状态,你在变化时通知我!!! 典型例子就是Android中的OnClickListener: 对设置OnClickListener来说,View是被观察者,OnClickListener是观察者,二者通过setOnClickListener()达成订阅关系。 既省去了反复检索状态的资源消耗,也提高了反馈速度 当用户点击时,Button 自动调用 OnClickListener 的 onClick() 方法(Button -> 被观察者、OnClickListener -> 观察者、setOnClickListener() -> 订阅,onClick() -> 事件)

RXJava的观察者模式:

RXJava有四个基本概念:Observable(被观察者),Observer(观察者),subscribe(订阅),事件 Observable和Observer通过subscribe()方法实现订阅关系;从而Observable可在需要时发出事件来通知Observer。 与传统观察者模式不同,RXJava的事件回调除了:

  1. 普通事件onNext(相当于onClick()/onEvent),
  2. 还定义了两个特殊的事件:onCompleted()和onError()

  3. onCompleted():事件队列完结。RXJava不但把每个事件单独处理,还会把它们看做一个队列。当不会再有新的onNext()发出时,需要触发onComplete()作为标志
  4. onError():事件队列异常。事件处理过程中出异常,onError()会被触发,同时队列自动终止,不允许再有事件发出 在一个正确运行的事件序列中,onComplete()和onError()有且只有一个,并且是最后一个。二者也是互斥的。有且只有一个!!!

基本实现:

  1. 创建Observer Observer:即观察者,它决定事件触发的时候将有怎样的行为。 Subscriber:是对Observer接口进行了一些扩展,但它们使用方式完全一样!!! 实质上,在RXJava的subscribe过程中,Observer也会先被转成一个Subscriber!!! 区别:
    1. onStart():这是Subscriber增加的方法。会在subscribe刚开始,而事件还未发送之前被调用,可以用于做准备工作,如数据清零重置。是一个可选方法,默认下它的实现为空。不能指定线程(弹出对话框比在主线程)————doOnSubscribe()
    2. unsubscribe():这是Subscriber所实现的另一个接口Subscription的方法,用于取消订阅。调用后,Subscriber不再接受事件。调用前使用isUnsubscribed()判断状态。这个方法很重要,因为subscribe()之后,Observer会持有Subscriber引用,不及时释放将有内存泄漏的风险。所以在不再使用时尽快在合适地方onPause,onStop,调用unsubscribe()解除引用关系!!!
Observer<String> observer = new Observer<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};
  1. 创建Observable Observable:即被观察者,它决定什么时候触发事件以及触发怎样的事件 RXJava使用create()方法来创建一个Observable,并为它定义事件触发规则
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("Hi");
        subscriber.onNext("Aloha");
        subscriber.onCompleted();
    }
});

传入一个OnSubscribe对象作为参数。OnSubscribe会被存储在返回的Observable对象中,它的作用相当于一个计划表,当Observable被订阅时,OnSubscribe的call()方法会自动被调用,事件序列就会按照设定依次触发(对于上面的例子就是触发三次onNext()和一次OnComplete) 这样由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递,即观察者模式!!!

create()方法是RXJava最基本的创造事件序列的方法。基于这个方法,RXJava还提供了一些方法用来快捷创建事件队列:

  1. just(T…):将传入的参数依次发出
  2. from(T[]) / from(Iterable<? extends T>):将传入的数组或Iterable拆分成具体对象后,依次发出
Observable observable = Observable.just("Hello", "Hi", "Aloha");

String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);

// 将会依次调用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();

—just(T…)和from(T[]),都和之前create(OnSubscribe)等价

  1. Subscribe(订阅) 创建Observable和Observer之后,再用subscribe()将它们链接起来,整条链就可以工作了 observable.subscribe(observer); // 或者: observable.subscribe(subscriber);
public Subscription subscribe(Subscriber subscriber) {
    subscriber.onStart();
    onSubscribe.call(subscriber);
    return subscriber;
}

除了subscribe(Observer) 和 subscribe(Subscriber),subscribe() 还支持不完整定义的回调,RxJava 会自动根据定义创建出 Subscriber

  1. 在RXJava的默认规则中,事件的发出和消费都是在同一个线程的。也就是说实际只是一个同步的观察者模式。 但是观察者模式本身的目的就是“后台处理,前台回调”的异步机制!因此异步很重要!而要实现异步就需要用到RXJava的另一个概念:Schedule!!!

线程控制Schedule:

在不指定线程的情况下,RXJava遵循的是线程不变原则:在哪个线程调用subscribe(),就在哪个线程生产事件,在哪个线程生产事件,就在哪个线程消费事件!!! 如果需要切换线程就需要用到Schedule(调度器)!!!

  1. Schedule的API 在RXJava中,Schedule相当于线程控制器,RXJava通过它来指定每一段代码应该运行在什么样的线程。RXJava内置了几个Schedule,它们已经适合大多数场合。
    1. Schedulers.immediate():直接在当前线程运行,相当于不指定线程。这是默认的Schedule。
    2. Schedulers.newThread():总是启用新线程,并在新线程执行操作
    3. Schedulers.io():I/O操作所使用的Schedule。行为模式和newThread()差不多,区别在于io()内部实现是用一个无数量上限的线程池,可以重用空闲线程,因此多数情况io()比newThread()更有效率。不要把计算工作放io()中,可以避免创建不必要的线程。
    4. Schedulers.computation():计算所使用的Schedulers。这个计算指的是CPU密集型计算,即不会被I/O等操作限制性能的操作,图形的计算。I/O的等待操作会浪费CPU时间。
    5. AndroidSchedulers.mainThread():Android主线程运行 有了这几个Scheduler,就可以使用subscribeOn()和observeOn()两个方法来对线程进行控制了。
    6. subscribeOn():指定subscribe()所发生的线程,即Observable.Onsubscribe被激活时所处的线程,也叫事件产生的线程
    7. observeOn():指定Subscriber所运行的线程,也叫事件消费的线程

变换

将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列 map():事件对象的直接变换,把传入的参数转化之后返回另一个对象,一对一的转化 flatMap():都是把传入参数转化成另一个对象,不同是flatMap()返回是个Observable对象,并且这个Observable并不是直接发到了Subscriber中。一对多的转化 flatMap原理:

  1. 使用传入的事件对象创建Observable对象
  2. 并不发送这个Observable,而是将它激活,于是它开始发送事件
  3. 每一个创建出来的Observable发送的事件,都会被汇入同一个Observable,而这个Observable负责将这些事件统一交给Subscriber的回调 三个步骤把事件拆分成了两级!!! 通过一组新创建的Observable将初始的对象【铺平】之后通过统一路径分发下去
Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
    @Override
    public void onNext(Course course) {
        Log.d(tag, course.getName());
    }
    ...
};
Observable.from(students)
    .flatMap(new Func1<Student, Observable<Course>>() {
        @Override
        public Observable<Course> call(Student student) {
            return Observable.from(student.getCourses());
        }
    })
    .subscribe(subscriber);

变换原理:实质上都是针对事件序列的处理和再发送

compose:对Observable整体的变换

Schedule2

多次切换线程。observeOn()指的是Subscriber的线程,而这个Subscriber并不是subscribe()参数参数中的Subscriber,而是observeOn()执行时的当前Observable所对应的Subscriber,即它的下级Subscriber。换句话说,observeOn()指定的是它之后的操作所在的线程 不同于observeOn(),subscribeOn()的位置放哪都行,但是只能调用一次!!!使用多个时,只有第一个起作用!!!

doOnSubscribe(): 等同onStart(),但是可以指定线程

Top
Foot