• 0

  • 485

什么,还有这么简单的OkHttp源码分析?

黑猫

我不是黑客

3星期前

前言

现在Android界的网络请求已经是OkHttp和Retrofit的天下了,Retrofit本质上也是将请求委托给了OkHttp,所以我们如果想要能够更加全面的使用和掌握OkHttp,了解其源码是必不可少的。 如果对Http还有不了解的小伙伴,可以看我上一篇文章,带你全面掌握Http协议 面试官的这份HTTP灵魂追问你Hold住吗? 来吧,出发~

简单使用

同步方式和异步方式殊途同归,而且异步方式更多了异步和线程的概念,所以本文将直接分析异步,同步的话流程也是一样的。

val client=OkHttpClient().newBuilder().build()
val request=Request.Builder().url("https://www.baidu.com").build()
val call = client.newCall(request)
call.enqueue(object :Callback{
      override fun onFailure(call: Call, e: IOException) {
          println("onFailure:$e")
      }

      override fun onResponse(call: Call, response: Response) {
          println("onResponse:${response.body?.string()}")
      }

})
复制代码

这可以说是最简单的使用方式了。

使用 OkHttp 基本是以下四步:

  1. 创建 OkHttpClient 对象
  2. 创建Request请求对象
  3. 创建Call对象
  4. 同步请求调用call.execute();异步请求调用call.enqueue(callback)

这都是啥?一个简单的请求为啥需要这么多对象呢?

简单架构

那好,我们来分析一下他们的职责,其实啊一次网络请求和寄快递没啥区别。

首先,你在淘宝买了一个东西,发现不满意,要换货,你得去快递点(OkHttpClient ),然后填写快递单(Request)表明要寄给谁,寄的啥东西,填完以后你交个快递小哥,小哥把你这个快递给你寄出去(Call)。如果是同步请求(call.execute)的话,你就在这等着快递寄出去,然后卖家再发给你换好的货(Response)。显然,这么等的话怕是等个三四天,所以我们把快递给了快递小哥以后,告诉小哥我的电话(CallBack回调),说货到了通知我(触发回调 onResponse/onFailure)。

这样一来是不是很简单了

什么?还不够清晰明了?靠,我上一张图,不怕你不知道

好,到此结束,OKHTTP分析完毕!

什么?这就完事了?我不是听说还有啥拦截器,什么责任链,什么缓存balabla的,你这叫我和面试官如何对线?

好吧,那我们接着下面看OkHttp做了什么。

构建OkHttpClient对象

我们一步一步来,先分析我们如何鬼使神差的走进快递点的,啊不,构建OkHttpClient对象的

val client=OkHttpClient().newBuilder().build()
复制代码

这行代码是构建OkHttpClient对象的,很明显有个Builder对象,这明眼人一看就知道是构建者模式去构建OkHTTPClient对象。

什么?构建者模式是什么

构建者模式一般适用于创建比较复杂的对象使用的,可以屏蔽用户对细节的感知balabala

我最讨厌说定义了,我这么说吧,你如果创建一个对象构造函数中参数特别多的时候,大部分情况下你用构建者就行了。

为什么?就比如说你想整个车,你可以直接说,来奔驰,给我整俩车。你对如何造车完全不知道,但这并不影响你提车泡妹。

可是你不是说参数比较多的时候用构建者么?

没错,参数比较多的时候你是不是得一个个传参,还得小心参数的位置啥的?当然了,在Kotlin中可以指定哪个参数,它本质上和构建者也差不多。你也不想提车的时候还得小心翼翼的和厂家一一说明要求吧,我得有个四个轮的车,白色的,能坐四个人的等等要求。

所以构建者模式就是为了解决此问题,你可以没有要求,我就使用默认值,你可以有很多要求,甚至可以重复,反正我只在你提车的时候最终确认即可给你符合你要求的车。

所以如果使用构建者模式的话,我们不必为参数的位置和忘了传什么参数而烦忧,我们可以写出如下代码:

val client=OkHttpClient()
    .newBuilder()
    .dispatcher()
    .callTimeout()
    .authenticator()
    .addInterceptor()
    .cache()
    .build()
复制代码

你看你可以使用构建者模式去创建出符合你所有要求的对象,不用担心参数位置,或者忘记传哪个参数。

当然,本篇不是讲设计模式的,随后我将会开辟一个新的系列,专门讲设计模式。这里点到为止。

我们继续回到我们的OkHttp来,使用Builder对象创建了OkHttpclient对象过程中,其实给OkHttpclient创建了一系列的配置,我们这里简要的看一下,在newBuilder方法中创建了一个Builder对象:

class Builder constructor() {
    internal var dispatcher: Dispatcher = Dispatcher()//调度器,一会讲
    internal val interceptors: MutableList<Interceptor> = mutableListOf()//拦截器,一会也会讲
    internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()//网络拦截器,也会讲
    ...//定义很多其他对象
}
复制代码

在这里省略了很多对象,只列出了三个比较重要的对象,一会都会讲的。

build方法真正构建了OkHttpclient对象:

fun build(): OkHttpClient = OkHttpClient(this)
复制代码

这个this是什么?

这个build方法是Builder对象的build方法,this当然是Builder对象啦。这里真正创建了一个OkHTTPClient对象,并把Builder对象自身传了过去。在OkHttpclient对象的构造方法中有如下代码:

