Netty分布式ByteBuf使用的底層實(shí)現(xiàn)方式源碼解析
概述
熟悉Nio的小伙伴應(yīng)該對jdk底層byteBuffer不會陌生, 也就是字節(jié)緩沖區(qū), 主要用于對網(wǎng)絡(luò)底層io進(jìn)行讀寫, 當(dāng)channel中有數(shù)據(jù)時(shí), 將channel中的數(shù)據(jù)讀取到字節(jié)緩沖區(qū), 當(dāng)要往對方寫數(shù)據(jù)的時(shí)候, 將字節(jié)緩沖區(qū)的數(shù)據(jù)寫到channel中
但是jdk的byteBuffer是使用起來有諸多不便, 比如只有一個標(biāo)記位置的指針position, 在進(jìn)行讀寫操作時(shí)要頻繁的通過flip()方法進(jìn)行指針位置的移動, 極易出錯, 并且byteBuffer的內(nèi)存一旦分配則不能改變, 不支持動態(tài)擴(kuò)容, 當(dāng)讀寫的內(nèi)容大于緩沖區(qū)內(nèi)存時(shí), 則會發(fā)生索引越界異常
而Netty的ByteBuf對jdk的byteBuffer做了重新的定義, 同樣是字節(jié)緩沖區(qū)用于讀取網(wǎng)絡(luò)io中的數(shù)據(jù), 但是使用起來大大簡化, 并且支持了自動擴(kuò)容, 不用擔(dān)心讀寫數(shù)據(jù)大小超過初始分配的大小
byteBuf根據(jù)其分類的不同底層實(shí)現(xiàn)方式有所不同, 有直接基于jdk底層byteBuffer實(shí)現(xiàn)的, 也有基于字節(jié)數(shù)組的實(shí)現(xiàn)的, 對于byteBuf的分類, 在后面的小節(jié)將會講到
byteBuf中維護(hù)了兩個指針, 一是讀指針, 二是寫指針, 兩個指針相互獨(dú)立, 在讀操作的時(shí)候, 只會移動讀指針, 通過指針位置記錄讀取的字節(jié)數(shù)
同樣在寫操作時(shí), 也只會移動寫指針, 通過寫指針的位置記錄寫的字節(jié)數(shù)
在每次讀寫操作的過程中都會對指針的位置進(jìn)行校驗(yàn), 讀指針的位置不能超過寫指針, 否則會拋出異常
同樣, 寫指針不能超過緩沖區(qū)分配的內(nèi)存, 則將對緩沖區(qū)做擴(kuò)容操作
具體指針操作, 入下圖所示:

AbstractByteBuf屬性和構(gòu)造方法
在講AbstractByteBuf之前, 我們首先先了解一下ByteBuf這個類, 這是所有ByteBuf的最頂層抽象, 里面定義了大量對ByteBuf操作的抽象方法供子類實(shí)現(xiàn)
AbstractByteBuf同樣也緩沖區(qū)的抽象類, 定義了byteBuf的骨架操作, 比如參數(shù)校驗(yàn), 自動擴(kuò)容, 以及一些讀寫操作的指針移動, 但具體的實(shí)現(xiàn), 不同的bytebuf實(shí)現(xiàn)起來是不同的, 這種情況則交給其子類實(shí)現(xiàn)
AbstractByteBuf繼承了這個類, 并實(shí)現(xiàn)了其大部分的方法
首先看這個類的屬性和構(gòu)造方法
//讀指針
int readerIndex;
//寫指針
int writerIndex;
//保存讀指針
private int markedReaderIndex;
//保存寫指針
private int markedWriterIndex;
//最大分配容量
private int maxCapacity;
protected AbstractByteBuf(int maxCapacity) {
if (maxCapacity < 0) {
throw new IllegalArgumentException("maxCapacity: " + maxCapacity + " (expected: >= 0)");
}
this.maxCapacity = maxCapacity;
}我們可以看到在屬性中定義了讀寫指針的成員標(biāo)量, 和讀寫指針位置的保存
在構(gòu)造方法中可以傳入可分配的最大內(nèi)存, 然后賦值到成員變量中
我們看幾個最簡單的方法
@Override
public int maxCapacity() {
return maxCapacity;
}
@Override
public int readerIndex() {
return readerIndex;
}
@Override
public int writerIndex() {
return writerIndex;
}獲取最大內(nèi)存, 獲取讀寫指針這些方法, 對所有的bytebuf都是通用的, 所以可以定義在AbstractByteBuf中
我們以一個writeBytes方法為例, 讓同學(xué)們熟悉AbstractByteBuf中哪些部分自己實(shí)現(xiàn), 哪些部分則交給了子類實(shí)現(xiàn):
@Override
public ByteBuf writeBytes(ByteBuf src) {
writeBytes(src, src.readableBytes());
return this;
}這個方法是將源的ByteBuf(參數(shù))中的字節(jié)寫入到自身ByteBuf中
首先這里調(diào)用了自身的writeBytes方法, 并傳入?yún)?shù)ByteBuf本身, 以及Bytebuf的可讀字節(jié)數(shù), 我們跟到readbleBytes()方法中, 其實(shí)就是調(diào)用了自身的方法:
@Override
public int readableBytes() {
return writerIndex - readerIndex;
}我們看到, 這里可讀字節(jié)數(shù)就是返回了寫指針到讀指針之間的長度
我們再繼續(xù)跟到writeBytes(src, src.readableBytes())中:
@Override
public ByteBuf writeBytes(ByteBuf src, int length) {
if (length > src.readableBytes()) {
throw new IndexOutOfBoundsException(String.format(
"length(%d) exceeds src.readableBytes(%d) where src is: %s", length, src.readableBytes(), src));
}
writeBytes(src, src.readerIndex(), length);
src.readerIndex(src.readerIndex() + length);
return this;
}這里同樣調(diào)用了自身的方法首先會對參數(shù)進(jìn)行驗(yàn)證, 就是寫入自身的長度不能超過源ByteBuf的可讀字節(jié)數(shù)
這里又調(diào)用了一個wirte方法, 參數(shù)傳入源Bytebuf, 其可讀字節(jié)數(shù), 寫入的長度, 這里寫入的長度我們知道就是源ByteBuf的可讀字節(jié)數(shù)
我們再跟到writeBytes(src, src.readerIndex(), length);
public ByteBuf writeBytes(ByteBuf src, int srcIndex, int length) {
ensureAccessible();
ensureWritable(length);
setBytes(writerIndex, src, srcIndex, length);
writerIndex += length;
return this;
}我們重點(diǎn)關(guān)注第二個校驗(yàn)方法ensureWritable(length)
public ByteBuf ensureWritable(int minWritableBytes) {
if (minWritableBytes < 0) {
throw new IllegalArgumentException(String.format(
"minWritableBytes: %d (expected: >= 0)", minWritableBytes));
}
ensureWritable0(minWritableBytes);
return this;
}然后我們再跟到ensureWritable0(minWritableBytes)方法中:
private void ensureWritable0(int minWritableBytes) {
if (minWritableBytes <= writableBytes()) {
return;
}
if (minWritableBytes > maxCapacity - writerIndex) {
throw new IndexOutOfBoundsException(String.format(
"writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
writerIndex, minWritableBytes, maxCapacity, this));
}
//自動擴(kuò)容
int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);
capacity(newCapacity);
}開始做了兩個參數(shù)校驗(yàn), 第一個表示當(dāng)前ByteBuf寫入的長度如果要小于可寫字節(jié)數(shù), 則返回
第二個可以換種方式去看minWritableBytes+ writerIndex> maxCapacity 也就是需要寫入的長度+寫指針必須要小于最大分配的內(nèi)存, 否則報(bào)錯, 注意這里最大分配內(nèi)存不帶表當(dāng)前內(nèi)存, 而是byteBuf所能分配的最大內(nèi)存
如果需要寫入的長度超過了可寫字節(jié)數(shù), 并且需要寫入的長度+寫指針不超過最大內(nèi)存, 則就開始了ByteBuf非常經(jīng)典也非常重要的操作, 也就是自動擴(kuò)容
int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);
其中alloc()返回的是當(dāng)前bytebuf返回的緩沖區(qū)分配器對象, 我們之后的小節(jié)會講到, 這里調(diào)用了其calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity)方法為其擴(kuò)容, 其中傳入的參數(shù)writerIndex + minWritableBytes代表所需要的容量, maxCapacity為最大容量
我們跟到擴(kuò)容的方法里面去
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
//合法性校驗(yàn)
if (minNewCapacity < 0) {
throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expectd: 0+)");
}
if (minNewCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
minNewCapacity, maxCapacity));
}
//閾值為4mb
final int threshold = 1048576 * 4;
//最小需要擴(kuò)容內(nèi)存(總內(nèi)存) == 閾值
if (minNewCapacity == threshold) {
//返回閾值
return threshold;
}
//最小擴(kuò)容內(nèi)存>閾值
if (minNewCapacity > threshold) {
//newCapacity為需要擴(kuò)容內(nèi)存
int newCapacity = minNewCapacity / threshold * threshold;
//目標(biāo)容量+閾值>最大容量
if (newCapacity > maxCapacity - threshold) {
//將最大容量作為新容量
newCapacity = maxCapacity;
} else {
//否則, 目標(biāo)容量+閾值
newCapacity += threshold;
}
return newCapacity;
}
//如果小于閾值
int newCapacity = 64;
//目標(biāo)容量<需要擴(kuò)容的容量
while (newCapacity < minNewCapacity) {
//倍增
newCapacity <<= 1;
}
//目標(biāo)容量和最大容量返回一個最小的
return Math.min(newCapacity, maxCapacity);
}擴(kuò)容相關(guān)的邏輯注釋也寫的非常清楚, 如果小于閾值(4mb), 采用倍增的方式, 如果大于閾值(4mb), 采用平移4mb的方式
我們回到writeBytes(ByteBuf src, int srcIndex, int length):
public ByteBuf writeBytes(ByteBuf src, int srcIndex, int length) {
ensureAccessible();
ensureWritable(length);
setBytes(writerIndex, src, srcIndex, length);
writerIndex += length;
return this;
}再往下看setBytes(writerIndex, src, srcIndex, length), 這里的參數(shù)的意思是從當(dāng)前byteBuf的writerIndex節(jié)點(diǎn)開始寫入, 將源緩沖區(qū)src的讀指針位置, 寫lenght個字節(jié), 這里的方法中AbstractByteBuf類并沒有提供實(shí)現(xiàn), 因?yàn)椴煌愋偷腂tyeBuf實(shí)現(xiàn)的方式是不一樣的, 所以這里交給了子類去實(shí)現(xiàn)
最后將寫指針后移length個字節(jié)
最后我們回到writeBytes(ByteBuf src, int length)方法中:
public ByteBuf writeBytes(ByteBuf src, int length) {
if (length > src.readableBytes()) {
throw new IndexOutOfBoundsException(String.format(
"length(%d) exceeds src.readableBytes(%d) where src is: %s", length, src.readableBytes(), src));
}
writeBytes(src, src.readerIndex(), length);
src.readerIndex(src.readerIndex() + length);
return this;
}當(dāng)writeBytes(src, src.readerIndex(), length)寫完之后, 通過src.readerIndex(src.readerIndex() + length)將源緩沖區(qū)的讀指針后移lenght個字節(jié)
以上對AbstractByteBuf的簡單介紹和其中寫操作的方法的簡單剖析
以上就是Netty分布式ByteBuf使用的底層實(shí)現(xiàn)方式源碼解析的詳細(xì)內(nèi)容,更多關(guān)于Netty分布式ByteBuf使用底層實(shí)現(xiàn)的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
基于SpringMVC實(shí)現(xiàn)網(wǎng)頁登錄攔截
SpringMVC的處理器攔截器類似于Servlet開發(fā)中的過濾器Filter,用于對處理器進(jìn)行預(yù)處理和后處理。因此,本文將為大家介紹如何通過SpringMVC實(shí)現(xiàn)網(wǎng)頁登錄攔截功能,需要的小伙伴可以了解一下2021-12-12
詳解Java后端優(yōu)雅驗(yàn)證參數(shù)合法性
這篇文章主要介紹了詳解Java后端優(yōu)雅驗(yàn)證參數(shù)合法性,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-02-02
關(guān)于springboot的接口返回值統(tǒng)一標(biāo)準(zhǔn)格式
這篇文章主要介紹了關(guān)于springboot的接口返回值統(tǒng)一標(biāo)準(zhǔn)格式,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-05-05
利用Springboot+Caffeine實(shí)現(xiàn)本地緩存實(shí)例代碼
Caffeine是一個基于Java8開發(fā)的提供了近乎最佳命中率的高性能的緩存庫,下面這篇文章主要給大家介紹了關(guān)于利用Springboot+Caffeine實(shí)現(xiàn)本地緩存的相關(guān)資料,需要的朋友可以參考下2023-01-01
SpringBoot使用CommandLineRunner接口完成資源初始化方式
這篇文章主要介紹了SpringBoot使用CommandLineRunner接口完成資源初始化方式,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-02-02
java.lang.IllegalStateException異常原因和解決辦法
這篇文章主要給大家介紹了關(guān)于java.lang.IllegalStateException異常原因和解決辦法,IllegalStateException是Java標(biāo)準(zhǔn)庫中的一個異常類,通常表示在不合適或無效的情況下執(zhí)行了某個方法或操作,需要的朋友可以參考下2023-07-07

