Java?OkHttp框架源碼超詳細解析
一、自己的理解的OkHttp
我理解的http的本質(zhì)就是基于socket連接,把要傳輸?shù)臄?shù)據(jù)按照http協(xié)議的格式去封裝后,傳輸在網(wǎng)絡(luò)中,以此來實現(xiàn)的網(wǎng)絡(luò)通信。
而OkHttp協(xié)議就是幫助我們,把我們把要傳輸?shù)臄?shù)據(jù)請求,按照http協(xié)議的格式,傳輸在Socket上,當然還有很多優(yōu)化管理這些請求和連接的方法,例如:對于這些請求的管理:最多同時進行64個請求,同域名的最多同時進行5個請求。還有Socket連接池的管理。
二、OkHttp的使用方法
1.創(chuàng)建一個client,構(gòu)建一個request
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
.url("https://www.baidu.com/")
.build();2.同步請求
Response response = client.newCall(request).execute();
3.異步請求
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(@NotNull Call call, @NotNull IOException e) {
//todo handle request failed
}
@Override
public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException {
//todo handle Response
}
});三、基本對象介紹
1.OkHttpClient
一個請求的配置類,采用了建造者模式,方便用戶配置一些請求參數(shù),如配置callTimeout,cookie,interceptor等等。
open class OkHttpClient internal constructor(
builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {
constructor() : this(Builder())
class Builder constructor() {
//調(diào)度器
internal var dispatcher: Dispatcher = Dispatcher()
//連接池
internal var connectionPool: ConnectionPool = ConnectionPool()
//整體流程攔截器
internal val interceptors: MutableList<Interceptor> = mutableListOf()
//網(wǎng)絡(luò)流程攔截器
internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
//流程監(jiān)聽器
internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()
//連接失敗時是否重連
internal var retryOnConnectionFailure = true
//服務(wù)器認證設(shè)置
internal var authenticator: Authenticator = Authenticator.NONE
//是否重定向
internal var followRedirects = true
//是否從HTTP重定向到HTTPS
internal var followSslRedirects = true
//cookie設(shè)置
internal var cookieJar: CookieJar = CookieJar.NO_COOKIES
//緩存設(shè)置
internal var cache: Cache? = null
//DNS設(shè)置
internal var dns: Dns = Dns.SYSTEM
//代理設(shè)置
internal var proxy: Proxy? = null
//代理選擇器設(shè)置
internal var proxySelector: ProxySelector? = null
//代理服務(wù)器認證設(shè)置
internal var proxyAuthenticator: Authenticator = Authenticator.NONE
//socket配置
internal var socketFactory: SocketFactory = SocketFactory.getDefault()
//https socket配置
internal var sslSocketFactoryOrNull: SSLSocketFactory? = null
internal var x509TrustManagerOrNull: X509TrustManager? = null
internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS
//協(xié)議
internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS
//域名校驗
internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier
internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT
internal var certificateChainCleaner: CertificateChainCleaner? = null
//請求超時
internal var callTimeout = 0
//連接超時
internal var connectTimeout = 10_000
//讀取超時
internal var readTimeout = 10_000
//寫入超時
internal var writeTimeout = 10_000
internal var pingInterval = 0
internal var minWebSocketMessageToCompress = RealWebSocket.DEFAULT_MINIMUM_DEFLATE_SIZE
internal var routeDatabase: RouteDatabase? = null
···省略代碼···2.request
同樣是請求參數(shù)的配置類,也同樣采用了建造者模式,但相比于OkHttpClient,Request就十分簡單了,只有四個參數(shù),分別是請求URL、請求方法、請求頭、請求體。
class Request internal constructor(
@get:JvmName("url") val url: HttpUrl,
@get:JvmName("method") val method: String,
@get:JvmName("headers") val headers: Headers,
@get:JvmName("body") val body: RequestBody?,
internal val tags: Map<Class<*>, Any>
) {
open class Builder {
//請求的URL
internal var url: HttpUrl? = null
//請求方法,如:GET、POST..
internal var method: String
//請求頭
internal var headers: Headers.Builder
//請求體
internal var body: RequestBody? = null
···省略代碼···3.Call
請求調(diào)用接口,表示這個請求已經(jīng)準備好可以執(zhí)行,也可以取消,只能執(zhí)行一次。
interface Call : Cloneable {
/** 返回發(fā)起此調(diào)用的原始請求 */
fun request(): Request
/**
* 同步請求,立即執(zhí)行。
*
* 拋出兩種異常:
* 1. 請求失敗拋出IOException;
* 2. 如果在執(zhí)行過一回的前提下再次執(zhí)行拋出IllegalStateException;*/
@Throws(IOException::class)
fun execute(): Response
/**
* 異步請求,將請求安排在將來的某個時間點執(zhí)行。
* 如果在執(zhí)行過一回的前提下再次執(zhí)行拋出IllegalStateException */
fun enqueue(responseCallback: Callback)
/** 取消請求。已經(jīng)完成的請求不能被取消 */
fun cancel()
/** 是否已被執(zhí)行 */
fun isExecuted(): Boolean
/** 是否被取消 */
fun isCanceled(): Boolean
/** 一個完整Call請求流程的超時時間配置,默認選自[OkHttpClient.Builder.callTimeout] */
fun timeout(): Timeout
/** 克隆這個call,創(chuàng)建一個新的相同的Call */
public override fun clone(): Call
/** 利用工廠模式來讓 OkHttpClient 來創(chuàng)建 Call對象 */
fun interface Factory {
fun newCall(request: Request): Call
}
}4.RealCall
OkHttpClient.kt override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)
RealCall是Call接口的具體實現(xiàn)類,是應(yīng)用端與網(wǎng)絡(luò)層的連接橋,展示應(yīng)用端原始的請求與連接數(shù)據(jù),以及網(wǎng)絡(luò)層返回的response及其它數(shù)據(jù)流。 通過使用方法也可知,創(chuàng)建RealCall對象后,就要調(diào)用同步或異步請求方法,所以它里面還包含同步請求 execute()與異步請求 enqueue()方法。(后面具體展開分析)
5.AsyncCall
異步請求調(diào)用,是RealCall的一個內(nèi)部類,就是一個Runnable,被dispatcher調(diào)度器中的線程池所執(zhí)行。
inner class AsyncCall(
//用戶傳入的響應(yīng)回調(diào)方法
private val responseCallback: Callback
) : Runnable {
//同一個域名的請求次數(shù),volatile + AtomicInteger 保證在多線程下及時可見性與原子性
@Volatile var callsPerHost = AtomicInteger(0)
private set
fun reuseCallsPerHostFrom(other: AsyncCall) {
this.callsPerHost = other.callsPerHost
}
···省略代碼···
fun executeOn(executorService: ExecutorService) {
client.dispatcher.assertThreadDoesntHoldLock()
var success = false
try {
//調(diào)用線程池執(zhí)行
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
noMoreExchanges(ioException)
//請求失敗,調(diào)用 Callback.onFailure() 方法
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
//請求失敗,調(diào)用調(diào)度器finish方法
client.dispatcher.finished(this) // This call is no longer running!
}
}
}
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
timeout.enter()
try {
//請求成功,獲取到服務(wù)器返回的response
val response = getResponseWithInterceptorChain()
signalledCallback = true
//調(diào)用 Callback.onResponse() 方法,將 response 傳遞出去
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
} else {
//請求失敗,調(diào)用 Callback.onFailure() 方法
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
//請求出現(xiàn)異常,調(diào)用cancel方法來取消請求
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
//請求失敗,調(diào)用 Callback.onFailure() 方法
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
//請求結(jié)束,調(diào)用調(diào)度器finish方法
client.dispatcher.finished(this)
}
}
}
}6.Dispatcher
調(diào)度器,用來調(diào)度Call對象,同時包含線程池與異步請求隊列,用來存放與執(zhí)行AsyncCall對象。
class Dispatcher constructor() {
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
//創(chuàng)建一個緩存線程池,來處理請求調(diào)用,這個線程池的核心線程數(shù)是0,等待隊列的長度也是0,意味著
//線程池會直接創(chuàng)建新的線程去處理請求
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
}
return executorServiceOrNull!!
}
/** 已準備好的異步請求隊列 */
@get:Synchronized
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
/** 正在運行的異步請求隊列, 包含取消但是還未finish的AsyncCall */
private val runningAsyncCalls = ArrayDeque<AsyncCall>()
/** 正在運行的同步請求隊列, 包含取消但是還未finish的RealCall */
private val runningSyncCalls = ArrayDeque<RealCall>()
···省略代碼···
}四、流程分析
1.同步請求
client.newCall(request).execute();
newCall方法就是創(chuàng)建一個RealCall對象,然后執(zhí)行其execute()方法。
RealCall.kt
override fun execute(): Response {
//CAS判斷是否已經(jīng)被執(zhí)行了, 確保只能執(zhí)行一次,如果已經(jīng)執(zhí)行過,則拋出異常
check(executed.compareAndSet(false, true)) { "Already Executed" }
//請求超時開始計時
timeout.enter()
//開啟請求監(jiān)聽
callStart()
try {
//調(diào)用調(diào)度器中的 executed() 方法,調(diào)度器只是將 call 加入到了runningSyncCalls隊列中
client.dispatcher.executed(this)
//調(diào)用getResponseWithInterceptorChain 方法拿到 response
return getResponseWithInterceptorChain()
} finally {
//執(zhí)行完畢,調(diào)度器將該 call 從 runningSyncCalls隊列中移除
client.dispatcher.finished(this)
}
}
Dispatcher.kt
@Synchronized internal fun executed(call: RealCall) {
runningSyncCalls.add(call)
}調(diào)用調(diào)度器executed方法,就是將當前的RealCall對象加入到runningSyncCalls隊列中,然后調(diào)用getResponseWithInterceptorChain方法拿到response。
2.異步請求
RealCall.kt
override fun enqueue(responseCallback: Callback) {
//CAS判斷是否已經(jīng)被執(zhí)行了, 確保只能執(zhí)行一次,如果已經(jīng)執(zhí)行過,則拋出異常
check(executed.compareAndSet(false, true)) { "Already Executed" }
//開啟請求監(jiān)聽
callStart()
//新建一個AsyncCall對象,通過調(diào)度器enqueue方法加入到readyAsyncCalls隊列中
client.dispatcher.enqueue(AsyncCall(responseCallback))
}然后調(diào)用調(diào)度器的enqueue方法
Dispatcher.kt
internal fun enqueue(call: AsyncCall) {
//加鎖,保證線程安全
synchronized(this) {
//將該請求調(diào)用加入到 readyAsyncCalls 隊列中
readyAsyncCalls.add(call)
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
if (!call.call.forWebSocket) {
//通過域名來查找有沒有相同域名的請求,有則復(fù)用。
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
//執(zhí)行請求
promoteAndExecute()
}
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
val executableCalls = mutableListOf<AsyncCall>()
//判斷是否有請求正在執(zhí)行
val isRunning: Boolean
//加鎖,保證線程安全
synchronized(this) {
//遍歷 readyAsyncCalls 隊列
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
//runningAsyncCalls 的數(shù)量不能大于最大并發(fā)請求數(shù) 64
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
//同域名最大請求數(shù)5,同一個域名最多允許5條線程同時執(zhí)行請求
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
//從 readyAsyncCalls 隊列中移除,并加入到 executableCalls 及 runningAsyncCalls 隊列中
i.remove()
asyncCall.callsPerHost.incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
//通過運行隊列中的請求數(shù)量來判斷是否有請求正在執(zhí)行
isRunning = runningCallsCount() > 0
}
//遍歷可執(zhí)行隊列,調(diào)用線程池來執(zhí)行AsyncCall
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)
}
return isRunning
}調(diào)度器的enqueue方法就是將AsyncCall加入到readyAsyncCalls隊列中,然后調(diào)用promoteAndExecute方法來執(zhí)行請求,promoteAndExecute方法做的其實就是遍歷readyAsyncCalls隊列,然后將符合條件的請求用線程池執(zhí)行,也就是會執(zhí)行AsyncCall.run()方法。
AsyncCall 方法的具體代碼看上面的這邊就不在此展示了,簡單來說就是調(diào)用getResponseWithInterceptorChain方法拿到response,然后通過Callback.onResponse方法傳遞出去。反之,如果請求失敗,捕獲了異常,就通過Callback.onFailure將異常信息傳遞出去。 最終,請求結(jié)束,調(diào)用調(diào)度器finish方法。
Dispatcher.kt
/** 異步請求調(diào)用結(jié)束方法 */
internal fun finished(call: AsyncCall) {
call.callsPerHost.decrementAndGet()
finished(runningAsyncCalls, call)
}
/** 同步請求調(diào)用結(jié)束方法 */
internal fun finished(call: RealCall) {
finished(runningSyncCalls, call)
}
private fun <T> finished(calls: Deque<T>, call: T) {
val idleCallback: Runnable?
synchronized(this) {
//將當前請求調(diào)用從 正在運行隊列 中移除
if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
idleCallback = this.idleCallback
}
//繼續(xù)執(zhí)行剩余請求,將call從readyAsyncCalls中取出加入到runningAsyncCalls,然后執(zhí)行
val isRunning = promoteAndExecute()
if (!isRunning && idleCallback != null) {
//如果執(zhí)行完了所有請求,處于閑置狀態(tài),調(diào)用閑置回調(diào)方法
idleCallback.run()
}
}請求結(jié)束,異步請求,把當前同域名的計數(shù)減一,然后后面和同步一樣,都是把請求從正在執(zhí)行的隊列中移除,然后繼續(xù)執(zhí)行剩余請求。
3.獲取Response
接著就是看看getResponseWithInterceptorChain方法是如何拿到response的。
internal fun getResponseWithInterceptorChain(): Response {
//攔截器列表
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)
//構(gòu)建攔截器責(zé)任鏈
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
//如果call請求完成,那就意味著交互完成了,沒有更多的東西來交換了
var calledNoMoreExchanges = false
try {
//執(zhí)行攔截器責(zé)任鏈來獲取 response
val response = chain.proceed(originalRequest)
//如果被取消,關(guān)閉響應(yīng),拋出異常
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
}簡單概括一下:這里采用了責(zé)任鏈設(shè)計模式,通過攔截器構(gòu)建了以RealInterceptorChain責(zé)任鏈,然后執(zhí)行proceed方法來得到response。
那么,這又涉及攔截器是什么?攔截器責(zé)任鏈又是什么?
五、Interceptor
只聲明了一個攔截器方法,在子類中具體實現(xiàn),還包含一個Chain接口,核心方法是proceed(request)處理請求來獲取response。
fun interface Interceptor {
/** 攔截方法 */
@Throws(IOException::class)
fun intercept(chain: Chain): Response
interface Chain {
/** 原始請求數(shù)據(jù) */
fun request(): Request
/** 核心方法,處理請求,獲取response */
@Throws(IOException::class)
fun proceed(request: Request): Response
fun connection(): Connection?
fun call(): Call
fun connectTimeoutMillis(): Int
fun withConnectTimeout(timeout: Int, unit: TimeUnit): Chain
fun readTimeoutMillis(): Int
fun withReadTimeout(timeout: Int, unit: TimeUnit): Chain
fun writeTimeoutMillis(): Int
fun withWriteTimeout(timeout: Int, unit: TimeUnit): Chain
}
}六、RealInterceptorChain
攔截器鏈就是實現(xiàn)Interceptor.Chain接口,重點就是復(fù)寫的proceed方法。
class RealInterceptorChain(
internal val call: RealCall,
private val interceptors: List<Interceptor>,
private val index: Int,
internal val exchange: Exchange?,
internal val request: Request,
internal val connectTimeoutMillis: Int,
internal val readTimeoutMillis: Int,
internal val writeTimeoutMillis: Int
) : Interceptor.Chain {
···省略代碼···
private var calls: Int = 0
override fun call(): Call = call
override fun request(): Request = request
@Throws(IOException::class)
override fun proceed(request: Request): Response {
check(index < interceptors.size)
calls++
if (exchange != null) {
check(exchange.finder.sameHostAndPort(request.url)) {
"network interceptor ${interceptors[index - 1]} must retain the same host and port"
}
check(calls == 1) {
"network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
}
}
//index+1, 復(fù)制創(chuàng)建新的責(zé)任鏈,也就意味著調(diào)用責(zé)任鏈中的下一個處理者,也就是下一個攔截器
val next = copy(index = index + 1, request = request)
//取出當前攔截器
val interceptor = interceptors[index]
//執(zhí)行當前攔截器的攔截方法
@Suppress("USELESS_ELVIS")
val response = interceptor.intercept(next) ?: throw NullPointerException(
"interceptor $interceptor returned null")
if (exchange != null) {
check(index + 1 >= interceptors.size || next.calls == 1) {
"network interceptor $interceptor must call proceed() exactly once"
}
}
check(response.body != null) { "interceptor $interceptor returned a response with no body" }
return response
}
}鏈式調(diào)用,最終會向下執(zhí)行攔截器列表中的每個攔截器,然后向上返回Response。
七、攔截器
各類攔截器的總結(jié),按順序:
client.interceptors:這是由開發(fā)者設(shè)置的,會在所有的攔截器處理之前進行最早的攔截處理,可用于添加一些公共參數(shù),如自定義header、自定義log等等。RetryAndFollowUpInterceptor:這里會對連接做一些初始化工作,以及請求失敗的重試工作,重定向的后續(xù)請求工作。跟他的名字一樣,就是做重試工作還有一些連接跟蹤工作。BridgeInterceptor:是客戶端與服務(wù)器之間的溝通橋梁,負責(zé)將用戶構(gòu)建的請求轉(zhuǎn)換為服務(wù)器需要的請求,以及將網(wǎng)絡(luò)請求返回回來的響應(yīng)轉(zhuǎn)換為用戶可用的響應(yīng)。CacheInterceptor:這里主要是緩存的相關(guān)處理,會根據(jù)用戶在OkHttpClient里定義的緩存配置,然后結(jié)合請求新建一個緩存策略,由它來判斷是使用網(wǎng)絡(luò)還是緩存來構(gòu)建response。ConnectInterceptor:這里主要就是負責(zé)建立連接,會建立TCP連接或者TLS連接。Client.networkInterceptors:這里也是開發(fā)者自己設(shè)置的,所以本質(zhì)上和第一個攔截器差不多,但是由于位置不同,所以用處也不同。CallServerInterceptor:這里就是進行網(wǎng)絡(luò)數(shù)據(jù)的請求和響應(yīng)了,也就是實際的網(wǎng)絡(luò)I/O操作,將請求頭與請求體發(fā)送給服務(wù)器,以及解析服務(wù)器返回的response。
接下來我們按順序,從上往下,對這些攔截器進行一一解讀。
1.client.interceptors
這是用戶自己定義的攔截器,稱為應(yīng)用攔截器,會保存在OkHttpClient的interceptors: List<Interceptor>列表中。 他是攔截器責(zé)任鏈中的第一個攔截器,也就是說會第一個執(zhí)行攔截方法,我們可以通過它來添加自定義Header信息,如:
class HeaderInterceptor implements Interceptor {
@Override
public Response intercept(Chain chain) throws IOException {
Request request = chain.request().newBuilder()
.addHeader("device-android", "xxxxxxxxxxx")
.addHeader("country-code", "ZH")
.build();
return chain.proceed(request);
}
}
//然后在 OkHttpClient 中加入
OkHttpClient client = new OkHttpClient.Builder()
.connectTimeout(60, TimeUnit.SECONDS)
.readTimeout(15, TimeUnit.SECONDS)
.writeTimeout(15, TimeUnit.SECONDS)
.cookieJar(new MyCookieJar())
.addInterceptor(new HeaderInterceptor())//添加自定義Header攔截器
.build();2.RetryAndFollowUpInterceptor
第二個攔截器,從它的名字也可知道,它負責(zé)請求失敗的重試工作與重定向的后續(xù)請求工作,同時它會對連接做一些初始化工作。
class RetryAndFollowUpInterceptor(private val client: OkHttpClient) : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
var request = chain.request
val call = realChain.call
var followUpCount = 0
var priorResponse: Response? = null
var newExchangeFinder = true
var recoveredFailures = listOf<IOException>()
while (true) {
//這里會新建一個ExchangeFinder,ConnectInterceptor會使用到
call.enterNetworkInterceptorExchange(request, newExchangeFinder)
var response: Response
var closeActiveExchange = true
try {
if (call.isCanceled()) {
throw IOException("Canceled")
}
try {
response = realChain.proceed(request)
newExchangeFinder = true
} catch (e: RouteException) {
//嘗試通過路由連接失敗。該請求將不會被發(fā)送。
if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {
throw e.firstConnectException.withSuppressed(recoveredFailures)
} else {
recoveredFailures += e.firstConnectException
}
newExchangeFinder = false
continue
} catch (e: IOException) {
//嘗試與服務(wù)器通信失敗。該請求可能已發(fā)送。
if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
throw e.withSuppressed(recoveredFailures)
} else {
recoveredFailures += e
}
newExchangeFinder = false
continue
}
// Attach the prior response if it exists. Such responses never have a body.
//嘗試關(guān)聯(lián)上一個response,注意:body是為null
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build()
}
val exchange = call.interceptorScopedExchange
//會根據(jù) responseCode 來判斷,構(gòu)建一個新的request并返回來重試或者重定向
val followUp = followUpRequest(response, exchange)
if (followUp == null) {
if (exchange != null && exchange.isDuplex) {
call.timeoutEarlyExit()
}
closeActiveExchange = false
return response
}
//如果請求體是一次性的,不需要再次重試
val followUpBody = followUp.body
if (followUpBody != null && followUpBody.isOneShot()) {
closeActiveExchange = false
return response
}
response.body?.closeQuietly()
//最大重試次數(shù),不同的瀏覽器是不同的,比如:Chrome為21,Safari則是16
if (++followUpCount > MAX_FOLLOW_UPS) {
throw ProtocolException("Too many follow-up requests: $followUpCount")
}
request = followUp
priorResponse = response
} finally {
call.exitNetworkInterceptorExchange(closeActiveExchange)
}
}
}
/** 判斷是否要進行重連,false->不嘗試重連;true->嘗試重連。*/
private fun recover(
e: IOException,
call: RealCall,
userRequest: Request,
requestSendStarted: Boolean
): Boolean {
//客戶端禁止重試
if (!client.retryOnConnectionFailure) return false
//不能再次發(fā)送該請求體
if (requestSendStarted && requestIsOneShot(e, userRequest)) return false
//發(fā)生的異常是致命的,無法恢復(fù),如:ProtocolException
if (!isRecoverable(e, requestSendStarted)) return false
//沒有更多的路由來嘗試重連
if (!call.retryAfterFailure()) return false
// 對于失敗恢復(fù),使用帶有新連接的相同路由選擇器
return true
}
···省略代碼··· 3.BridgeInterceptor
從它的名字可以看出,他的定位是客戶端與服務(wù)器之間的溝通橋梁,負責(zé)將用戶構(gòu)建的請求轉(zhuǎn)換為服務(wù)器需要的請求,比如:添加Content-Type,添加Cookie,添加User-Agent等等。再將服務(wù)器返回的response做一些處理轉(zhuǎn)換為客戶端需要的response。比如:移除響應(yīng)頭中的Content-Encoding、Content-Length等等。
class BridgeInterceptor(private val cookieJar: CookieJar) : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
//獲取原始請求數(shù)據(jù)
val userRequest = chain.request()
val requestBuilder = userRequest.newBuilder()
//重新構(gòu)建請求頭,請求體信息
val body = userRequest.body
val contentType = body.contentType()
requestBuilder.header("Content-Type", contentType.toString())
requestBuilder.header("Content-Length", contentLength.toString())
requestBuilder.header("Transfer-Encoding", "chunked")
requestBuilder.header("Host", userRequest.url.toHostHeader())
requestBuilder.header("Connection", "Keep-Alive")
···省略代碼···
//添加cookie
val cookies = cookieJar.loadForRequest(userRequest.url)
if (cookies.isNotEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies))
}
//添加user-agent
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", userAgent)
}
//重新構(gòu)建一個Request,然后執(zhí)行下一個攔截器來處理該請求
val networkResponse = chain.proceed(requestBuilder.build())
cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)
//創(chuàng)建一個新的responseBuilder,目的是將原始請求數(shù)據(jù)構(gòu)建到response中
val responseBuilder = networkResponse.newBuilder()
.request(userRequest)
if (transparentGzip &&
"gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
networkResponse.promisesBody()) {
val responseBody = networkResponse.body
if (responseBody != null) {
val gzipSource = GzipSource(responseBody.source())
val strippedHeaders = networkResponse.headers.newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build()
//修改response header信息,移除Content-Encoding,Content-Length信息
responseBuilder.headers(strippedHeaders)
val contentType = networkResponse.header("Content-Type")
//修改response body信息
responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
}
}
return responseBuilder.build()
···省略代碼···4.CacheInterceptor
用戶可以通過OkHttpClient.cache來配置緩存,緩存攔截器通過CacheStrategy來判斷是使用網(wǎng)絡(luò)還是緩存來構(gòu)建response。
class CacheInterceptor(internal val cache: Cache?) : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val call = chain.call()
//通過request從OkHttpClient.cache中獲取緩存
val cacheCandidate = cache?.get(chain.request())
val now = System.currentTimeMillis()
//創(chuàng)建一個緩存策略,用來確定怎么使用緩存
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
//為空表示不使用網(wǎng)絡(luò),反之,則表示使用網(wǎng)絡(luò)
val networkRequest = strategy.networkRequest
//為空表示不使用緩存,反之,則表示使用緩存
val cacheResponse = strategy.cacheResponse
//追蹤網(wǎng)絡(luò)與緩存的使用情況
cache?.trackResponse(strategy)
val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE
//有緩存但不適用,關(guān)閉它
if (cacheCandidate != null && cacheResponse == null) {
cacheCandidate.body?.closeQuietly()
}
//如果網(wǎng)絡(luò)被禁止,但是緩存又是空的,構(gòu)建一個code為504的response,并返回
if (networkRequest == null && cacheResponse == null) {
return Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(HTTP_GATEWAY_TIMEOUT)
.message("Unsatisfiable Request (only-if-cached)")
.body(EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build().also {
listener.satisfactionFailure(call, it)
}
}
//如果我們禁用了網(wǎng)絡(luò)不使用網(wǎng)絡(luò),且有緩存,直接根據(jù)緩存內(nèi)容構(gòu)建并返回response
if (networkRequest == null) {
return cacheResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build().also {
listener.cacheHit(call, it)
}
}
//為緩存添加監(jiān)聽
if (cacheResponse != null) {
listener.cacheConditionalHit(call, cacheResponse)
} else if (cache != null) {
listener.cacheMiss(call)
}
var networkResponse: Response? = null
try {
//責(zé)任鏈往下處理,從服務(wù)器返回response 賦值給 networkResponse
networkResponse = chain.proceed(networkRequest)
} finally {
//捕獲I/O或其他異常,請求失敗,networkResponse為空,且有緩存的時候,不暴露緩存內(nèi)容。
if (networkResponse == null && cacheCandidate != null) {
cacheCandidate.body?.closeQuietly()
}
}
//如果有緩存
if (cacheResponse != null) {
//且網(wǎng)絡(luò)返回response code為304的時候,使用緩存內(nèi)容新構(gòu)建一個Response返回。
if (networkResponse?.code == HTTP_NOT_MODIFIED) {
val response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers, networkResponse.headers))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis)
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
networkResponse.body!!.close()
// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache!!.trackConditionalCacheHit()
cache.update(cacheResponse, response)
return response.also {
listener.cacheHit(call, it)
}
} else {
//否則關(guān)閉緩存響應(yīng)體
cacheResponse.body?.closeQuietly()
}
}
//構(gòu)建網(wǎng)絡(luò)請求的response
val response = networkResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
//如果cache不為null,即用戶在OkHttpClient中配置了緩存,則將上一步新構(gòu)建的網(wǎng)絡(luò)請求response存到cache中
if (cache != null) {
//根據(jù)response的code,header以及CacheControl.noStore來判斷是否可以緩存
if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
// 將該response存入緩存
val cacheRequest = cache.put(response)
return cacheWritingResponse(cacheRequest, response).also {
if (cacheResponse != null) {
listener.cacheMiss(call)
}
}
}
//根據(jù)請求方法來判斷緩存是否有效,只對Get請求進行緩存,其它方法的請求則移除
if (HttpMethod.invalidatesCache(networkRequest.method)) {
try {
//緩存無效,將該請求緩存從client緩存配置中移除
cache.remove(networkRequest)
} catch (_: IOException) {
// The cache cannot be written.
}
}
}
return response
}
···省略代碼··· 5.ConnectInterceptor
負責(zé)實現(xiàn)與服務(wù)器真正建立起連接,
object ConnectInterceptor : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
//初始化一個exchange對象
val exchange = realChain.call.initExchange(chain)
//根據(jù)這個exchange對象來復(fù)制創(chuàng)建一個新的連接責(zé)任鏈
val connectedChain = realChain.copy(exchange = exchange)
//執(zhí)行該連接責(zé)任鏈
return connectedChain.proceed(realChain.request)
}
}一掃下來,代碼十分簡單,攔截方法里就只有三步。
- 初始化一個
exchange對象。 - 然后根據(jù)這個
exchange對象來復(fù)制創(chuàng)建一個新的連接責(zé)任鏈。 - 執(zhí)行該連接責(zé)任鏈。
那這個exchange對象又是什么呢?
RealCall.kt
internal fun initExchange(chain: RealInterceptorChain): Exchange {
...省略代碼...
//這里的exchangeFinder就是在RetryAndFollowUpInterceptor中創(chuàng)建的
val exchangeFinder = this.exchangeFinder!!
//返回一個ExchangeCodec(是個編碼器,為request編碼以及為response解碼)
val codec = exchangeFinder.find(client, chain)
//根據(jù)exchangeFinder與codec新構(gòu)建一個Exchange對象,并返回
val result = Exchange(this, eventListener, exchangeFinder, codec)
...省略代碼...
return result
}具體看看ExchangeFinder.find()這一步,
ExchangeFinder.kt
fun find(
client: OkHttpClient,
chain: RealInterceptorChain
): ExchangeCodec {
try {
//查找合格可用的連接,返回一個 RealConnection 對象
val resultConnection = findHealthyConnection(
connectTimeout = chain.connectTimeoutMillis,
readTimeout = chain.readTimeoutMillis,
writeTimeout = chain.writeTimeoutMillis,
pingIntervalMillis = client.pingIntervalMillis,
connectionRetryEnabled = client.retryOnConnectionFailure,
doExtensiveHealthChecks = chain.request.method != "GET"
)
//根據(jù)連接,創(chuàng)建并返回一個請求響應(yīng)編碼器:Http1ExchangeCodec 或者 Http2ExchangeCodec,分別對應(yīng)Http1協(xié)議與Http2協(xié)議
return resultConnection.newCodec(client, chain)
} catch (e: RouteException) {
trackFailure(e.lastConnectException)
throw e
} catch (e: IOException) {
trackFailure(e)
throw RouteException(e)
}
}繼續(xù)往下看findHealthyConnection方法
ExchangeFinder.kt
private fun findHealthyConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean,
doExtensiveHealthChecks: Boolean
): RealConnection {
while (true) {
//重點:查找連接
val candidate = findConnection(
connectTimeout = connectTimeout,
readTimeout = readTimeout,
writeTimeout = writeTimeout,
pingIntervalMillis = pingIntervalMillis,
connectionRetryEnabled = connectionRetryEnabled
)
//檢查該連接是否合格可用,合格則直接返回該連接
if (candidate.isHealthy(doExtensiveHealthChecks)) {
return candidate
}
//如果該連接不合格,標記為不可用,從連接池中移除
candidate.noNewExchanges()
...省略代碼...
}
}所以核心方法就是findConnection,我們繼續(xù)深入看看該方法:
private fun findConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean
): RealConnection {
if (call.isCanceled()) throw IOException("Canceled")
//第一次,嘗試重連 call 中的 connection,不需要去重新獲取連接
val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()!
if (callConnection != null) {
var toClose: Socket? = null
synchronized(callConnection) {
if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {
toClose = call.releaseConnectionNoEvents()
}
}
//如果 call 中的 connection 還沒有釋放,就重用它。
if (call.connection != null) {
check(toClose == null)
return callConnection
}
//如果 call 中的 connection 已經(jīng)被釋放,關(guān)閉Socket.
toClose?.closeQuietly()
eventListener.connectionReleased(call, callConnection)
}
//需要一個新的連接,所以重置一些狀態(tài)
refusedStreamCount = 0
connectionShutdownCount = 0
otherFailureCount = 0
//第二次,嘗試從連接池中獲取一個連接,不帶路由,不帶多路復(fù)用
if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}
//連接池中是空的,準備下次嘗試連接的路由
val routes: List<Route>?
val route: Route
...省略代碼...
//第三次,再次嘗試從連接池中獲取一個連接,帶路由,不帶多路復(fù)用
if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}
route = localRouteSelection.next()
}
//第四次,手動創(chuàng)建一個新連接
val newConnection = RealConnection(connectionPool, route)
call.connectionToCancel = newConnection
try {
newConnection.connect(
connectTimeout,
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
} finally {
call.connectionToCancel = null
}
call.client.routeDatabase.connected(newConnection.route())
//第五次,再次嘗試從連接池中獲取一個連接,帶路由,帶多路復(fù)用。
//這一步主要是為了校驗一下,比如已經(jīng)有了一條連接了,就可以直接復(fù)用,而不用使用手動創(chuàng)建的新連接。
if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
val result = call.connection!!
nextRouteToTry = route
newConnection.socket().closeQuietly()
eventListener.connectionAcquired(call, result)
return result
}
synchronized(newConnection) {
//將手動創(chuàng)建的新連接放入連接池
connectionPool.put(newConnection)
call.acquireConnectionNoEvents(newConnection)
}
eventListener.connectionAcquired(call, newConnection)
return newConnection
}在代碼中可以看出,一共做了5次嘗試去得到連接:
- 第一次,嘗試重連 call 中的 connection,不需要去重新獲取連接。
- 第二次,嘗試從連接池中獲取一個連接,不帶路由,不帶多路復(fù)用。
- 第三次,再次嘗試從連接池中獲取一個連接,帶路由,不帶多路復(fù)用。
- 第四次,手動創(chuàng)建一個新連接。
- 第五次,再次嘗試從連接池中獲取一個連接,帶路由,帶多路復(fù)用。
這一步就是為了建立連接。
6.client.networkInterceptors
該攔截器稱為網(wǎng)絡(luò)攔截器,與client.interceptors一樣也是由用戶自己定義的,同樣是以列表的形式存在OkHttpClient中。
那這兩個攔截器有什么不同呢?
其實他兩的不同都是由于他們所處的位置不同所導(dǎo)致的,應(yīng)用攔截器處于第一個位置,所以無論如何它都會被執(zhí)行,而且只會執(zhí)行一次。而網(wǎng)絡(luò)攔截器處于倒數(shù)第二的位置,它不一定會被執(zhí)行,而且可能會被執(zhí)行多次,比如:在RetryAndFollowUpInterceptor失敗或者CacheInterceptor直接返回緩存的情況下,我們的網(wǎng)絡(luò)攔截器是不會被執(zhí)行的。
7.CallServerInterceptor
到了這里,客戶端與服務(wù)器已經(jīng)建立好了連接,接著就是將請求頭與請求體發(fā)送給服務(wù)器,以及解析服務(wù)器返回的response了。
class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.exchange!!
val request = realChain.request
val requestBody = request.body
var invokeStartEvent = true
var responseBuilder: Response.Builder? = null
try {
//寫入請求頭
exchange.writeRequestHeaders(request)
//如果不是GET請求,并且請求體不為空
if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
//當請求頭為"Expect: 100-continue"時,在發(fā)送請求體之前需要等待服務(wù)器返回"HTTP/1.1 100 Continue" 的response,如果沒有等到該response,就不發(fā)送請求體。
//POST請求,先發(fā)送請求頭,在獲取到100繼續(xù)狀態(tài)后繼續(xù)發(fā)送請求體
if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
//刷新請求,即發(fā)送請求頭
exchange.flushRequest()
//解析響應(yīng)頭
responseBuilder = exchange.readResponseHeaders(expectContinue = true)
exchange.responseHeadersStart()
invokeStartEvent = false
}
//寫入請求體
if (responseBuilder == null) {
if (requestBody.isDuplex()) {
//如果請求體是雙公體,就先發(fā)送請求頭,稍后在發(fā)送請求體
exchange.flushRequest()
val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
//寫入請求體
requestBody.writeTo(bufferedRequestBody)
} else {
//如果獲取到了"Expect: 100-continue"響應(yīng),寫入請求體
val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
requestBody.writeTo(bufferedRequestBody)
bufferedRequestBody.close()
}
···省略代碼···
//請求結(jié)束,發(fā)送請求體
exchange.finishRequest()
···省略代碼···
try {
if (responseBuilder == null) {
//讀取響應(yīng)頭
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
···省略代碼···
//構(gòu)建一個response
var response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
var code = response.code
···省略代碼···
return response
···省略代碼···簡單概括一下:寫入發(fā)送請求頭,然后根據(jù)條件是否寫入發(fā)送請求體,請求結(jié)束。解析服務(wù)器返回的請求頭,然后構(gòu)建一個新的response,并返回。 這里CallServerInterceptor是攔截器責(zé)任鏈中最后一個攔截器了,所以他不會再調(diào)用chain.proceed()方法往下執(zhí)行,而是將這個構(gòu)建的response往上傳遞給責(zé)任鏈中的每個攔截器。
總結(jié)一下流程:

到此這篇關(guān)于Java OkHttp框架源碼超詳細解析的文章就介紹到這了,更多相關(guān)Java OkHttp框架內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java中volatile關(guān)鍵字的作用是什么舉例詳解
這篇文章主要介紹了Java中volatile關(guān)鍵字的作用是什么的相關(guān)資料,volatile關(guān)鍵字在Java中用于修飾變量,提供可見性和禁止指令重排的特性,但不保證原子性,它通過內(nèi)存屏障實現(xiàn)這些特性,文中通過代碼介紹的非常詳細,需要的朋友可以參考下2025-04-04
Spring Boot實戰(zhàn)之靜態(tài)資源處理
這篇文章主要介紹了Spring Boot實戰(zhàn)之靜態(tài)資源處理,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-01-01
Elasticsearch查詢之Term?Query示例解析
這篇文章主要為大家介紹了Elasticsearch查詢之Term?Query示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-04-04
Java?數(shù)據(jù)結(jié)構(gòu)進階二叉樹題集下
二叉樹可以簡單理解為對于一個節(jié)點來說,最多擁有一個上級節(jié)點,同時最多具備左右兩個下級節(jié)點的數(shù)據(jù)結(jié)構(gòu)。本文將帶你通過實際題目來熟練掌握2022-04-04
Java中List與數(shù)組相互轉(zhuǎn)換實例分析
這篇文章主要介紹了Java中List與數(shù)組相互轉(zhuǎn)換的方法,實例分析了Java中List與數(shù)組相互轉(zhuǎn)換中容易出現(xiàn)的問題與相關(guān)的解決方法,具有一定參考借鑒價值,需要的朋友可以參考下2015-05-05
SpringMVC?HttpMessageConverter報文信息轉(zhuǎn)換器
這篇文章主要為大家介紹了SpringMVC?HttpMessageConverter報文信息轉(zhuǎn)換器,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-05-05
Spring集成webSocket頁面訪問404問題的解決方法
這篇文章主要介紹了Spring集成webSocket頁面訪問404問題的解決方法,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-12-12
關(guān)于Java集合框架Collection接口詳解
這篇文章主要介紹了關(guān)于Java集合框架Collection接口詳解,Collection接口是Java集合框架中的基礎(chǔ)接口,定義了一些基本的集合操作,包括添加元素、刪除元素、遍歷集合等,需要的朋友可以參考下2023-05-05