open class OkHttpClient internal constructor(
  builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {
  @get:JvmName("dispatcher") val dispatcher: Dispatcher = builder.dispatcher //调度器
  @get:JvmName("interceptors") val interceptors: List<Interceptor> =
      builder.interceptors.toImmutableList()//拦截器集合
  @get:JvmName("networkInterceptors") val networkInterceptors: List<Interceptor> =
      builder.networkInterceptors.toImmutableList()//网络拦截器集合
}
复制代码

可以看到,最终builder对象中的拦截器全部给了OkHttpClient对象,这也正是构建者模式的体现之处。

简而言之,就是OkHttpClient里面定义了很多我们不知道的对象。正如你寄快递的时候,你并不知道快递是如何寄出去的,OkHttp帮你完成了很多的细节工作。

接下来就是构建Request对象了

构建Request对象

你想寄快递,必须填写快递单,写你要寄给谁,寄到哪里,怎么寄平邮还是快件,这一切就是Request对象的职责。它负责记录你的请求信息。

val request=Request.Builder().url("https://www.baidu.com").build()
复制代码

经过上面的分析,我们已经知道,构建者Builder中所有的参数最终全部会给到Request对象中。所以我们直接看看Request对象的构造方法中有什么东西~

class Request internal constructor(
  @get:JvmName("url") val url: HttpUrl,//ur,也就是你要发送的地方
  @get:JvmName("method") val method: String, //请求方法,GET/POST
  @get:JvmName("headers") val headers: Headers,//请求头,一些备注信息或者请求信息
  @get:JvmName("body") val body: RequestBody?,//请求的内容,就是你具体要发送的快递不能
  internal val tags: Map<Class<*>, Any>
) {
    ....
}
复制代码

注释已经讲的很清楚了,关于HTTP网络这块,大家不太懂的可以看我之前的文章,从HTTP表层到原理全部都有讲解。

接下来也没啥内容了,该轮到Call对象了

创建Call对象

val call = client.newCall(request)
复制代码

正如你想寄快递,你必须填写好快递单交给快递站的小哥才行。

这里你已经填好了快递单,你需要交给快递站,然后快递站派出快递小哥帮你发快递。

快递小哥就是Call,我们看看是Call是什么东东:

interface Call : Cloneable {

  fun request(): Request//返回请求信息,就是返回你填写的快递单
  @Throws(IOException::class)
  fun execute(): Response//同步请求,就是你在快递站傻等~
  fun enqueue(responseCallback: Callback)//异步请求,接受回调参数。就是你告诉快递小哥你电话,让小哥寄快递
  fun cancel()//取消请求,就是你取消寄快递
  fun isExecuted(): Boolean//是否正在请求,就是你的快递寄出去了没,寄到哪了
  fun isCanceled(): Boolean//是否是取消状态
  fun timeout(): Timeout //超时时间,就是你这个快递寄出去半年了都没动静,你就认为中途丢了
  public override fun clone(): Call
  fun interface Factory {
    fun newCall(request: Request): Call
  }
}
复制代码

Call本质上是个接口,也符合我们的面向接口编程。它里面定义了很多方法,在上面注释中我也都详细阐述了。可是接口并不能具体帮我们干活啊,就像你知道你要找一个快递小哥,快递小哥是一个抽象的概念并不是一个具体存在的人,你只知道快递小哥能帮你寄快递,这个寄快递就是一个抽象的方法。所以我们需要知道到底是哪个快递小哥帮我们寄了快递。

我们看一下这个newCall方法:

//OkHttpClient.kt
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)
复制代码

啊知道了,RealCall才是真正为我们服务的那个快递小哥。创建RealCall的过程中,我们传进去一共三个参数,第一个this自然就是OkHttpclient对象本身了,第二个参数是请求信息也就是快递单,第三个参数是用于指明不是webSocket请求。暂时不用管第三个参数

好,我们现在万事俱备只欠东风,所有的对象都已经创建完成了,可以寄快递了吧?

发送请求

发送请求就是将你的电话号告诉给快递小哥,快递小哥寄出去以后,收到快递再给你打电话告诉你快递到了或者丢了。对应的就是以下代码。

call.enqueue(object :Callback{
    override fun onFailure(call: Call, e: IOException) {
        println("onFailure:$e")
    }
    override fun onResponse(call: Call, response: Response) {
        println("onResponse:${response.body?.string()}")
    }
})
复制代码

前面也说了call.enqueue是一个异步方法,我们既然是从源码开始学习的,自然不能只停留在表面,当然要深入源码去探索,看看这个发送的过程到底是怎么样的过程。在上面我们知道,这个快递小哥是一个宽泛的概念(接口),真正执行任务的时候要具体到某个人的(实现类),这个具体的人就是RealCall对象,所以我们看看RealCall.enqueue方法。

RealCall.enqueue

//RealCall.kt
override fun enqueue(responseCallback: Callback) {
  //检查是否执行了两遍,也就是说你是不是同一个快递想要发两次,这当然是不行的。
  check(executed.compareAndSet(false, true)) { "Already Executed" }
  //请求后会立即调用,相当于监听请求的开始事件
  callStart()
  //将请求交给调度器来决定什么时候请求
  client.dispatcher.enqueue(AsyncCall(responseCallback))
}
复制代码

这个方法很有意思,前面两行注释已经有了,就不讲了,最后一行首先创建了一个AsyncCall对象,并且把回调给了AsyncCall。这里先不研究AsyncCall,一会再说。接下来调用了dispatcher.enqueue方法,什么时候发送和发送的过程交给了dispatcher(调度器)。由它来决定何时,如何发送。

不太明白?

dispatcher是调度员的意思,在你寄快递的例子中就是本地物流中心,它负责决定整个收发快递的流程,比如发送的策略,是否发送等等。

我们看一下定义

Dispatcher

class Dispatcher constructor() {
 //最大请求数,也就是最多只有64辆发送快递的车,用完了就没了
  @get:Synchronized var maxRequests = 64  
    //同一主机最大请求数量,加入你退货的目的地是北京,发往北京的快递车最多只有5辆
  @get:Synchronized var maxRequestsPerHost = 5

    //线程安全的单例模式,线程池的获取用于线程调度。
  @get:Synchronized
  @get:JvmName("executorService") val executorService: ExecutorService
    get() {
      if (executorServiceOrNull == null) {
        executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
            SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
      }
      return executorServiceOrNull!!
}

