Node.js高級編程之UDP可靠性分析
前言
UDP 協(xié)議是我們平時較少接觸到的知識,不同于 TCP,它是“不可靠”的,今天我們就來實戰(zhàn)一下看下它到底怎么個不可靠法?
不可靠的 UDP
實驗前,我們先介紹一下需要用到的工具(Mac 環(huán)境,其他環(huán)境請自行搜索相關(guān)工具):
- Network Link Conditioner:模擬丟包場景,可以去蘋果開發(fā)者網(wǎng)站上下載
- Wireshark:抓包分析工具
- 云主機(jī):因為實現(xiàn)發(fā)現(xiàn) Network Link Conditioner 對本地回環(huán)地址不起作用,如果有更好的方法求大佬指出
然后我們準(zhǔn)備兩段代碼,一段作為 UDP Server,一段作為 UDP Client,Client 會向 Server 發(fā)送 26 個英文大寫字母,Server 會將他們存到文件:
// udp-server.js
const udp = require('dgram')
const server = udp.createSocket('udp4')
const fs = require('fs')
server.on('listening', function () {
var address = server.address()
var port = address.port
console.log('Server is listening at port ' + port)
})
server.on('message', function (msg, info) {
console.log(
`Data received from ${info.address}:${info.port}: ${msg.toString()}`
)
fs.appendFileSync('./out', msg.toString())
})
server.on('error', function (error) {
console.log('Error: ' + error)
server.close()
})
server.bind(7788)
// udp-client.js
const udp = require('dgram')
const client = udp.createSocket('udp4')
for (let i = 0; i < 26; i++) {
const char = String.fromCharCode(0x41 + i)
client.send(Buffer.from(char), 7788, '********', function (error) {
if (error) {
console.log(error)
}
})
}
接著我們按照下面步驟開始實驗:
- 通過 Network Link Conditioner 把丟包率設(shè)置為 50%:

- 設(shè)置好 Wireshark 的抓包參數(shù):

- 在云主機(jī)上啟動 Server,在本地啟動 Client。
接著,我們來看一下實驗結(jié)果:
- 首先,我們可以看到服務(wù)端接收到的字母少了很多,只有 14 個:

- 服務(wù)端接收到的字母順序是亂序的,比如 U 跑到了 T 的前面:

