RxJava+Retrofit+OkHttp实现多文件下载之断点续传,rxjava retrofit2 mvp

9
回复
1824
查看
[复制链接]

426

主题

1111

帖子

1933

安币

手工艺人

发表于 2018-3-14 15:27:09 | 显示全部楼层 |阅读模式

            

        背景

        断点续传下载一直是移动开发中必不可少的一项重要的技术,同样的rxjava和retrofit的结合让这个技术解决起来更加的灵活,我们完全可以封装一个适合自的下载框架,简单而且安全!

        效果

        

2017111710123711.jpg

        实现

        下载和之前的http请求可以相互独立,所以我们单独给download建立一个工程moudel处理

        1.创建service接口

        和以前一样,先写接口

        注意:streaming是判断是否写入内存的标示,如果小文件可以考虑不写,一般情况必须写;下载地址需要通过@url动态指定(不适固定的),@head标签是指定下载的起始位置(断点续传的位置)

[Java] 查看源文件 复制代码
 /*断点续传下载接口*/
  @streaming/*大文件需要加入这个判断,防止下载过程中写入到内存中*/
  @get
  observable<responsebody> download(@header("range") string start, @url string url);

        2.复写responsebody

        和之前的上传封装一样,下载更加的需要进度,所以我们同样覆盖responsebody类,写入进度监听回调

[Java] 查看源文件 复制代码
/**
 * 自定义进度的body
 * @author wzg
 */
public class downloadresponsebody extends responsebody {
  private responsebody responsebody;
  private downloadprogresslistener progresslistener;
  private bufferedsource bufferedsource;

  public downloadresponsebody(responsebody responsebody, downloadprogresslistener progresslistener) {
    this.responsebody = responsebody;
    this.progresslistener = progresslistener;
  }

  @override
  public bufferedsource source() {
    if (bufferedsource == null) {
      bufferedsource = okio.buffer(source(responsebody.source()));
    }
    return bufferedsource;
  }