 //准备的队列,也就是进入了打包的车
  private val readyAsyncCalls = ArrayDeque<AsyncCall>()
  //异步发送队列,也就是进入了准备出发的车
  private val runningAsyncCalls = ArrayDeque<AsyncCall>()
  //同步发送队列,同样是进入了准备出发的车
  private val runningSyncCalls = ArrayDeque<RealCall>()
    ...
}
复制代码

这个Dispatcher有上面这么几个比较重要的字段,通过上面的注释可以得到,Dispatcher对象定义了最大请求数和同一主机的最大请求数。。然后三个队列,我们只研究准备队列和异步发送队列,因为同步发送队列是一样的道理。

我们知道了这个Dispatcher对象的几个字段之后,我们接着看它的enqueue方法

Dispatcher.enqueue

//Dispatcher.kt
internal fun enqueue(call: AsyncCall) {
  //确保线程安全
  synchronized(this) {
    //把快递放到打包车中
    readyAsyncCalls.add(call)
    //如果不是websocket,前面可知forWebSocket为false
    if (!call.call.forWebSocket) {
      //查询有没有退货到北京的车
      val existingCall = findExistingCallWithHost(call.host)
      //复用退货到北京的计数器callsPerHost,用于统计发往北京快递车数量
      if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
    }
  }
  //准备发送快递
  promoteAndExecute()
}
复制代码

首先是放进准备队列中,也就是把快递放进了打包的车中。然后先看看有没有到北京的快递车,有的话直接用它的计数器,用来统计现在有几辆发往北京的快递车了,也就是检查同一个主机已经有几个请求了。然后准备发送

Dispatcher.promoteAndExecute

private fun promoteAndExecute(): Boolean {
  this.assertThreadDoesntHoldLock()
  //收集所有要发出去的快递,也就是要执行的请求
  val executableCalls = mutableListOf<AsyncCall>()
  val isRunning: Boolean
  synchronized(this) {
    val i = readyAsyncCalls.iterator()//得到迭代器对象用于遍历
    while (i.hasNext()) {//遍历
      val asyncCall = i.next()//得到下一个快递
      //很重要的判断,如果64个快递车都出发了,也就没快递车了,那么将无法请求
      if (runningAsyncCalls.size >= this.maxRequests) break 
      //发往北京的5辆快递车也都走了,那么继续下一个快递
      if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
      //上面都通过了,说明有车,并且发往北京的快递车也还有,那么就离开打包车
      i.remove()
      //把去往北京的快递车数量+1
      asyncCall.callsPerHost.incrementAndGet()
      //把快递保存起来
      executableCalls.add(asyncCall)
      //将快递放到即将出发的车中
      runningAsyncCalls.add(asyncCall)
    }
    isRunning = runningCallsCount() > 0
  }
  //循环遍历,将所有的快递寄出去
  for (i in 0 until executableCalls.size) {
    val asyncCall = executableCalls[i]
    asyncCall.executeOn(executorService)
  }
  return isRunning
}
复制代码

上面的代码很长,但其实只做了这么三件事

  • 先判断还有没有快递车了,没有车的话什么快递都寄不出去了,跳出循环
  • 在判断同一目的地的车辆还有没有了,目的地如果是北京,那就是判断去北京的车还有没有了,没有的话就看下一个快递。如果还有车的话,就把它装进即将出发的车里,然后计数+1,表示又占用了一辆去北京的车
  • 再次循环,将所有要寄出去的快递寄出去

最终啊快递出发还是回到了AsyncCall的executeOn方法,我们先看看AsyncCall对象是什么

internal inner class AsyncCall(
  private val responseCallback: Callback
) : Runnable {
}
复制代码

其它的我们都不需要关注,我们只需要关注两点,第一它接受了回调,也就是你的电话号。第二,它继承了Runnable,这是什么?这不就是线程么,创建Thread,实现Runnable接口。没错就是这样喵

上面说到,最终发送是调用了asyncCall.executeOn(executorService)方法,那我们具体看一下这个方法

asyncCall.executeOn

fun executeOn(executorService: ExecutorService) {
  client.dispatcher.assertThreadDoesntHoldLock()
  //暂时定义发送没成功
  var success = false
  try {
    //使用线程池来执行自身
    executorService.execute(this)
    //发送成功了
    success = true
  } catch (e: RejectedExecutionException) {
    val ioException = InterruptedIOException("executor rejected")
    ioException.initCause(e)
    noMoreExchanges(ioException)
    //失败回调
    responseCallback.onFailure(this@RealCall, ioException)
  } finally {
    if (!success) {
      //结束
      client.dispatcher.finished(this) 
    }
  }
}
复制代码

别看代码很长,其实就是做了一件事,这个AsyncCall把自己加入到了线程池,然后由线程池来开启线程执行自己的run方法。

小伙伴会问了,什么是线程池?

线程池可有讲究了,咱们本篇研究的是OkHttp,这里就不讨论线程池了。以后的文章中会专门研究线程的种种,包括线程池。这里咱们只研究一点,就是这是一个什么样的线程池

还记得刚才Dispatcher类中定义了单例并且线程安全的线程池么

@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
  get() {
    if (executorServiceOrNull == null) {
      //定义了一个缓存线程池
      executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
          SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
    }
    return executorServiceOrNull!!
}
复制代码

这里就是定义了一个缓存的线程池,什么是缓存线程池?就是具有缓存功能的,如果一个线程工作完了并且60s之内又有请求过来,就复用刚才那个线程,这提高了性能。

然后交由线程池之后,线程池会开启线程执行asyncCall的run方法

asyncCall.run

