Analysis of transport request sending and handling on an Elasticsearch node
How transport requests are sent and handled
The previous article analyzed how NettyTransport starts up and connects to other nodes; this article focuses on how transport requests are sent and processed.
Nodes in a cluster constantly exchange messages: the master checks whether other nodes are alive, data nodes periodically check whether the master is still alive, cluster state is published, search requests are forwarded, and so on. To frame this traffic, Elasticsearch defines a 19-byte message header, HEADER_SIZE = 2 + 4 + 8 + 1 + 4: the bytes 'E' and 'S', then a 4-byte int message length, an 8-byte long message id, a 1-byte status, and finally a 4-byte int version.
Every inter-node message begins with these 19 bytes. Elasticsearch also assigns a name to every inter-node action, for example internal:discovery/zen/fd/master_ping for the periodic master check, and each action is mapped to a corresponding message handler. This is analyzed in detail below.
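To make the header layout concrete, here is a minimal sketch of writing those 19 bytes with a plain ByteBuffer. This is an illustration of the layout described above, not the actual NettyHeader code, and the class and parameter names are hypothetical:

```java
import java.nio.ByteBuffer;

public class HeaderSketch {
    // 'E','S' (2) + length (4) + requestId (8) + status (1) + version (4)
    static final int HEADER_SIZE = 2 + 4 + 8 + 1 + 4; // 19 bytes

    // Lay out the fields in the order the article describes.
    static byte[] writeHeader(int messageLength, long requestId, byte status, int versionId) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE);
        buf.put((byte) 'E').put((byte) 'S'); // marker bytes
        buf.putInt(messageLength);           // 4-byte message length
        buf.putLong(requestId);              // 8-byte message id
        buf.put(status);                     // 1-byte status
        buf.putInt(versionId);               // 4-byte version id
        return buf.array();
    }
}
```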
How a request is sent
The code lives in NettyTransport:
public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
    // Parameters: node - the target node; requestId - the request id; action - the action name;
    // request - the request itself; options - one of RECOVERY, BULK, REG, STATE, PING
    Channel targetChannel = nodeChannel(node, options); // fetch the channel for the target node; channels are initialized when the node connects (see the previous article)
    if (compress) {
        options.withCompress(true);
    }
    byte status = 0;
    // status flags: STATUS_REQRES = 1 << 0; STATUS_ERROR = 1 << 1; STATUS_COMPRESS = 1 << 2
    status = TransportStatus.setRequest(status);
    ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays); // initialize the output stream
    boolean addedReleaseListener = false;
    try {
        bStream.skip(NettyHeader.HEADER_SIZE); // reserve room for the message header
        StreamOutput stream = bStream;
        // only compress if asked, and, the request is not bytes, since then only
        // the header part is compressed, and the "body" can't be extracted as compressed
        if (options.compress() && (!(request instanceof BytesTransportRequest))) {
            status = TransportStatus.setCompress(status);
            stream = CompressorFactory.defaultCompressor().streamOutput(stream);
        }
        stream = new HandlesStreamOutput(stream);
        // we pick the smallest of the 2, to support both backward and forward compatibility
        // note, this is the only place we need to do this, since from here on, we use the serialized version
        // as the version to use also when the node receiving this request will send the response with
        Version version = Version.smallest(this.version, node.version());
        stream.setVersion(version);
        stream.writeString(transportServiceAdapter.action(action, version));
        ReleasableBytesReference bytes;
        ChannelBuffer buffer;
        // it might be nice to somehow generalize this optimization, maybe a smart "paged" bytes output
        // that create paged channel buffers, but its tricky to know when to do it (where this option is
        // more explicit).
        if (request instanceof BytesTransportRequest) {
            BytesTransportRequest bRequest = (BytesTransportRequest) request;
            assert node.version().equals(bRequest.version());
            bRequest.writeThin(stream);
            stream.close();
            bytes = bStream.bytes();
            ChannelBuffer headerBuffer = bytes.toChannelBuffer();
            ChannelBuffer contentBuffer = bRequest.bytes().toChannelBuffer();
            buffer = ChannelBuffers.wrappedBuffer(NettyUtils.DEFAULT_GATHERING, headerBuffer, contentBuffer);
        } else {
            request.writeTo(stream);
            stream.close();
            bytes = bStream.bytes();
            buffer = bytes.toChannelBuffer();
        }
        NettyHeader.writeHeader(buffer, requestId, status, version); // write the message header
        ChannelFuture future = targetChannel.write(buffer); // write the buffer and get a future; this is where the message is actually sent
        ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes);
        future.addListener(listener); // attach the release listener
        addedReleaseListener = true;
        transportServiceAdapter.onRequestSent(node, requestId, action, request, options);
    } finally {
        if (!addedReleaseListener) {
            Releasables.close(bStream.bytes());
        }
    }
}
That is the whole sending path: fetch the target node's channel, serialize the request, write the message header, send it, and watch the write with a listener. TransportRequest here is an abstract class that extends TransportMessage and implements the Streamable interface. The cluster has a great many concrete implementations of it, one per feature, so they are not listed here; they will come up repeatedly in later code analysis.
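The status byte manipulated by TransportStatus.setRequest() and TransportStatus.setCompress() above is a simple bit field. A minimal sketch of that flag logic, mirroring the three constants named in the comment (illustrative class, not the actual TransportStatus source):

```java
public class StatusBits {
    static final byte STATUS_REQRES   = 1 << 0; // set => request, clear => response
    static final byte STATUS_ERROR    = 1 << 1; // set => the body carries an error
    static final byte STATUS_COMPRESS = 1 << 2; // set => the body is compressed

    static byte setRequest(byte status)    { return (byte) (status | STATUS_REQRES); }
    static byte setCompress(byte status)   { return (byte) (status | STATUS_COMPRESS); }
    static boolean isRequest(byte status)  { return (status & STATUS_REQRES) != 0; }
    static boolean isError(byte status)    { return (status & STATUS_ERROR) != 0; }
    static boolean isCompress(byte status) { return (status & STATUS_COMPRESS) != 0; }
}
```

The receiving side reads this one byte and can immediately tell whether it is looking at a request or a response, and whether to run the body through a decompressor first.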
How a request is received
Sending is only half of the transport layer's job; there must also be a receiving side for the transport to be complete, so next we analyze the receiving path. The previous article briefly introduced how Netty is used: messages are processed by a message handler, so NettyTransport's receive logic lives in MessageChannelHandler's messageReceived() method, shown below:
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    Transports.assertTransportThread();
    Object m = e.getMessage();
    if (!(m instanceof ChannelBuffer)) { // not a ChannelBuffer: pass it upstream and return
        ctx.sendUpstream(e);
        return;
    }
    // parse the message header
    ChannelBuffer buffer = (ChannelBuffer) m;
    int size = buffer.getInt(buffer.readerIndex() - 4);
    transportServiceAdapter.received(size + 6);
    // we have additional bytes to read, outside of the header
    boolean hasMessageBytesToRead = (size - (NettyHeader.HEADER_SIZE - 6)) != 0;
    int markedReaderIndex = buffer.readerIndex();
    int expectedIndexReader = markedReaderIndex + size;
    // netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh
    // buffer, or in the cumulation buffer, which is cleaned each time
    StreamInput streamIn = ChannelBufferStreamInputFactory.create(buffer, size);
    // read the key metadata out of the header
    long requestId = buffer.readLong();
    byte status = buffer.readByte();
    Version version = Version.fromId(buffer.readInt());
    StreamInput wrappedStream;
    …………
    if (TransportStatus.isRequest(status)) { // handle a request
        String action = handleRequest(ctx.getChannel(), wrappedStream, requestId, version);
        if (buffer.readerIndex() != expectedIndexReader) {
            if (buffer.readerIndex() < expectedIndexReader) {
                logger.warn("Message not fully read (request) for [{}] and action [{}], resetting", requestId, action);
            } else {
                logger.warn("Message read past expected size (request) for [{}] and action [{}], resetting", requestId, action);
            }
            buffer.readerIndex(expectedIndexReader);
        }
    } else { // handle a response
        TransportResponseHandler handler = transportServiceAdapter.onResponseReceived(requestId);
        // ignore if its null, the adapter logs it
        if (handler != null) {
            if (TransportStatus.isError(status)) {
                handlerResponseError(wrappedStream, handler);
            } else {
                handleResponse(ctx.getChannel(), wrappedStream, handler);
            }
        } else {
            // if its null, skip those bytes
            buffer.readerIndex(markedReaderIndex + size);
        }
        …………
        wrappedStream.close();
}
That is the receive logic. The method overrides Netty's SimpleChannelUpstreamHandler. The MessageChannelHandler is added to the handler pipeline when the client and server start, and Netty invokes the pipeline handlers in order as messages arrive. That is Netty's territory and will not be covered in detail here; see the Netty documentation.
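The constants size + 6 and HEADER_SIZE - 6 in the code above follow from how the length field is defined: the 4-byte length counts everything after the first 6 bytes ('E', 'S', and the length field itself), so the full frame occupies size + 6 bytes, and a header-only message has size == 19 - 6 == 13. A minimal sketch of this bookkeeping (hypothetical class, not ES code):

```java
public class FrameMath {
    static final int HEADER_SIZE = 19; // 'E','S' + length + requestId + status + version

    // The length field excludes the 2 marker bytes and the 4-byte length itself,
    // so the total number of bytes on the wire is size + 6.
    static int wireTotal(int size) { return size + 6; }

    // A header-only message carries exactly requestId + status + version = 13 bytes,
    // i.e. HEADER_SIZE - 6; anything beyond that is message body.
    static boolean hasMessageBytesToRead(int size) {
        return (size - (HEADER_SIZE - 6)) != 0;
    }
}
```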
How requests and responses are processed
Handling a request
The code is shown below:
protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException {
    final String action = buffer.readString(); // read the action name
    transportServiceAdapter.onRequestReceived(requestId, action);
    final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel, requestId, version, profileName);
    try {
        final TransportRequestHandler handler = transportServiceAdapter.handler(action, version); // look up the handler for this action
        if (handler == null) {
            throw new ActionNotFoundTransportException(action);
        }
        final TransportRequest request = handler.newInstance();
        request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
        request.readFrom(buffer);
        if (handler.executor() == ThreadPool.Names.SAME) {
            //noinspection unchecked
            handler.messageReceived(request, transportChannel); // process the message with that handler
        } else {
            threadPool.executor(handler.executor()).execute(new RequestHandler(handler, request, transportChannel, action));
        }
    } catch (Throwable e) {
        try {
            transportChannel.sendResponse(e);
        } catch (IOException e1) {
            logger.warn("Failed to send error message back to client for action [" + action + "]", e);
            logger.warn("Actual Exception", e1);
        }
    }
    return action;
}
The key points are annotated in the code. Even here you cannot yet see how a request is actually processed: cluster requests come in many kinds (ping, discovery, index, and so on), so a single code path cannot handle them all, and the request is ultimately handed off to a handler. Each type of request implements its own handler, which does the real work once the request reaches it. A word on transportServiceAdapter: all message processing is routed and dispatched through it. The full flow of a request is: messageReceived() receives the message, determines that it is a request, and calls handleRequest(), which looks up the matching request handler via transportServiceAdapter's handler() method and hands the message to that handler for processing. No examples here; in the upcoming discovery analysis we will see how discovery, ping, and other requests are handled.
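The lookup performed by transportServiceAdapter.handler() boils down to a registry keyed by action name, populated when each module registers its handler. A minimal sketch of that dispatch pattern (hypothetical names, greatly simplified relative to the ES classes):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class HandlerRegistry {
    // Simplified stand-in for TransportRequestHandler.
    interface RequestHandler {
        String handle(String payload);
    }

    private final Map<String, RequestHandler> handlers = new ConcurrentHashMap<>();

    // Each feature registers its handler under its action name at startup.
    void register(String action, RequestHandler handler) {
        handlers.put(action, handler);
    }

    // Mirrors handleRequest(): unknown actions raise an error, known ones dispatch.
    String dispatch(String action, String payload) {
        RequestHandler h = handlers.get(action);
        if (h == null) {
            throw new IllegalStateException("No handler for action [" + action + "]");
        }
        return h.handle(payload);
    }
}
```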
Handling a response
Responses are processed by handleResponse(); the code is shown below:
protected void handleResponse(Channel channel, StreamInput buffer, final TransportResponseHandler handler) {
    final TransportResponse response = handler.newInstance();
    response.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
    response.remoteAddress();
    try {
        response.readFrom(buffer);
    } catch (Throwable e) {
        handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + response.getClass().getName() + "]", e));
        return;
    }
    try {
        if (handler.executor() == ThreadPool.Names.SAME) {
            //noinspection unchecked
            handler.handleResponse(response); // hand off to the matching handler
        } else {
            threadPool.executor(handler.executor()).execute(new ResponseHandler(handler, response));
        }
    } catch (Throwable e) {
        handleException(handler, new ResponseHandlerFailureTransportException(e));
    }
}
Response handling closely mirrors request handling. Every request is paired with a request handler and a response handler, both registered with the TransportService ahead of time. When a request arrives, its handler is looked up by action name; when a response arrives, the matching response handler is looked up by requestId.
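The requestId-based lookup in transportServiceAdapter.onResponseReceived() is in essence a map of in-flight requests: the sender records a response handler under the requestId before writing, and receiving the response removes and returns it, so each id is answered at most once. A minimal sketch of that bookkeeping (hypothetical names, not the ES implementation):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PendingResponses {
    // Simplified stand-in for TransportResponseHandler.
    interface ResponseHandler {
        void handleResponse(String response);
    }

    private final Map<Long, ResponseHandler> pending = new ConcurrentHashMap<>();

    // Called on the sending side just before the request goes out.
    void register(long requestId, ResponseHandler handler) {
        pending.put(requestId, handler);
    }

    // Called when a response frame with this requestId arrives; remove-on-receive
    // so a duplicate or late response finds nothing and is ignored.
    ResponseHandler onResponseReceived(long requestId) {
        return pending.remove(requestId);
    }
}
```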
Summary
The NettyTransport message flow: a message is sent to the target node via sendRequest(); the target node's message handler receives it, determines whether it is a request or a response, and forwards it to the TransportServiceAdapter, which looks up the matching handler; the message finally reaches that handler, which processes it and replies.
That concludes the analysis of NettyTransport message sending; in the next article, on cluster discovery, we will see the concrete send and receive paths in action.