利用Java搭建個(gè)簡(jiǎn)單的Netty通信實(shí)例教程
前言
看過(guò)dubbo源碼的同學(xué)應(yīng)該都清楚,使用dubbo協(xié)議的底層通信是使用的netty進(jìn)行交互,而最近看了dubbo的Netty部分后,自己寫(xiě)了個(gè)簡(jiǎn)單的Netty通信例子。
準(zhǔn)備
工程截圖

模塊詳解
- rpc-common
rpc-common作為各個(gè)模塊都需使用的模塊,工程中出現(xiàn)的是一些通信時(shí)請(qǐng)求的參數(shù)以及返回的參數(shù),還有一些序列化的工具。
- rpc-client
rpc-client中目前只是單單的一個(gè)NettyClient啟動(dòng)類。
- rpc-server
rpc-client中目前也只是單單的一個(gè)NettyServer服務(wù)啟動(dòng)類。
需要的依賴
目前所有的依賴項(xiàng)都出現(xiàn)在 rpc-common 下的 pom.xml中。
<dependencies>
<!-- Netty -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.10.Final</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<!-- Protostuff -->
<dependency>
<groupId>com.dyuproject.protostuff</groupId>
<artifactId>protostuff-core</artifactId>
<version>1.0.9</version>
</dependency>
<dependency>
<groupId>com.dyuproject.protostuff</groupId>
<artifactId>protostuff-runtime</artifactId>
<version>1.0.9</version>
</dependency>
<!-- Objenesis -->
<dependency>
<groupId>org.objenesis</groupId>
<artifactId>objenesis</artifactId>
<version>2.1</version>
</dependency>
<!-- fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.38</version>
</dependency>
</dependencies>
實(shí)現(xiàn)
首先我們?cè)赾ommon中先定義本次的Request和Response的基類對(duì)象。
public class Request {
private String requestId;
private Object parameter;
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public Object getParameter() {
return parameter;
}
public void setParameter(Object parameter) {
this.parameter = parameter;
}
}
public class Response {
private String requestId;
private Object result;
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public Object getResult() {
return result;
}
public void setResult(Object result) {
this.result = result;
}
}
使用fastJson進(jìn)行本次序列化
Netty對(duì)象的序列化轉(zhuǎn)換很好懂, ByteToMessageDecoder 和 MessageToByteEncoder 分別只要繼承它們,重寫(xiě)方法后,獲取到Object和Byte,各自轉(zhuǎn)換就OK。
不過(guò)如果是有要用到生產(chǎn)上的同學(xué),建議不要使用 fastJson,因?yàn)樗穆┒囱a(bǔ)丁真的是太多了,可以使用google的 protostuff。
public class RpcDecoder extends ByteToMessageDecoder {
// 目標(biāo)對(duì)象類型進(jìn)行解碼
private Class<?> target;
public RpcDecoder(Class target) {
this.target = target;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() < 4) { // 不夠長(zhǎng)度丟棄
return;
}
in.markReaderIndex(); // 標(biāo)記一下當(dāng)前的readIndex的位置
int dataLength = in.readInt(); // 讀取傳送過(guò)來(lái)的消息的長(zhǎng)度。ByteBuf 的readInt()方法會(huì)讓他的readIndex增加4
if (in.readableBytes() < dataLength) { // 讀到的消息體長(zhǎng)度如果小于我們傳送過(guò)來(lái)的消息長(zhǎng)度,則resetReaderIndex. 這個(gè)配合markReaderIndex使用的。把readIndex重置到mark的地方
in.resetReaderIndex();
return;
}
byte[] data = new byte[dataLength];
in.readBytes(data);
Object obj = JSON.parseObject(data, target); // 將byte數(shù)據(jù)轉(zhuǎn)化為我們需要的對(duì)象
out.add(obj);
}
}
public class RpcEncoder extends MessageToByteEncoder {
//目標(biāo)對(duì)象類型進(jìn)行編碼
private Class<?> target;
public RpcEncoder(Class target) {
this.target = target;
}
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
if (target.isInstance(msg)) {
byte[] data = JSON.toJSONBytes(msg); // 使用fastJson將對(duì)象轉(zhuǎn)換為byte
out.writeInt(data.length); // 先將消息長(zhǎng)度寫(xiě)入,也就是消息頭
out.writeBytes(data); // 消息體中包含我們要發(fā)送的數(shù)據(jù)
}
}
}
NetyServer
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Request request = (Request) msg;
System.out.println("Client Data:" + JSON.toJSONString(request));
Response response = new Response();
response.setRequestId(request.getRequestId());
response.setResult("Hello Client !");
// client接收到信息后主動(dòng)關(guān)閉掉連接
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
public class NettyServer {
private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
private String ip;
private int port;
public NettyServer(String ip, int port) {
this.ip = ip;
this.port = port;
}
public void server() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
final ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_SNDBUF, 32 * 1024)
.option(ChannelOption.SO_RCVBUF, 32 * 1024)
.option(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new RpcDecoder(Request.class))
.addLast(new RpcEncoder(Response.class))
.addLast(new NettyServerHandler());
}
});
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); // 開(kāi)啟長(zhǎng)連接
ChannelFuture future = serverBootstrap.bind(ip, port).sync();
// if (future.isSuccess()) {
//
// new Register().register("/yanzhenyidai/com.yanzhenyidai.server", ip + ":" + port);
// }
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new NettyServer("127.0.0.1", 20000).server();
}
}
關(guān)鍵名詞:
- EventLoopGroup
- workerGroup
- bossGroup
Server端的EventLoopGroup分為兩個(gè),一般workerGroup作為處理請(qǐng)求,bossGroup作為接收請(qǐng)求。
- ChannelOption
- SO_BACKLOG
- SO_SNDBUF
- SO_RCVBUF
- SO_KEEPALIVE
以上四個(gè)常量作為TCP連接中的屬性。
- ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
NettyServerHandler中出現(xiàn)的 ChannelFutureListener.CLOSE ,作為Server端主動(dòng)關(guān)閉與Client端的通信,如果沒(méi)有主動(dòng)Close,那么NettyClient將會(huì)一直處于阻塞狀態(tài),得不到NettyServer的返回信息。
NettyClient
public class NettyClient extends SimpleChannelInboundHandler<Response> {
private final String ip;
private final int port;
private Response response;
public NettyClient(String ip, int port) {
this.ip = ip;
this.port = port;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Response response) throws Exception {
this.response = response;
}
public Response client(Request request) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
// 創(chuàng)建并初始化 Netty 客戶端 Bootstrap 對(duì)象
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new RpcDecoder(Response.class));
pipeline.addLast(new RpcEncoder(Request.class));
pipeline.addLast(NettyClient.this);
}
});
bootstrap.option(ChannelOption.TCP_NODELAY, true);
// String[] discover = new Discover().discover("/yanzhenyidai/com.yanzhenyidai.server").split(":");
// 連接 RPC 服務(wù)器
ChannelFuture future = bootstrap.connect(ip, port).sync();
// 寫(xiě)入 RPC 請(qǐng)求數(shù)據(jù)并關(guān)閉連接
Channel channel = future.channel();
channel.writeAndFlush(request).sync();
channel.closeFuture().sync();
return response;
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
Request request = new Request();
request.setRequestId(UUID.randomUUID().toString());
request.setParameter("Hello Server !");
System.out.println(JSON.toJSONString(new NettyClient("127.0.0.1", 30000).client(request)));
}
}
測(cè)試
如果以上所有內(nèi)容都準(zhǔn)備就緒,那么就可以進(jìn)行調(diào)試了。
啟動(dòng)順序,先啟動(dòng)NettyServer,再啟動(dòng)NettyClient。
總結(jié)
記得剛出來(lái)工作時(shí),有工作很多年的同事問(wèn)我了不了解Netty,當(dāng)時(shí)工作太短,直說(shuō)聽(tīng)過(guò)Putty,現(xiàn)在回想起來(lái)真的挺丟人的,哈哈。😋
Netty作為通信框架,如果你了解TCP,而且項(xiàng)目中有類似傳輸信息的需求,又不想集成HTTP或者Socket,那么Netty真的挺實(shí)用的。
參考資料:
本項(xiàng)目Github地址:Netty-RPC
到此這篇關(guān)于利用Java搭建個(gè)簡(jiǎn)單的Netty通信的文章就介紹到這了,更多相關(guān)Java搭建Netty通信內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
在mybatis執(zhí)行SQL語(yǔ)句之前進(jìn)行攔擊處理實(shí)例
本篇文章主要介紹了在mybatis執(zhí)行SQL語(yǔ)句之前進(jìn)行攔擊處理實(shí)例,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下。2017-04-04
java.lang.NoSuchMethodException: com.sun.proxy.$Proxy58.list
這篇文章主要介紹了java.lang.NoSuchMethodException: com.sun.proxy.$Proxy58.list錯(cuò)誤解決辦法的相關(guān)資料,需要的朋友可以參考下2016-12-12
java發(fā)送post請(qǐng)求使用multipart/form-data格式文件數(shù)據(jù)到接口代碼示例
這篇文章主要介紹了java發(fā)送post請(qǐng)求使用multipart/form-data格式文件數(shù)據(jù)到接口的相關(guān)資料,文中指定了數(shù)據(jù)編碼格式為UTF-8,并強(qiáng)調(diào)了所需依賴工具類,需要的朋友可以參考下2024-12-12
Java實(shí)現(xiàn)ZooKeeper的zNode監(jiān)控
這篇文章主要介紹了Java實(shí)現(xiàn)ZooKeeper的zNode監(jiān)控問(wèn)題,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值 ,需要的朋友可以參考下2019-08-08
Java關(guān)鍵字final、static使用總結(jié)
final方法不能被子類的方法覆蓋,但可以被繼承。用static修飾的代碼塊表示靜態(tài)代碼塊,當(dāng)Java虛擬機(jī)(JVM)加載類時(shí),就會(huì)執(zhí)行該代碼塊,下面通過(guò)本文給大家分享Java關(guān)鍵字final、static使用總結(jié),感興趣的朋友一起看看吧2017-07-07
SpringBoot整合Flyway的方法(數(shù)據(jù)庫(kù)版本遷移工具)
這篇文章主要介紹了SpringBoot整合Flyway的方法(數(shù)據(jù)庫(kù)版本遷移工具),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-06-06
Spring監(jiān)聽(tīng)器及定時(shí)任務(wù)實(shí)現(xiàn)方法詳解
這篇文章主要介紹了Spring監(jiān)聽(tīng)器及定時(shí)任務(wù)實(shí)現(xiàn)方法詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-07-07