override fun run() {
    threadName("OkHttp ${redactedUrl()}") {
      //定义响应标志位,用于表示请求是否成功
      var signalledCallback = false
      timeout.enter()
      try {
        //寄快递过程中~~~得到了换回来的货物
        val response = getResponseWithInterceptorChain()
        //走到这步没出错,代表寄快递成功了  
        signalledCallback = true
        //打电话给你表示货到了  
        responseCallback.onResponse(this@RealCall, response)
      } catch (e: IOException) {
        //如果寄快递过程没出错,但是try/catch还是报异常了只能说 onResponse中出错了,这就是用户的锅
        if (signalledCallback) {
          Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
        } else {
          //失败回调  
          responseCallback.onFailure(this@RealCall, e)
        }
      } catch (t: Throwable) {
        cancel()//取消请求
        //okhttp自己的锅,发送过程中出现了错误  
        if (!signalledCallback) {
          val canceledException = IOException("canceled due to $t")
          canceledException.addSuppressed(t)
          //失败回调  
          responseCallback.onFailure(this@RealCall, canceledException)
        }
        throw t
      } finally {
        //结束  
        client.dispatcher.finished(this)
      }
    }
  }
}
复制代码

走到这一步不容易,核心流程算是走完了。最终是通过getResponseWithInterceptorChain方法发出去快递并且得到换回来的货。

看懵了?不要紧,我们来梳理一下流程

我要放大招了!

流程如下:

  1. 进入快递点,对应创建okhttpClient对象

  2. 填写快递单,对应创建Request对象

  3. 将快递单交给快递小哥,对应创建RealCall对象并将Request对象交给它

  4. 让小哥寄快递并告诉自己的手机号,对应调用Call的enqueue方法,并传递回调

  5. 小哥把快递交给了调度员,并且把电话号告诉了开车的老司机,然后调度员把快递放到了打包车中,对应dispatcher的enqueue方法和

  6. 调度员询问还有没有车了,以及有没有北京的车了两个判断。对应dispatcher的promoteAndExecute方法

  7. 答案是有车的话,就把老司机叫过来,给他分配一辆车,让他开车去把快递送出去。对应于线程池分配一个线程去执行AsyncCall的run方法

  8. 老司机开车一路上会遇到安检,收费站等等地方,对应于拦截器链。

  9. 老司机不管是送到了以后拿到了要换的货还是没送到,老司机都会回来,并且照着快递小哥给的电话给你打电话告诉你结果。对应于回调方法的触发

这边是主线流程,是不是相当的简单呢?和我们寄快递没有一点点差别,所以说啊,代码来源于生活,又抽象与生活。

相信有小伙伴肯定不满足于如此简单的主线流程,那么我们就进入OkHttp的支线剧情,看看拦截器链具体做了什么

前方高能!!

前方高能!!

前方高能!!

如果你只是想大概关注一下,可以直接拉到后面看结论,前面将要关门放狗,啊不,放源码了!

拦截器链

拦截器嘛,顾名思义就是拦截你的。

比如安检就是拦截你的,老司机开车进出北京市,那肯定要过安检的,毕竟安全为重。上路以后还得过收费站,中间在经过休息区等等。所以拦截器链的作用就是对请求做各种各样的操作的。

拦截器链采用的是责任链模式。看到了吗,一个设计优秀的框架必然会使用很多的经典的设计模式来保证代码的健壮性。所以我们也应该学习这种设计模式,不过今天就不学了,可以关注我,随后我会发布设计模式专栏。

在上面的代码介绍中,我们最终是走到了getResponseWithInterceptorChain方法,通过这个方法得到了reponse响应,也就是我要换的货。

我们就从这个方法开始看 ,究竟什么是责任链,什么是拦截器

@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
  // 构建完整的拦截器栈。
  val interceptors = mutableListOf<Interceptor>()
  interceptors += client.interceptors //用户自定义的拦截器
  //添加默认拦截器
  interceptors += RetryAndFollowUpInterceptor(client)
  interceptors += BridgeInterceptor(client.cookieJar)
  interceptors += CacheInterceptor(client.cache)
  interceptors += ConnectInterceptor
  if (!forWebSocket) {
    //添加用户定义的网络拦截器
    interceptors += client.networkInterceptors
  }
  //默认的拦截器
  interceptors += CallServerInterceptor(forWebSocket)
  //定义真正的拦截器链   
  val chain = RealInterceptorChain(
      call = this,
      interceptors = interceptors,
      index = 0,
      exchange = null,
      request = originalRequest,
      connectTimeoutMillis = client.connectTimeoutMillis,
      readTimeoutMillis = client.readTimeoutMillis,
      writeTimeoutMillis = client.writeTimeoutMillis
  )
  var calledNoMoreExchanges = false
  try {
    //责任链的精髓所在,这一行代码就开启责任链的传递,然后责任链中的所有拦截器层层调用,然后层层返回
    val response = chain.proceed(originalRequest)
    //这一行代码是责任链全部执行完毕且不会出错才会执行到这里
    if (isCanceled()) {
      response.closeQuietly()
      throw IOException("Canceled")
    }
    return response
  } catch (e: IOException) {
    calledNoMoreExchanges = true
    throw noMoreExchanges(e) as Throwable
  } finally {
    if (!calledNoMoreExchanges) {
      noMoreExchanges(null)
    }
  }
}
复制代码

如果自己不定义拦截器的话,OkHttp默认的是五个拦截器。

可能有的小伙伴不太明白责任链到底是怎么运作的

很简单,当调用了chain.proceed方法以后即进入下一个拦截器的方法,下一个拦截器依然调用chain.proceed方法,重复不断的进入下一个拦截器。这就是请求层层被拦截,响应传回来的时候会被最后一个拦截器拦截,然后在层层的传回来。

如图所示

这里暂不讨论用户自定义的拦截器,我们看下默认的五个拦截器分别是做什么的

这是五个拦截器的作用。我们一个个来看

RetryAndFollowUpInterceptor