為了進(jìn)行對比,我們可以換成 TCP 試試,代碼如下,結(jié)果就不貼了:
// tcp-server.js
const net = require('net')
const server = net.createServer()
const fs = require('fs')
server.on('connection', function (conn) {
conn.on('data', (msg) => {
console.log(
`Data received from ${conn.address().address}:${
conn.address().port
}: ${msg.toString()}`
)
fs.appendFileSync('./out', msg.toString())
})
})
server.listen(8899, () => {
console.log('server listening to %j', server.address().port)
})
// tcp-client.js
var net = require('net')
var client = new net.Socket()
client.connect(8899, '********', function () {
for (let i = 0; i < 26; i++) {
const char = String.fromCharCode(0x41 + i)
client.write(char)
}
})
接下我們試試基于 UDP 來實現(xiàn)一個可靠的傳輸協(xié)議,主要解決上面的丟包和亂序問題。
基于 UDP 的簡單可靠傳輸協(xié)議
首先,需要設(shè)計一下我們的協(xié)議格式。為了簡單起見,我們只在原來 UDP 的數(shù)據(jù)部分分別新增 4 個字節(jié)的 SEQ 和 ACK:
+-------------------------------+ | 64 個字節(jié)的 UDP 首部 | +-------------------------------+ | SEQ(4 個字節(jié)) | ACK(4 個字節(jié)) | +-------------------------------+ | Data | +-------------------------------+
其中 SEQ 表示當(dāng)前包的序號,ACK 表示回復(fù)序號。
接下來看看,我們?nèi)绾谓鉀Q前面的兩個問題。
亂序問題
接收方需要維護(hù)一個變量 expectedSeq 的變量表示期待接收到的包序號。為了簡單起見,我們制定如下規(guī)則:如果當(dāng)前接收到的包序號等于 expectedSeq,則把包交給應(yīng)用層處理,并發(fā)送 ACK 給發(fā)送方;否則我們都直接丟棄。當(dāng)然更好的做法是維護(hù)一個接收窗口,這樣可以批量的提交數(shù)據(jù)給應(yīng)用層,也可以用來緩存大于 expectedSeq 的包。
假設(shè)現(xiàn)在發(fā)送方發(fā)送了 1 2 3 兩個包,但是到達(dá)接收方的順序是 3 2 1,按照我們的規(guī)則接收方會丟棄 3 和 2,接收 1。好家伙,順序倒是不亂了,但是包沒了。
所以還得把丟包問題也解決了才行。
丟包問題
發(fā)送方維護(hù)一個發(fā)送窗口用來存儲已發(fā)送但是還未被確認(rèn)的包:
+---+---+---+---+ | 1 | 2 | 3 | 4 | +---+---+---+---+
發(fā)送方每發(fā)送一個包的同時還需要將包放入發(fā)送窗口,并設(shè)置一個定時器用來重發(fā)這個包。當(dāng)發(fā)送方接收到來自接收方的 ACK 時,需要取消掉對應(yīng)包的定時器,并將發(fā)送窗口中小于 ACK 的包都刪除。
+---+---+---+---+ | 1 | 2 | 3 | 4 | +---+---+---+---+ // ACK = 4,刪除 1 2 3,并取消掉他們的定時器 +---+ | 4 | +---+
完整代碼及使用 Demo 見文末,現(xiàn)在可以正常按順序輸出 26 個字母了,但是離“可靠”協(xié)議還差得遠(yuǎn)。比如第一次輸出完 26 個字母后,我們再次啟動客戶端時發(fā)現(xiàn)就沒有任何輸出了。原因在于此時接收端的 expectedSeq 已經(jīng)是 20 多了,但是新啟動的 client 發(fā)送的 SEQ 還是從 1 開始的,結(jié)果就是接收端一直丟棄接收到的包,發(fā)送端一直重試。
要解決這個問題,可以參考 TCP 在傳輸兩端建立“連接”的概念,在開始發(fā)送前通過“三次握手”建立連接,也就是確定起始 SEQ,初始化窗口等工作,結(jié)束前通過“四次揮手”斷開連接,即清理窗口定時器等工作。這個就留到以后再說吧。
代碼
// packet.js
class Packet {
constructor({seq, ack, data = ''}) {
this.seq = seq // 序列號
this.ack = ack // 確認(rèn)號
this.data = data // 數(shù)據(jù)
}
// 將 Packet 轉(zhuǎn)換成 Buffer,以便通過網(wǎng)絡(luò)傳輸
toBuffer() {
const seqBuffer = Buffer.alloc(4)
seqBuffer.writeUInt32BE(this.seq)
const ackBuffer = Buffer.alloc(4)
ackBuffer.writeUInt32BE(this.ack)
const dataBuffer = Buffer.from(this.data)
return Buffer.concat([seqBuffer, ackBuffer, dataBuffer])
}
// 從 Buffer 中解析出 Packet
static fromBuffer(buffer) {
const seq = buffer.readUInt32BE()
const ack = buffer.readUInt32BE(4)
const data = buffer.slice(8)
return new Packet({seq, ack, data})
}
}
module.exports = Packet
// reliableUDP.js
const dgram = require('dgram')
const Packet = require('./packet')
class ReliableUDP {
constructor() {
this.socket = dgram.createSocket('udp4')
this.socket.on('message', this.handleMessage.bind(this))
this.sendWindow = [] // 發(fā)送窗口,用于存放待確認(rèn)的數(shù)據(jù)包
this.receiveWindow = [] // 接收窗口,用于存放已接收的數(shù)據(jù)包
this.expectedSeq = 1 // 期望接收的數(shù)據(jù)包序列號
this.nextSeq = 1 // 下一個要發(fā)送的數(shù)據(jù)包序列號
this.timeout = 100 // 超時時間,單位為毫秒
this.timeoutIds = {} // 用于存放定時器 ID
}
listen(port, address, fn) {
this.socket.bind(port, address, fn)
}
// 發(fā)送數(shù)據(jù)包
sendPacket(packet, address, port) {
const buffer = packet.toBuffer()
this.socket.send(buffer, port, address, (err) => {
if (err) {
console.error(err)
}
})
if (packet.ack) return
if (!this.sendWindow.includes((p) => p.seq === packet.seq))
this.sendWindow.push(packet)
// 設(shè)置超時定時器
const timeoutId = setTimeout(() => {
this.handleTimeout(packet.seq, address, port)
}, this.timeout)
this.timeoutIds[packet.seq] = timeoutId
}
// 處理接收到的數(shù)據(jù)包
handleMessage(msg, rinfo) {
const {address, port} = rinfo
const packet = Packet.fromBuffer(msg)
// 收到的是應(yīng)答的包
if (packet.ack) {
const ackNum = packet.ack - 1
// 處理發(fā)送窗口中已經(jīng)確認(rèn)的數(shù)據(jù)包
while (this.sendWindow.length > 0 && this.sendWindow[0].seq <= ackNum) {
this.sendWindow.shift()
}
// 清除超時定時器
if (this.timeoutIds[ackNum]) {
clearTimeout(this.timeoutIds[ackNum])
delete this.timeoutIds[ackNum]
}
} else {
// 如果是重復(fù)的數(shù)據(jù)包,則忽略
if (packet.seq < this.expectedSeq) {
return
}
// 如果是期望接收的數(shù)據(jù)包
if (packet.seq === this.expectedSeq) {
this.receiveWindow.push(packet)
this.expectedSeq++
// 處理接收窗口中已經(jīng)確認(rèn)的數(shù)據(jù)包
while (
this.receiveWindow.length > 0 &&
this.receiveWindow[0].seq <= this.expectedSeq
) {
const packet = this.receiveWindow.shift()
this.onPacketReceived(packet.data)
}
const ackPacket = new Packet({
seq: this.nextSeq++,
ack: this.expectedSeq,
})
this.sendPacket(ackPacket, address, port)
} else {
// 如果是未來的數(shù)據(jù)包,暫不做處理,更好的做法是緩存起來
}
}
}
// 應(yīng)用層調(diào)用該方法發(fā)送數(shù)據(jù)
send(data, address, port) {
const packet = new Packet({
seq: this.nextSeq,
ack: null,
data,
})
this.sendPacket(packet, address, port)
this.nextSeq++
}
// 應(yīng)用層調(diào)用該方法注冊回調(diào)函數(shù),接收數(shù)據(jù)
onReceive(callback) {
this.onPacketReceived = callback
}
// 處理超時
handleTimeout(seq, address, port) {
// 重傳超時的數(shù)據(jù)包
const packet = this.sendWindow.find((p) => p.seq === seq)
if (packet) {
this.sendPacket(packet, address, port)
}
}
}
module.exports = ReliableUDP
// server.js
const ReliableUDP = require('./reliableUDP')
const server = new ReliableUDP()
server.listen(7788, 'localhost')
server.onReceive((data) => {
console.log(data.toString())
})
// client.js
const ReliableUDP = require('./reliableUDP')
const client = new ReliableUDP()
for (let i = 0; i < 26; i++) {
const char = String.fromCharCode(0x41 + i)
client.send(char, 'localhost', 7788)
}以上就是Node.js高級編程之UDP可靠性分析的詳細(xì)內(nèi)容,更多關(guān)于Node.js高級編程UDP的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
node版本太高導(dǎo)致項目跑不起來的解決辦法(windows)
換了臺電腦后,安裝node,一切完美,發(fā)現(xiàn)其中有一個uniapp的小程序項目跑不起來,感覺是node版本太高導(dǎo)致的,所以只能重新安裝低版本的node,本文給大家介紹了node版本太高的解決辦法,需要的朋友可以參考下2023-10-10
Node.js斷點(diǎn)續(xù)傳的實現(xiàn)
最近做了個項目,應(yīng)項目需求,需要傳圖片、Excel等,幾M的大小可以很快就上傳到服務(wù)器,但是大的就需要斷點(diǎn)上傳,本文就介紹一下,感興趣的可以了解一下2021-05-05

