登录 立即注册
安币:

安卓巴士 - 安卓开发 - Android开发 - 安卓 - 移动互联网门户

查看: 374|回复: 0

Android消息总线的演进之路:用LiveDataBus替代RxBus、Eve

[复制链接]

327

主题

328

帖子

4014

安币

手工艺人

发表于 2019-1-31 10:00:01 | 显示全部楼层 |阅读模式
如果对本篇文章感兴趣,请前往,原文地址:http://www.apkbus.com/blog-865069-79469.html

**正文:**对于```Android```系统来说,消息传递是最基本的组件,每一个```App```内的不同页面,不同组件都在进行消息传递。消息传递既可以用于```Android```四大组件之间的通信,也可用于异步线程和主线程之间的通信。对于```Android```开发者来说,经常使用的消息传递方式有很多种,从最早使用的```Handler```、```BroadcastReceiver```、接口回调,到近几年流行的通信总线类框架```EventBus```、```RxBus```。```Android```消息传递框架,总在不断的演进之中。### 从 EventBus 说起```EventBus```是一个```Android```事件发布/订阅框架,通过解耦发布者和订阅者简化```Android```事件传递。```EventBus```可以代替```Android```传统的```Intent```、```Handler```、```Broadcast```或接口回调,在```Fragment```、```Activity```、```Service```线程之间传递数据,执行方法。```EventBus```最大的特点就是简洁、解耦。在没有```EventBus```之前我们通常用广播来实现监听,或者自定义接口函数回调,有的场景我们也可以直接用```Intent```携带简单数据,或者在线程之间通过```Handler```处理消息传递。但无论是广播还是```Handler```机制远远不能满足我们高效的开发。```EventBus```简化了应用程序内各组件间、组件与后台线程间的通信。```EventBus```一经推出,便受到广大开发者的推崇。现在看来,```EventBus```给```Android```开发者世界带来了一种新的框架和思想,就是消息的发布和订阅。这种思想在其后很多框架中都得到了应用。![Picture](//upload-images.jianshu.io/upload_images/2139461-eed2a59eee37e8ce.png)#### 发布/订阅模式订阅发布模式定义了一种“一对多”的依赖关系,让多个订阅者对象同时监听某一个主题对象。这个主题对象在自身状态变化时,会通知所有订阅者对象,使它们能够自动更新自己的状态。![Picture](//upload-images.jianshu.io/upload_images/2139461-a25c03c7f9b647c8.png)### RxBus 的出现```RxBus```不是一个库,而是一个文件,实现只有短短30行代码。```RxBus```本身不需要过多分析,它的强大完全来自于它基于的```RxJava```技术。响应式编程(```Reactive Programming```)技术这几年特别火,```RxJava```是它在```Java上```的实作。```RxJava```天生就是发布/订阅模式,而且很容易处理线程切换。所以,```RxBus```凭借区区30行代码,就敢挑战```EventBus```“江湖老大”的地位。#### RxBus 原理在```RxJava```中有个```Subject```类,它继承```Observable```类,同时实现了```Observer```接口,因此```Subject```可以同时担当订阅者和被订阅者的角色,我们使用```Subject```的子类```PublishSubject```来创建一个```Subject```对象(```PublishSubject```只有被订阅后才会把接收到的事件立刻发送给订阅者),在需要接收事件的地方,订阅该```Subject```对象,之后如果```Subject```对象接收到事件,则会发射给该订阅者,此时```Subject```对象充当被订阅者的角色。完成了订阅,在需要发送事件的地方将事件发送给之前被订阅的```Subject```对象,则此时```Subject```对象作为订阅者接收事件,然后会立刻将事件转发给订阅该```Subject```对象的订阅者,以便订阅者处理相应事件,到这里就完成了事件的发送与处理。最后就是取消订阅的操作了,```RxJava```中,订阅操作会返回一个```Subscription```对象,以便在合适的时机取消订阅,防止内存泄漏,如果一个类产生多个```Subscription```对象,我们可以用一个```CompositeSubscription```存储起来,以进行批量的取消订阅。**RxBus 有很多实现,如:**[AndroidKnife/RxBus]([https://github.com/AndroidKnife/RxBus](https://github.com/AndroidKnife/RxBus)[Blankj/RxBus]([https://github.com/Blankj/RxBus](https://github.com/Blankj/RxBus)其实正如前面所说的,```RxBus```的原理是如此简单,我们自己都可以写出一个```RxBus```的实现:**基于 RxJava1 的 RxBus 实现:**```public final class RxBus {    private final Subject bus;    private RxBus() {        bus = new SerializedSubject(PublishSubject.create());    }    private static class SingletonHolder {        private static final RxBus defaultRxBus = new RxBus();    }    public static RxBus getInstance() {        return SingletonHolder.defaultRxBus;    }    /*     * 发送     */    public void post(Object o) {        bus.onNext(o);    }    /*     * 是否有Observable订阅     */    public boolean hasObservable() {        return bus.hasObservers();    }    /*     * 转换为特定类型的Obserbale     */    public  Observable toObservable(Class type) {        return bus.ofType(type);    }}```**基于 RxJava2 的 RxBus 实现:**```public final class RxBus2 {    private final Subject bus;    private RxBus2() {        // toSerialized method made bus thread safe        bus = PublishSubject.create().toSerialized();    }    public static RxBus2 getInstance() {        return Holder.BUS;    }    private static class Holder {        private static final RxBus2 BUS = new RxBus2();    }    public void post(Object obj) {        bus.onNext(obj);    }    public  Observable toObservable(Class tClass) {        return bus.ofType(tClass);    }    public Observable toObservable() {        return bus;    }    public boolean hasObservers() {        return bus.hasObservers();    }}```### 引入 LiveDataBus 的想法#### 从 LiveData 谈起```LiveData```是```Android Architecture Components```提出的框架。```LiveData```是一个可以被观察的数据持有类,它可以感知并遵循```Activity```、```Fragment```或```Service```等组件的生命周期。正是由于```LiveData```对组件生命周期可感知特点,因此可以做到仅在组件处于生命周期的激活状态时才更新```UI```数据。```LiveData```需要一个观察者对象,一般是```Observer```类的具体实现。当观察者的生命周期处于```STARTED```或```RESUMED```状态时,```LiveData```会通知观察者数据变化;在观察者处于其他状态时,即使```LiveData```的数据变化了,也不会通知。**LiveData 的优点**- **UI 和实时数据保持一致**,因为```LiveData```采用的是观察者模式,这样一来就可以在数据发生改变时获得通知,更新```UI```。- **避免内存泄漏**,观察者被绑定到组件的生命周期上,当被绑定的组件销毁(```destroy```)时,观察者会立刻自动清理自身的数据。- **不会再产生由于 Activity 处于 stop 状态而引起的崩溃**,例如:当```Activity```处于后台状态时,是不会收到```LiveData```任何事件的。- **不需要再解决生命周期带来的问题**,```LiveData```可以感知被绑定的组件的生命周期,只有在活跃状态才会通知数据变化。- **实时数据刷新**,当组件处于活跃状态或者从不活跃状态到活跃状态时总是能收到最新的数据。- **解决```Configuration Change```问题**,在屏幕发生旋转或者被回收再次启动,立刻就能收到最新的数据。**谈一谈 Android Architecture Components**```Android Architecture Components```的核心是```Lifecycle```、```LiveData```、```ViewModel```以及 ```Room```,通过它可以非常优雅的让数据与界面进行交互,并做一些持久化的操作,高度解耦,自动管理生命周期,而且不用担心内存泄漏的问题。- **Room**一个强大的```SQLite```对象映射库。- **ViewModel**一类对象,它用于为```UI```组件提供数据,在设备配置发生变更时依旧可以存活。- **LiveData**一个可感知生命周期、可被观察的数据容器,它可以存储数据,还会在数据发生改变时进行提醒。- **Lifecycle**包含```LifeCycleOwer```和```LifecycleObserver```,分别是生命周期所有者和生命周期感知者。**Android Architecture Components 的特点**- **数据驱动型编程**变化的永远是数据,界面无需更改。- **感知生命周期**,防止内存泄漏- **高度解耦**数据,界面高度分离。- **数据持久化**数据、```ViewModel```不与 ```UI```的生命周期挂钩,不会因为界面的重建而销毁。#### 重点:为什么使用 LiveData 构建数据通信总线 LiveDataBus**使用 LiveData 的理由**- **LiveData 具有的这种可观察性和生命周期感知的能力,使其非常适合作为 Android 通信总线的基础构件**。- **使用者不用显示调用反注册方法**。由于```LiveData```具有生命周期感知能力,所以```LiveDataBus```只需要调用注册回调方法,而不需要显示的调用反注册方法。这样带来的好处不仅可以编写更少的代码,而且可以完全杜绝其他通信总线类框架(如```EventBus```、```RxBus```)忘记调用反注册所带来的内存泄漏的风险。#### 为什么要用LiveDataBus替代EventBus和RxBus- **LiveDataBus 的实现极其简单**,相对```EventBus```复杂的实现,```LiveDataBus```只需要一个类就可以实现。- **LiveDataBus 可以减小 APK 包的大小**,由于```LiveDataBus```只依赖```Android```官方```Android Architecture Components```组件的```LiveData```,没有其他依赖,本身实现只有一个类。作为比较,```EventBus JAR```包大小为```57kb```,```RxBus```依赖```RxJava```和```RxAndroid```,其中```RxJava2```包大小```2.2MB```,```RxJava1```包大小```1.1MB```,```RxAndroid```包大小```9kb```。使用```LiveDataBus```可以大大减小```APK```包的大小。- **LiveDataBus 依赖方支持更好**,```LiveDataBus```只依赖```Android```官方```Android Architecture Components```组件的```LiveData```,相比```RxBus```依赖的```RxJava```和```RxAndroid```,依赖方支持更好。- **LiveDataBus 具有生命周期感知**,```LiveDataBus```具有生命周期感知,在```Android```系统中使用调用者不需要调用反注册,相比```EventBus```和```RxBus```使用更为方便,并且没有内存泄漏风险。#### LiveDataBus 的设计和架构**LiveDataBus 的组成**- **消息**消息可以是任何的```Object```,可以定义不同类型的消息,如```Boolean```、```String```。也可以定义自定义类型的消息。- **消息通道**```LiveData```扮演了消息通道的角色,不同的消息通道用不同的名字区分,名字是```String```类型的,可以通过名字获取到一个```LiveData```消息通道。- **消息总线**消息总线通过单例实现,不同的消息通道存放在一个```HashMap```中。- **订阅**订阅者通过```getChannel```获取消息通道,然后调用```observe```订阅这个通道的消息。发布发布者通过```getChannel```获取消息通道,然后调用```setValue```或者```postValue```发布消息。**LiveDataBus原理图**![Picture](//upload-images.jianshu.io/upload_images/2139461-eccbb9c98513708b.png)#### LiveDataBus的实现第一个实现:```public final class LiveDataBus {    private final Map bus;    private LiveDataBus() {        bus = new HashMap();    }    private static class SingletonHolder {        private static final LiveDataBus DATA_BUS = new LiveDataBus();    }    public static LiveDataBus get() {        return SingletonHolder.DATA_BUS;    }    public  MutableLiveData getChannel(String target, Class type) {        if (!bus.containsKey(target)) {            bus.put(target, new MutableLiveData());        }        return (MutableLiveData) bus.get(target);    }    public MutableLiveData getChannel(String target) {        return getChannel(target, Object.class);    }}```短短二十行代码,就实现了一个通信总线的全部功能,并且还具有生命周期感知功能,并且使用起来也及其简单:**注册订阅:**```LiveDataBus.get().getChannel("key_test", Boolean.class)        .observe(this, new Observer() {            @Override            public void onChanged(@Nullable Boolean aBoolean) {            }        });```**发送消息:**```LiveDataBus.get().getChannel("key_test").setValue(true);```我们发送了一个名为"```key_test```",值为```true```的事件。这个时候订阅者就会收到消息,并作相应的处理,非常简单。**问题出现**对于```LiveDataBus```的第一版实现,我们发现,在使用这个```LiveDataBus```的过程中,订阅者会收到订阅之前发布的消息。对于一个消息总线来说,这是不可接受的。无论```EventBus```或者```RxBus```,订阅方都不会收到订阅之前发出的消息。对于一个消息总线,```LiveDataBus```必须要解决这个问题。**问题分析**怎么解决这个问题呢?先分析下原因:当```LifeCircleOwner```的状态发生变化的时候,会调用```LiveData.ObserverWrapper```的```activeStateChanged```函数,如果这个时候```ObserverWrapper```的状态是```active```,就会调用```LiveData```的```dispatchingValue```。![Picture](//upload-images.jianshu.io/upload_images/2139461-52c3bd2cf72e4f93.png)在```LiveData```的```dispatchingValue```中,又会调用```LiveData```的```considerNotify```方法。![Picture](//upload-images.jianshu.io/upload_images/2139461-79caf1935cba6c8b.png)在```LiveData```的```considerNotify```方法中,红框中的逻辑是关键,如果```ObserverWrapper```的```mLastVersion```小于```LiveData```的```mVersion```,就会去回调mObserver的onChanged方法。而每个新的订阅者,其```version```都是```-1```,```LiveData```一旦设置过其```version```是大于```-1```的(每次```LiveData```设置值都会使其```version```加```1```),这样就会导致```LiveDataBus```每注册一个新的订阅者,这个订阅者立刻会收到一个回调,即使这个设置的动作发生在订阅之前。![Picture](//upload-images.jianshu.io/upload_images/2139461-fc5fe987c06832c6.png)**问题原因总结**对于这个问题,总结一下发生的核心原因。对于```LiveData```,其初始的```version```是```-1```,当我们调用了其```setValue```或者```postValue```,其```vesion```会``` 1```;对于每一个观察者的封装```ObserverWrapper```,其初始```version```也为```-1```,也就是说,每一个新注册的观察者,其```version```为```-1```;当```LiveData```设置这个```ObserverWrapper```的时候,如果```LiveData```的```version```大于```ObserverWrapper```的```version```,```LiveData```就会强制把当前```value```推送给```Observer```。**如何解决这个问题**明白了问题产生的原因之后,我们来看看怎么才能解决这个问题。很显然,根据之前的分析,只需要在注册一个新的订阅者的时候把```Wrapper```的```version```设置成跟```LiveData```的```version```一致即可。那么怎么实现呢,看看```LiveData```的```observe```方法,他会在 ```步骤1```创建一个```LifecycleBoundObserver```,```LifecycleBoundObserver```是```ObserverWrapper```的派生类。然后会在```步骤2```把这个```LifecycleBoundObserver```放入一个私有```Map```容器```mObservers```中。无论```ObserverWrapper```还是```LifecycleBoundObserver```都是私有的或者包可见的,所以无法通过继承的方式更改```LifecycleBoundObserver```的```version```。那么能不能从```Map```容器```mObservers```中取到```LifecycleBoundObserver```,然后再更改```version```呢?答案是肯定的,通过查看```SafeIterableMap```的源码我们发现有一个```protected```的```get```方法。因此,在调用```observe```的时候,我们可以通过反射拿到```LifecycleBoundObserver```,再把```LifecycleBoundObserver```的```version```设置成和```LiveData```一致即可。![Picture](//upload-images.jianshu.io/upload_images/2139461-8792edda8c9962a6.png)对于非生命周期感知的```observeForever```方法来说,实现的思路是一致的,但是具体的实现略有不同。```observeForever```的时候,生成的```wrapper```不是```LifecycleBoundObserver```,而是```AlwaysActiveObserver```(```步骤1```),而且我们也没有机会在```observeForever```调用完成之后再去更改```AlwaysActiveObserver```的```version```,因为在```observeForever```方法体内,```步骤3```的语句,回调就发生了。![Picture](//upload-images.jianshu.io/upload_images/2139461-76011e98b6a41906.png)那么对于```observeForever```,如何解决这个问题呢?既然是在调用内回调的,那么我们可以写一个```ObserverWrapper```,把真正的回调给包装起来。把```ObserverWrapper```传给```observeForever```,那么在回调的时候我们去检查调用栈,如果回调是```observeForever```方法引起的,那么就不回调真正的订阅者。#### LiveDataBus最终实现```public final class LiveDataBus {    private final Map bus;    private LiveDataBus() {        bus = new HashMap();    }    private static class SingletonHolder {        private static final LiveDataBus DEFAULT_BUS = new LiveDataBus();    }    public static LiveDataBus get() {        return SingletonHolder.DEFAULT_BUS;    }    public  MutableLiveData with(String key, Class type) {        if (!bus.containsKey(key)) {            bus.put(key, new BusMutableLiveData());        }        return (MutableLiveData) bus.get(key);    }    public MutableLiveData with(String key) {        return with(key, Object.class);    }    private static class ObserverWrapper implements Observer {        private Observer observer;        public ObserverWrapper(Observer observer) {            this.observer = observer;        }        @Override        public void onChanged(@Nullable T t) {            if (observer != null) {                if (isCallOnObserve()) {                    return;                }                observer.onChanged(t);            }        }        private boolean isCallOnObserve() {            StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();            if (stackTrace != null && stackTrace.length > 0) {                for (StackTraceElement element : stackTrace) {                    if ("android.arch.lifecycle.LiveData".equals(element.getClassName()) &&                            "observeForever".equals(element.getMethodName())) {                        return true;                    }                }            }            return false;        }    }    private static class BusMutableLiveData extends MutableLiveData {        private Map observerMap = new HashMap();        @Override        public void observe(@NonNull LifecycleOwner owner, @NonNull Observer observer) {            super.observe(owner, observer);            try {                hook(observer);            } catch (Exception e) {                e.printStackTrace();            }        }        @Override        public void observeForever(@NonNull Observer observer) {            if (!observerMap.containsKey(observer)) {                observerMap.put(observer, new ObserverWrapper(observer));            }            super.observeForever(observerMap.get(observer));        }        @Override        public void removeObserver(@NonNull Observer observer) {            Observer realObserver = null;            if (observerMap.containsKey(observer)) {                realObserver = observerMap.remove(observer);            } else {                realObserver = observer;            }            super.removeObserver(realObserver);        }        private void hook(@NonNull Observer observer) throws Exception {            //get wrapper's version            Class classLiveData = LiveData.class;            Field fieldObservers = classLiveData.getDeclaredField("mObservers");            fieldObservers.setAccessible(true);            Object objectObservers = fieldObservers.get(this);            Class classObservers = objectObservers.getClass();            Method methodGet = classObservers.getDeclaredMethod("get", Object.class);            methodGet.setAccessible(true);            Object objectWrapperEntry = methodGet.invoke(objectObservers, observer);            Object objectWrapper = null;            if (objectWrapperEntry instanceof Map.Entry) {                objectWrapper = ((Map.Entry) objectWrapperEntry).getValue();            }            if (objectWrapper == null) {                throw new NullPointerException("Wrapper can not be bull!");            }            Class classObserverWrapper = objectWrapper.getClass().getSuperclass();            Field fieldLastVersion = classObserverWrapper.getDeclaredField("mLastVersion");            fieldLastVersion.setAccessible(true);            //get livedata's version            Field fieldVersion = classLiveData.getDeclaredField("mVersion");            fieldVersion.setAccessible(true);            Object objectVersion = fieldVersion.get(this);            //set wrapper's version            fieldLastVersion.set(objectWrapper, objectVersion);        }    }}```**注册订阅:**```LiveDataBus.get()        .with("key_test", String.class)        .observe(this, new Observer() {            @Override            public void onChanged(@Nullable String s) {            }        });```**发送消息:**```LiveDataBus.get().with("key_test").setValue(s);```**源码说明**```LiveDataBus```的源码可以直接拷贝使用,也可以前往作者的```GitHub```仓库查看下载:[https://github.com/JeremyLiao/LiveDataBus](https://github.com/JeremyLiao/LiveDataBus)### 总结本文提供了一个新的消息总线框架 —— **LiveDataBus**。订阅者可以订阅某个消息通道的消息,发布者可以把消息发布到消息通道上。利用```LiveDataBus```,不仅可以实现消息总线功能,而且对于订阅者,他们不需要关心何时取消订阅,极大减少了因为忘记取消订阅造成的内存泄漏风险。  继续阅读全文



想在安卓巴士找到更多优质博文,可移步博客区

如果对本篇文章感兴趣,请前往,
原文地址:
http://www.apkbus.com/blog-865069-79469.html
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

站长推荐

通过邮件订阅最新安卓weekly信息
上一条 /4 下一条

下载安卓巴士客户端

全国最大的安卓开发者社区

广告投放| 广东互联网违法和不良信息举报中心|中国互联网举报中心|下载客户端|申请友链|手机版|站点统计|安卓巴士 ( 粤ICP备15117877号 )

快速回复 返回顶部 返回列表