总的来说RetryAndFollowUpInterceptor就是负责重试和请求重定向的一个拦截器,它还额外做了一个工作就是创建了一个ExchangeFinder对象,这个对象就是用来管理连接池为后来的连接做准备的。

class RetryAndFollowUpInterceptor(private val client: OkHttpClient) : Interceptor {

  companion object {
    //最大重试次数或者重定向次数
    private const val MAX_FOLLOW_UPS = 20
  }
    
  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    ...
    while (true) {
      //创建了ExchangeFinder对象,这个对象用于管理连接池以及
      call.enterNetworkInterceptorExchange(request, newExchangeFinder)
      var response: Response
      var closeActiveExchange = true
      try {
        ...
        try {
          //让下一个拦截器来处理
          response = realChain.proceed(request)
          newExchangeFinder = true
        } catch (e: RouteException) {
          ...
        } catch (e: IOException) {
          ...
        }
        ...
        //后面的拦截器执行完了,拿到Response,解析看下是否需要重试或重定向,需要则返回新的Request
        val followUp = followUpRequest(response, exchange)
        //新的Request为空,直接返回response
        if (followUp == null) {
          if (exchange != null && exchange.isDuplex) {
            call.timeoutEarlyExit()
          }
          closeActiveExchange = false
          return response
        }
        val followUpBody = followUp.body
        //如果RequestBody有值且只许被调用一次,直接返回response  
        if (followUpBody != null && followUpBody.isOneShot()) {
          closeActiveExchange = false
          return response
        }
        response.body?.closeQuietly()
        //超过重试最大次数抛出异常
        if (++followUpCount > MAX_FOLLOW_UPS) {
          throw ProtocolException("Too many follow-up requests: $followUpCount")
        }
        //将新的请求赋值给request,继续循环
        request = followUp
        priorResponse = response
      } finally {
        call.exitNetworkInterceptorExchange(closeActiveExchange)
      }
    }
  }
 } 
复制代码

BridgeInterceptor

桥接,负责把应用请求转换成网络请求,把网络响应转换成应用响应,就是添加各种响应头信息的。负责把用户构造的请求转换为发送给服务器的请求,把服务器返回的响应转换为对用户友好的响应。 在Request阶段配置用户信息,并添加一些请求头。在Response阶段,进行gzip解压。

有以下几点需要注意:

  • 开发者没有添加Accept-Encoding时,自动添加Accept-Encoding: gzip
  • 自动添加Accept-Encoding,会对request,response进行自动解压
  • 手动添加Accept-Encoding,不负责解压缩
  • 自动解压时移除Content-Length,所以上层Java代码想要contentLength时为-1
  • 自动解压时移除 Content-Encoding
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
  ...
  val body = userRequest.body
  if (body != null) {
  //添加各种请求头信息
  requestBuilder.header("Content-Type", contentType.toString())
  requestBuilder.header("Content-Length", contentLength.toString())
  requestBuilder.header("Host", userRequest.url.toHostHeader())
  requestBuilder.header("Connection", "Keep-Alive")
  requestBuilder.header("Accept-Encoding", "gzip")
  requestBuilder.header("Cookie", cookieHeader(cookies))
  //让下一个拦截器来做操作
  val networkResponse = chain.proceed(requestBuilder.build())
  cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)
  
  val responseBuilder = networkResponse.newBuilder()
      .request(userRequest)
  //这里有个坑:如果你在请求的时候主动添加了"Accept-Encoding: gzip" ,transparentGzip=false,那你就要自己解压
  if (transparentGzip &&
      "gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
      networkResponse.promisesBody()) {
    val responseBody = networkResponse.body
    if (responseBody != null) {
      val gzipSource = GzipSource(responseBody.source())
      val strippedHeaders = networkResponse.headers.newBuilder()
          .removeAll("Content-Encoding")
          .removeAll("Content-Length")
          .build()
      responseBuilder.headers(strippedHeaders)
      val contentType = networkResponse.header("Content-Type")
      responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
    }
  }
  return responseBuilder.build()
}
复制代码

CacheInterceptor

我们知道为了节省流量和提高响应速度,Okhttp是有自己的一套缓存机制的,CacheInterceptor就是用来负责读取缓存以及更新缓存的。它内部的实现是使用的OKIO是进行读取和写入的。

@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
  //读取候选缓存
  val cacheCandidate = cache?.get(chain.request())
  //定义了缓存的策略
  val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
  //根据策略,不使用网络,又没有缓存的直接报错,并返回错误码504。
  if (networkRequest == null && cacheResponse == null) {
        return ...
  }
  // 根据策略,不使用网络,有缓存的直接返回。
  if (networkRequest == null) {
    return ...
  }
  var networkResponse: Response? = null
  try {
    //让下一个拦截器进行工作  
    networkResponse = chain.proceed(networkRequest)
  } finally {
    ...   
  }
  //  接收到网络结果,如果响应code式304,则使用缓存,返回缓存结果
  if (cacheResponse != null) {
    if (networkResponse?.code == HTTP_NOT_MODIFIED) {
      ...
      return response.also {listener.cacheHit(call, it)}
    } else {
      cacheResponse.body?.closeQuietly()
    }
  }
  //7. 读取网络结果。
  val response = networkResponse!!.newBuilder()
      .cacheResponse(stripBody(cacheResponse))
      .networkResponse(stripBody(networkResponse))
      .build()
   //8. 对数据进行缓存。
  if (cache != null) {
     //省略判断,这里使用了OKio来进行缓存的写入 
     return cacheWritingResponse(cacheRequest, response)
  }
  //返回响应结果  
  return response
}
复制代码

整个方法的流程如下所示:

  1. 读取候选缓存,
  2. 创建缓存策略,强制缓存、对比缓存等
  3. 根据策略,不使用网络,又没有缓存的直接报错,并返回错误码504。
  4. 根据策略,不使用网络,有缓存的直接返回。
  5. 前面两个都没有返回,继续执行下一个Interceptor,即ConnectInterceptor。
  6. 接收到网络结果,如果响应code式304,则使用缓存,返回缓存结果。
  7. 读取网络结果。
  8. 对数据进行缓存。
  9. 返回网络读取的结果。