  private source source(source source) {
    return new forwardingsource(source) {
      long totalbytesread = 0l;
      @override
      public long read(buffer sink, long bytecount) throws ioexception {
        long bytesread = super.read(sink, bytecount);
        // read() returns the number of bytes read, or -1 if this source is exhausted.
        totalbytesread += bytesread != -1 

        3.自定义进度回调接口

[Java] 查看源文件 复制代码
/**
 * 成功回调处理
 * created by wzg on 2016/10/20.
 */
public interface downloadprogresslistener {
  /**
   * 下载进度
   * @param read
   * @param count
   * @param done
   */
  void update(long read, long count, boolean done);
}

        4.复写interceptor

        复写interceptor,可以将我们的监听回调通过okhttp的client方法addinterceptor自动加载我们的监听回调和responsebody

[Java] 查看源文件 复制代码
/**
 * 成功回调处理
 * created by wzg on 2016/10/20.
 */
public class downloadinterceptor implements interceptor {

  private downloadprogresslistener listener;

  public downloadinterceptor(downloadprogresslistener listener) {
    this.listener = listener;
  }

  @override
  public response intercept(chain chain) throws ioexception {
    response originalresponse = chain.proceed(chain.request());

    return originalresponse.newbuilder()
        .body(new downloadresponsebody(originalresponse.body(), listener))
        .build();
  }
}

        5.封装请求downinfo数据

        这个类中的数据可自由扩展,用户自己选择需要保持到数据库中的数据,可以自由选择需要数据库第三方框架,demo采用greendao框架存储数据

[Java] 查看源文件 复制代码
public class downinfo {
  /*存储位置*/
  private string savepath;
  /*下载url*/
  private string url;
  /*基础url*/
  private string baseurl;
  /*文件总长度*/
  private long countlength;
  /*下载长度*/
  private long readlength;
  /*下载唯一的httpservice*/
  private httpservice service;
  /*回调监听*/
  private httpprogressonnextlistener listener;
  /*超时设置*/
  private int default_timeout = 6;
  /*下载状态*/
  private downstate state;
  }

        6.downstate状态封装

        很简单,和大多数封装框架一样

[Java] 查看源文件 复制代码
public enum downstate {
  start,
  down,
  pause,
  stop,
  error,
  finish,
}

        7.请求httpprogressonnextlistener回调封装类

        注意:这里和downloadprogresslistener不同,这里是下载这个过程中的监听回调,downloadprogresslistener只是进度的监听

        通过抽象类,可以自由选择需要覆盖的类,不需要完全覆盖!更加灵活

[Java] 查看源文件 复制代码
/**
 * 下载过程中的回调处理
 * created by wzg on 2016/10/20.
 */
public abstract class httpprogressonnextlistener<t> {
  /**
   * 成功后回调方法
   * @param t
   */
  public abstract void onnext(t t);

  /**
   * 开始下载
   */
  public abstract void onstart();

  /**
   * 完成下载
   */
  public abstract void oncomplete();


  /**
   * 下载进度
   * @param readlength
   * @param countlength
   */
  public abstract void updateprogress(long readlength, long countlength);

  /**
   * 失败或者错误方法
   * 主动调用,更加灵活
   * @param e
   */
   public void onerror(throwable e){

   }

  /**
   * 暂停下载
   */
  public void onpuase(){

  }

  /**
   * 停止下载销毁
   */
  public void onstop(){

  }
}

        8.封装回调subscriber

        准备的工作做完,需要将回调和传入回调的信息统一封装到sub中,统一判断;和封装二的原理一样,我们通过自定义subscriber来提前处理返回的数据,让用户字需要关系成功和失败以及向关心的数据,避免重复多余的代码出现在处理类中



  

    1. sub需要继承downloadprogresslistener,和自带的回调一起组成我们需要的回调结果


  

    2. 传入downinfo数据,通过回调设置downinfo的不同状态,保存状态


  

    3. 通过rxandroid将进度回调指定到主线程中(如果不需要进度最好去掉该处理避免主线程处理负担)


  

    4. update进度回调在断点续传使用时,需要手动判断断点后加载的长度,因为指定断点下载长度下载后总长度=(物理长度-起始下载长度)

[Java] 查看源文件 复制代码
/**
 * 用于在http请求开始时,自动显示一个progressdialog
 * 在http请求结束是,关闭progressdialog
 * 调用者自己对请求数据进行处理
 * created by wzg on 2016/7/16.
 */
public class progressdownsubscriber<t> extends subscriber<t> implements downloadprogresslistener {
  //弱引用结果回调
  private weakreference<httpprogressonnextlistener> msubscriberonnextlistener;
  /*下载数据*/
  private downinfo downinfo;


  public progressdownsubscriber(downinfo downinfo) {
    this.msubscriberonnextlistener = new weakreference<>(downinfo.getlistener());
    this.downinfo=downinfo;
  }

  /**
   * 订阅开始时调用
   * 显示progressdialog
   */
  @override
  public void onstart() {
    if(msubscriberonnextlistener.get()!=null){
      msubscriberonnextlistener.get().onstart();
    }
    downinfo.setstate(downstate.start);
  }

  /**
   * 完成,隐藏progressdialog
   */
  @override
  public void oncompleted() {
    if(msubscriberonnextlistener.get()!=null){
      msubscriberonnextlistener.get().oncomplete();
    }
    downinfo.setstate(downstate.finish);
  }

  /**
   * 对错误进行统一处理
   * 隐藏progressdialog
   *
   * @param e
   */
  @override
  public void onerror(throwable e) {
    /*停止下载*/
    httpdownmanager.getinstance().stopdown(downinfo);
    if(msubscriberonnextlistener.get()!=null){
      msubscriberonnextlistener.get().onerror(e);
    }
    downinfo.setstate(downstate.error);
  }

  /**
   * 将onnext方法中的返回结果交给activity或fragment自己处理
   *
   * @param t 创建subscriber时的泛型类型
   */
  @override
  public void onnext(t t) {
    if (msubscriberonnextlistener.get() != null) {
      msubscriberonnextlistener.get().onnext(t);
    }
  }

  @override
  public void update(long read, long count, boolean done) {
    if(downinfo.getcountlength()>count){
      read=downinfo.getcountlength()-count+read;
    }else{
      downinfo.setcountlength(count);
    }
    downinfo.setreadlength(read);
    if (msubscriberonnextlistener.get() != null) {
      /*接受进度消息,造成ui阻塞,如果不需要显示进度可去掉实现逻辑,减少压力*/
      rx.observable.just(read).observeon(androidschedulers.mainthread())
          .subscribe(new action1<long>() {
        @override
        public void call(long along) {
           /*如果暂停或者停止状态延迟,不需要继续发送回调,影响显示*/
          if(downinfo.getstate()==downstate.pause||downinfo.getstate()==downstate.stop)return;
          downinfo.setstate(downstate.down);
          msubscriberonnextlistener.get().updateprogress(along,downinfo.getcountlength());
        }
      });
    }
  }

}

        9.下载管理类封装httpdownmanager

        单利获取

[Java] 查看源文件 复制代码
 /**
   * 获取单例
   * @return
   */
  public static httpdownmanager getinstance() {
    if (instance == null) {
      synchronized (httpdownmanager.class) {
        if (instance == null) {
          instance = new httpdownmanager();
        }
      }
    }
    return instance;
  }

        因为单利所以需要记录正在下载的数据和回到sub

[Java] 查看源文件 复制代码
 /*回调sub队列*/
  private hashmap<string,progressdownsubscriber> submap;
  /*单利对象*/
  private volatile static httpdownmanager instance;

  private httpdownmanager(){
    downinfos=new hashset<>();
    submap=new hashmap<>();
  }

        开始下载需要记录下载的service避免每次都重复创建,然后请求sercie接口,得到responsebody数据后将数据流写入到本地文件中(6.0系统后需要提前申请权限)

[Java] 查看源文件 复制代码
 /**
   * 开始下载
   */
  public void startdown(downinfo info){
    /*正在下载不处理*/
    if(info==null||submap.get(info.geturl())!=null){
      return;
    }
    /*添加回调处理类*/
    progressdownsubscriber subscriber=new progressdownsubscriber(info);
    /*记录回调sub*/
    submap.put(info.geturl(),subscriber);
    /*获取service,多次请求公用一个sercie*/
    httpservice httpservice;
    if(downinfos.contains(info)){
      httpservice=info.getservice();
    }else{
      downloadinterceptor interceptor = new downloadinterceptor(subscriber);
      okhttpclient.builder builder = new okhttpclient.builder();
      //手动创建一个okhttpclient并设置超时时间
      builder.connecttimeout(info.getconnectiontime(), timeunit.seconds);
      builder.addinterceptor(interceptor);

      retrofit retrofit = new retrofit.builder()
          .client(builder.build())
          .addconverterfactory(gsonconverterfactory.create())
          .addcalladapterfactory(rxjavacalladapterfactory.create())
          .baseurl(info.getbaseurl())
          .build();
      httpservice= retrofit.create(httpservice.class);
      info.setservice(httpservice);
    }
    /*得到rx对象-上一次下d的位置_始下d*/
    httpservice.download("bytes=" + info.getreadlength() + "-",info.geturl())
        /*指定线程*/
        .subscribeon(schedulers.io())
        .unsubscribeon(schedulers.io())
          /*失败后的retry配置*/
        .retrywhen(new retrywhennetworkexception())
        /*读取下载写入文件*/
        .map(new func1<responsebody, downinfo>() {
          @override
          public downinfo call(responsebody responsebody) {
            try {
              writecache(responsebody,new file(info.getsavepath()),info);
            } catch (ioexception e) {
              /*失败抛出异常*/
              throw new httptimeexception(e.getmessage());
            }
            return info;
          }
        })
        /*回调线程*/
        .observeon(androidschedulers.mainthread())
        /*数据回调*/
        .subscribe(subscriber);

  }

        写入文件

        注意:一开始调用进度回调是第一次写入在进度回调之前,所以需要判断一次downinfo是否获取到下载总长度,没有这选择当前responsebody 读取长度为总长度

[Java] 查看源文件 复制代码
  /**
   * 写入文件
   * @param file
   * @param info
   * @throws ioexception
   */
  public void writecache(responsebody responsebody,file file,downinfo info) throws ioexception{
    if (!file.getparentfile().exists())
      file.getparentfile().mkdirs();
    long alllength;
    if (info.getcountlength()==0){
      alllength=responsebody.contentlength();
    }else{
      alllength=info.getcountlength();
    }
      filechannel channelout = null;
      randomaccessfile randomaccessfile = null;
      randomaccessfile = new randomaccessfile(file, "rwd");
      channelout = randomaccessfile.getchannel();
      mappedbytebuffer mappedbuffer = channelout.map(filechannel.mapmode.read_write,
          info.getreadlength(),alllength-info.getreadlength());
      byte[] buffer = new byte[1024*8];
      int len;
      int record = 0;
      while ((len = responsebody.bytestream().read(buffer)) != -1) {
        mappedbuffer.put(buffer, 0, len);
        record += len;
      }
      responsebody.bytestream().close();
        if (channelout != null) {
          channelout.close();
        }
        if (randomaccessfile != null) {
          randomaccessfile.close();
        }
  }

        停止下载

        调用 subscriber.unsubscribe()解除监听,然后remove记录的下载数据和sub回调,并且设置下载状态(同步数据库自己添加)

[Java] 查看源文件 复制代码
/**
   * 停止下载
   */
  public void stopdown(downinfo info){
    if(info==null)return;
    info.setstate(downstate.stop);
    info.getlistener().onstop();
    if(submap.containskey(info.geturl())) {
      progressdownsubscriber subscriber=submap.get(info.geturl());
      subscriber.unsubscribe();
      submap.remove(info.geturl());
    }
    /*同步数据库*/
  }

        暂停下载

        原理和停止下载原理一样

[Java] 查看源文件 复制代码
 /**
   * 暂停下载
   * @param info
   */
  public void pause(downinfo info){
    if(info==null)return;
    info.setstate(downstate.pause);
    info.getlistener().onpuase();
    if(submap.containskey(info.geturl())){
      progressdownsubscriber subscriber=submap.get(info.geturl());
      subscriber.unsubscribe();
      submap.remove(info.geturl());
    }
    /*这里需要讲info信息写入到数据中,可自由扩展,用自己项目的数据库*/
  }

        暂停全部和停止全部下载任务

[Java] 查看源文件 复制代码
/**
   * 停止全部下载
   */
  public void stopalldown(){
    for (downinfo downinfo : downinfos) {
      stopdown(downinfo);
    }
    submap.clear();
    downinfos.clear();
  }

  /**
   * 暂停全部下载
   */
  public void pauseall(){
    for (downinfo downinfo : downinfos) {
      pause(downinfo);
    }
    submap.clear();
    downinfos.clear();
  }

        整合代码httpdownmanager

        同样使用了封装二中的retry处理和运行时异常自定义处理封装(不复述了)

        补充

        有同学说不知道数据库这块怎么替换,所以我加入了greendao框架去优化数据库存储,在实际运用中可以将这块的逻辑替换成你项目的数据库框架(之前用的都是realm,这回正好练练手)

        只需要替换dbutil的方法即可

        总结

        到此我们的rxjava+retrofit+okhttp深入浅出-封装就基本完成了,已经可以完全胜任开发和学习的全部工作,如果后续再使用过程中有任何问题欢迎留言给我,会一直维护!

         1.retrofit+rxjava+okhttp基本使用方法
2.统一处理请求数据格式
3.统一的progressdialog和回调subscriber处理
4.取消http请求
5.预处理http请求
6.返回数据的统一判断
7.失败后的retry封装处理
8.rxlifecycle管理生命周期,防止泄露
9.文件上传和文件下载(支持多文件断点续传)

        源码:传送门-全部封装源码



        

0

主题

9179

帖子

2379

安币

Android大神

Rank: 6Rank: 6

发表于 2018-3-15 21:07:42 | 显示全部楼层
支持,感谢,祝巴士越来越好~

3

主题

9472

帖子

1796

安币

Android大神

Rank: 6Rank: 6

QQ达人

发表于 2018-3-17 05:42:47 | 显示全部楼层
支持,感谢,祝巴士越来越好~

501

主题

1205

帖子

2041

安币

手工艺人

发表于 2018-3-18 04:52:57 | 显示全部楼层
感谢大神~

0

主题

9859

帖子

1695

安币

Android大神

IT

Rank: 6Rank: 6

发表于 2018-3-18 19:24:40 | 显示全部楼层
支持楼主,支持安卓巴士!

0

主题

5

帖子

19

安币

初级码农

Rank: 1

发表于 2018-3-21 17:22:09 | 显示全部楼层
支持学习学习!

0

主题

86

帖子

174

安币

程序猿

Rank: 2

发表于 2018-4-3 16:49:59 | 显示全部楼层
感谢分享,安卓巴士有你更精彩:)

0

主题

27

帖子

211

安币

攻城狮

Rank: 3Rank: 3

发表于 2018-4-12 16:50:22 | 显示全部楼层
感谢分享,安卓巴士有你更精彩:)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

领先的中文移动开发者社区
18620764416
7*24全天服务
意见反馈:1294855032@qq.com

扫一扫关注我们

Powered by Discuz! X3.2© 2001-2019 Comsenz Inc.( 粤ICP备15117877号 )