关于缓存方面,一会放在后面专门讲缓存

ConnectInterceptor

顾名思义,这个拦截器就是用来建立连接的。

还记得在之前的RetryAndFollowUpInterceptor中定义的ExchangeFinder对象吗,它里面包含了一个连接池,用于在连接池中取得连接对象。

object ConnectInterceptor : Interceptor {
  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    //开始连接工作
    val exchange = realChain.call.initExchange(chain)
    //将exchange对象传递到下一个拦截器中
    val connectedChain = realChain.copy(exchange = exchange)
    //进行下一个拦截器的工作  
    return connectedChain.proceed(realChain.request)
  }
}
复制代码

这个拦截器里面的代码很少,但是工作作了不少。它真正的建立了连接。并且将exchange对象传递给下一个拦截器,这个对象的作用是什么呢?

exchange对象主要就是用来发送HTTP请求和接受响应的。

不妨我们看看连接是如何建立的?如果你并不想关注连接是如何建立的,可以跳过这一部分,直接看结果

RealCall.initExchange

internal fun initExchange(chain: RealInterceptorChain): Exchange {
  ...
  val exchangeFinder = this.exchangeFinder!! //在RetryAndFollowUpInterceptor定义的对象,里面有个连接池
  val codec = exchangeFinder.find(client, chain)//查找连接,返回的是ExchangeCodec对象
  val result = Exchange(this, eventListener, exchangeFinder, codec)//创建Exchange用来发送和接受请求
  this.interceptorScopedExchange = result
  this.exchange = result
  ...
  return result
}
复制代码

ExchangeFinder.find

fun find(client: OkHttpClient,chain: RealInterceptorChain): ExchangeCodec {
  try {
    //查找可用的连接  
    val resultConnection = findHealthyConnection(
       ...
    )
    //创建新的ExchangeCodec对象
    return resultConnection.newCodec(client, chain)
  } catch (e: RouteException) {
    ...
  }
}
复制代码

ExchangeFinder.findHealthyConnection

@Throws(IOException::class)
private fun findHealthyConnection(connectTimeout: Int,readTimeout: Int,
                                  writeTimeout: Int,pingIntervalMillis: Int,
                                  connectionRetryEnabled: Boolean,
                                  doExtensiveHealthChecks: Boolean):RealConnection {
  while (true) {
    //查找连接  
    val candidate = findConnection(
        ...
    )
    //确认连接是正常的
    if (candidate.isHealthy(doExtensiveHealthChecks)) {
      return candidate
    }
    //如果不正常则从连接池取出
    candidate.noNewExchanges()
    ...
  }
}
复制代码

ExchangeFinder.findConnection

@Throws(IOException::class)
private fun findConnection(
  connectTimeout: Int,
  readTimeout: Int,
  writeTimeout: Int,
  pingIntervalMillis: Int,
  connectionRetryEnabled: Boolean
): RealConnection {
  if (call.isCanceled()) throw IOException("Canceled")

  // 尝试重用连接
  val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()!
  if (callConnection != null) {
    ...
  }
  // 尝试从池中获取连接
  if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
    ...
  }
  ...
  // 创建一个新的连接
  val newConnection = RealConnection(connectionPool, route)
  call.connectionToCancel = newConnection
  try {
    //连接服务器  
    newConnection.connect(
        connectTimeout,
        readTimeout,
        writeTimeout,
        pingIntervalMillis,
        connectionRetryEnabled,
        call,
        eventListener
    )
  } finally {
    call.connectionToCancel = null
  }
  ...
  synchronized(newConnection) {
    //添加到连接池中  
    connectionPool.put(newConnection)
    call.acquireConnectionNoEvents(newConnection)
  }
  return newConnection
}
复制代码

工作很简单,就是先尝试重用连接,如果没法重用就从连接池中取一个连接,如果没取到就创建新的连接。

可以看到,链接对象最终是RealConnection。,并且调用了RealConnection.connect方法来进行连接。并且经过调用,最后到了RealConnection.connectSocket方法。

RealConnection.connectSocket

@Throws(IOException::class)
private fun connectSocket(
  connectTimeout: Int,
  readTimeout: Int,
  call: Call,
  eventListener: EventListener
) {
  ...
  //创建Socket对象
  val rawSocket = when (proxy.type()) {
    Proxy.Type.DIRECT, Proxy.Type.HTTP -> address.socketFactory.createSocket()!!
    else -> Socket(proxy)
  }
  this.rawSocket = rawSocket
  try {
    //真正进行Socket连接  
    Platform.get().connectSocket(rawSocket, route.socketAddress, connectTimeout)
  } catch (e: ConnectException) {
    throw ConnectException("Failed to connect to ${route.socketAddress}").apply {
      initCause(e)
    }
  }
  try {
    //okio中的接口,用来输入,类似于  InputStream
    source = rawSocket.source().buffer()
    //okio中的接口 ,用来输出,类似于OutputStream 
    sink = rawSocket.sink().buffer()
  } catch (npe: NullPointerException) {
    if (npe.message == NPE_THROW_WITH_NULL) {
      throw IOException(npe)
    }
  }
}
复制代码

这个方法的内容也很简单,我做了部分删减。主要就是创建了Socket对象,并且使用Socket对象建立了连接,然后使用OKio中的接口获得输入/输出流。

ConnectInterceptor小结

ConnectInterceptor是一个特别重要的拦截器,在这个拦截器中真正的建立了连接,并且获得了输入输出流,为将来的输入输出进行了准备。

总的来说就是做了这么几个工作:

  1. 首先查找是否有可用的连接,没有的话就尝试是否有能重用的连接,没有的话就去连接池中找,连接池中也没有就创建新的连接对象RealConnection
  2. 然后调用连接对象RealConnection的connect方法,最终创建了Socket对象用于真正的连接,然后使用了OkIO的的输入输出流,为输入输出做准备
  3. 最终返回Exchange对象,它是负责发送请求和接受响应的,而真正具体干活的是ExchangeCodec对象。
  4. 将Exchange对象放到拦截链中,让下一个拦截器进行真正的请求和响应

CallServerInterceptor

class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {

  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response { 
    val exchange = realChain.exchange!!
    val request = realChain.request
    val requestBody = request.body  
    //写入请求头,如果是GET那么请求已经结束
    exchange.writeRequestHeaders(request)
    var responseBuilder: Response.Builder? = null
    if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
        //如果熟悉HTTP的小伙伴应该知道POST请求会发送两个包,先发送请求头,获得相应为100后再发送请求体。
      if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
        exchange.flushRequest()
        responseBuilder = exchange.readResponseHeaders(expectContinue = true)
        exchange.responseHeadersStart()
    }
    //写入请求体
    requestBody.writeTo(bufferedRequestBody)
    //读取响应头  
    responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
    var response = responseBuilder
        .request(request)
        .handshake(exchange.connection.handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build()
    var code = response.code
    response = if (forWebSocket && code == 101) {
      ...
    } else {
      //读取响应体  
      response.newBuilder()
          .body(exchange.openResponseBody(response))
          .build()
    }
    ...
    return response
  }
}
复制代码

为了便于观看,我删除了一些判断逻辑,但是不影响整体流程

流程大致为

  1. 先写入请求头,如果是GET请求的话就已经请求完毕,POST请求的话是先发送请求头再发送请求体,会发送两个TCP包
  2. 然后读取响应头,接着判断过后,读取响应体。
  3. 最终将响应的结果返回,这个结果会层层的向上传递,经过上面所有的拦截器。
  4. 最终走到了我们自定义的回调处。

拦截器总结

拦截器采用了责任链模式,层层向下请求,请求后的结果层层向上传递

  • 首先经过了RetryAndFollowUpInterceptor拦截器,这个拦截器负责重试和重定向,最大重试次数为20次。并且在这个对象中创建了ExchangeFinder对象,用于管理连接池等,为随后的链接做准备
  • 经过BridgeInterceptor拦截器,这个拦截器主要就是帮我们添加一些请求头的和压缩/解压缩的。在这个拦截器中表明了,如果用户自定义了gzip请求头,需要自行解压缩,OkHttp则不再负责解压缩
  • CacheInterceptor是负责缓存的,并且内部使用的是OKio进行的缓存,缓存策略下面会有讲。
  • ConnectInterceptor拦截器是负责建立连接的,最终是通过RealConnection对象建立的Socket连接,并且获得了输入输出流为下一步读写做准备。RealConnection对象的获取是优先复用的,没有复用的就从连接池里取,连接池也没的话在创建新的,并加入连接池
  • CallServerInterceptor拦截器就是最终的拦截器了,它将负责数据真正的读取和写入。

OkHttp缓存策略

首先学习一下HTTP缓存相关的理论知识,是实现Okhttp机制的基础。

HTTP的缓存机制也是依赖于请求和响应header里的参数类实现的。缓存分为两种,一种是强制缓存,一种是对比缓存

强制缓存:

客户端先看有没有缓存,有缓存直接拿缓存,如果没缓存的话就请求服务器然后将结果缓存,以备下次请求。

强制缓存使用的的两个标识:

  • Expires:Expires的值为服务端返回的到期时间,即下一次请求时,请求时间小于服务端返回的到期时间,直接使用缓存数据。到期时间是服务端生成的,客户端和服务端的时间可能有误差。
  • Cache-Control:Expires有个时间校验的问题,所有HTTP1.1采用Cache-Control替代Expires。

Cache-Control的取值有以下几种:

  • private: 客户端可以缓存。
  • public: 客户端和代理服务器都可缓存。
  • max-age=xxx: 缓存的内容将在 xxx 秒后失效
  • no-cache: 需要使用对比缓存来验证缓存数据。
  • no-store: 所有内容都不会缓存,强制缓存,对比缓存都不会触发。

对比缓存:

对比缓存需要服务端参与判断是否继续使用缓存,当客户端第一次请求数据时,服务端会将缓存标识(Last-Modified/If-Modified-Since与Etag/If-None-Match)与数据一起返回给客户端,客户端将两者都备份到缓存中 ,再次请求数据时,客户端将上次备份的缓存 标识发送给服务端,服务端根据缓存标识进行判断,如果返回304,则表示通知客户端可以继续使用缓存。

Last-Modified/If-Modified-Since

Last-Modified 表示资源上次修改的时间。

当客户端发送第一次请求时,服务端返回资源上次修改的时间:

Last-Modified: Tue, 12 Jan 2016 09:31:27 GMT
复制代码

客户端再次发送,会在header里携带If-Modified-Since。将上次服务端返回的资源时间上传给服务端。

If-Modified-Since: Tue, 12 Jan 2016 09:31:27 GMT
复制代码

服务端接收到客户端发来的资源修改时间,与自己当前的资源修改时间进行对比,如果自己的资源修改时间大于客户端发来的资源修改时间,则说明资源做过修改, 则返回200表示需要重新请求资源,否则返回304表示资源没有被修改,可以继续使用缓存。

上面是一种时间戳标记资源是否修改的方法,还有一种资源标识码ETag的方式来标记是否修改,如果标识码发生改变,则说明资源已经被修改,ETag优先级高于Last-Modified。

Etag/If-None-Match

ETag是资源文件的一种标识码,当客户端发送第一次请求时,服务端会返回当前资源的标识码:

ETag: "5694c7ef-24dc"
复制代码

客户端再次发送,会在header里携带上次服务端返回的资源标识码:

If-None-Match:"5694c7ef-24dc"
复制代码

服务端接收到客户端发来的资源标识码,则会与自己当前的资源吗进行比较,如果不同,则说明资源已经被修改,则返回200,如果相同则说明资源没有被修改,返回 304,客户端可以继续使用缓存。

这是流程图:

Okhttp的缓存策略就是根据上述流程图实现的,具体的实现类是CacheStrategy,CacheStrategy的构造函数里有两个参数

class CacheStrategy internal constructor(
  val networkRequest: Request?,
  val cacheResponse: Response?
)
复制代码

这两个参数参数的含义如下:

  • networkRequest:网络请求。
  • cacheResponse:缓存响应,基于DiskLruCache实现的文件缓存,key是请求中url的md5,value是文件中查询到的缓存

CacheStrategy 根据之前缓存结果与当前将要发生的 request 的Header 计算缓存策略。规则如下

networkRequest cacheResponse CacheStrategy
null null only-if-cached(表明不进行网络请求,且缓存不存在或者过期,一定会返回 503 错误)
null non-null 不进行网络请求,而且缓存可以使用,直接返回缓存,不用请求网络
non-null null 需要进行网络请求,而且缓存不存在或者过期,直接访问网络。
non-null not-null Header 中含有 ETag/Last-Modified 标识,需要在条件请求下使用,还是需要访问网络。

具体流程如下所示:

  • 读取候选缓存。
  • 根据候选缓存创建缓存策略。
  • 根据缓存策略,如果不进行网络请求,而且没有缓存数据时,报错返回错误码 504。
  • 根据缓存策略,如果不进行网络请求,缓存数据可用,则直接返回缓存数据。
  • 缓存无效,则继续执行网络请求。
  • 通过服务端校验后,缓存数据可以使用(返回 304),则直接返回缓存数据,并且更新缓存。
  • 读取网络结果,构造 response,对数据进行缓存。

整个流程就是这样,另外说一点,Okhttp的缓存是根据服务器header自动的完成的,整个流程也是根据RFC文档写死的,客户端不必要进行手动控制。

Okhttp的磁盘缓存机制是基于DiskLruCache做的,即最近最少使用算法来进行缓存的,使用okio作为输入输出。

OkHttp连接池

因为HTTP是基于TCP,TCP连接时需要经过三次握手,为了加快网络访问速度,我们可以Reuqst的header中将Connection设置为keepalive来复用连接。

Okhttp支持5个并发KeepAlive,默认链路生命为5分钟(链路空闲后,保持存活的时间),连接池有ConectionPool实现,对连接进行回收和管理。

连接池真正的实现类是RealConnectionPool。其中用来保存连接的是这样的对象:

private val connections = ConcurrentLinkedQueue<RealConnection>()
复制代码

ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列。该队列的元素遵循先进先出的原则。头是最先加入的,尾是最近加入的。和其他并发集合一样。把一个元素放入到队列的线程的优先级高与对元素的访问和移除的线程。

添加连接到线程池:

fun put(connection: RealConnection) {
  connection.assertThreadHoldsLock()
  //添加链接到线程池
  connections.add(connection)
  //清除闲置的连接
  cleanupQueue.schedule(cleanupTask)
}
复制代码

清除闲置连接最主要 就是当闲置连接大于最大保持存活时间,清除出连接池。所以每当有新连接添加到线程池的时候,都会清除闲置的连接。并且用ConcurrentLinkedQueue线程安全的数据队列来保存连接。

Application Interceptors与Network Interceptors

很多小伙伴搞不清这两者的区别,从上面的图中可以看到Application Interceptors处于第一位,而Network Interceptors处于倒数第二位。这就是它们的本质区别,调用的时机不同,也就会出现以下现象。

Application Interceptors是用于在请求发送前网络响应后的拦截器

Network Interceptors在这一层拦截器中可以获取到最终发送请求的 request ,其中也包括重定向的数据,也可以获取到真正发生网络请求回来的 response 响应,从而修改对应的请求或者响应数据。

Application Interceptors只关心发出去的请求和最终响应的结果,其他的一概不关心。

这里放一个okhttp官方的区别最为明了

Application interceptors

  • Don't need to worry about intermediate responses like redirects and retries. 不需要去关心中发生的重定向和重试操作。因为它处于第一个拦截器,会获取到最终的响应 response 。
  • Are always invoked once, even if the HTTP response is served from the cache. 只会被调用一次,即使这个响应是从缓存中获取的。
  • Observe the application's original intent. Unconcerned with -OkHttp-injected headers like If-None-Match. 只关注最原始的请求,不去关系请求的资源是否发生了改变,我只关注最后的 response 结果而已。
  • Permitted to short-circuit and not call Chain.proceed(). 因为是第一个被执行的拦截器,因此它有权决定了是否要调用其他拦截,也就是 Chain.proceed() 方法是否要被执行。
  • Permitted to retry and make multiple calls to Chain.proceed() 因为是第一个被执行的拦截器,因此它有可以多次调用 Chain.proceed() 方法,其实也就是相当与重新请求的作用了。

Network Interceptors

  • Able to operate on intermediate responses like redirects and retries. 因为 NetworkInterceptor 是排在第 6 个拦截器中,因此可以操作经过 RetryAndFollowup 进行失败重试或者重定向之后得到的resposne。
  • Not invoked for cached responses that short-circuit the network. 对于从缓存获取的 response 则不会去触发 NetworkInterceptor 。因为响应直接从 CacheInterceptor 返回了。
  • Observe the data just as it will be transmitted over the network. 观察将在网络上传输的数据
  • Access to the Connection that carries the request. 访问携带请求的连接
免责声明:文章版权归原作者所有,其内容与观点不代表Unitimes立场,亦不构成任何投资意见或建议。

信息安全

485

相关文章推荐

未登录头像

暂无评论