Netty + ZooKeeper 實現(xiàn)簡單的服務(wù)注冊與發(fā)現(xiàn)
一. 背景
最近的一個項目:我們的系統(tǒng)接收到上游系統(tǒng)的派單任務(wù)后,會推送到指定的門店的相關(guān)設(shè)備,并進(jìn)行相應(yīng)的業(yè)務(wù)處理。
二. Netty 的使用
在接收到派單任務(wù)之后,通過 Netty 推送到指定門店相關(guān)的設(shè)備。在我們的系統(tǒng)中 Netty 實現(xiàn)了消息推送、長連接以及心跳機(jī)制。

2.1 Netty Server 端:
每個 Netty 服務(wù)端通過 ConcurrentHashMap 保存了客戶端的 clientId 以及它連接的 SocketChannel。
服務(wù)器端向客戶端發(fā)送消息時,只要獲取 clientId 對應(yīng)的 SocketChannel,往 SocketChannel 里寫入相應(yīng)的 message 即可。
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(new MessageEncoder());
p.addLast(new MessageDecoder());
p.addLast(new PushServerHandler());
}
});
ChannelFuture future = bootstrap.bind(host,port).sync();
if (future.isSuccess()) {
logger.info("server start...");
}
2.2 Netty Client 端:
客戶端用于接收服務(wù)端的消息,隨即進(jìn)行業(yè)務(wù)處理。客戶端還有心跳機(jī)制,它通過 IdleEvent 事件定時向服務(wù)端放送 Ping 消息以此來檢測 SocketChannel 是否中斷。
public PushClientBootstrap(String host, int port) throws InterruptedException {
this.host = host;
this.port = port;
start(host,port);
}
private void start(String host, int port) throws InterruptedException {
bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.group(workGroup)
.remoteAddress(host, port)
.handler(new ChannelInitializer(){
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(new IdleStateHandler(20, 10, 0)); // IdleStateHandler 用于檢測心跳
p.addLast(new MessageDecoder());
p.addLast(new MessageEncoder());
p.addLast(new PushClientHandler());
}
});
doConnect(port, host);
}
/**
* 建立連接,并且可以實現(xiàn)自動重連.
* @param port port.
* @param host host.
* @throws InterruptedException InterruptedException.
*/
private void doConnect(int port, String host) throws InterruptedException {
if (socketChannel != null && socketChannel.isActive()) {
return;
}
final int portConnect = port;
final String hostConnect = host;
ChannelFuture future = bootstrap.connect(host, port);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture futureListener) throws Exception {
if (futureListener.isSuccess()) {
socketChannel = (SocketChannel) futureListener.channel();
logger.info("Connect to server successfully!");
} else {
logger.info("Failed to connect to server, try connect after 10s");
futureListener.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
try {
doConnect(portConnect, hostConnect);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, 10, TimeUnit.SECONDS);
}
}
}).sync();
}
三. 借助 ZooKeeper 實現(xiàn)簡單的服務(wù)注冊與發(fā)現(xiàn)
3.1 服務(wù)注冊
服務(wù)注冊本質(zhì)上是為了解耦服務(wù)提供者和服務(wù)消費者。服務(wù)注冊是一個高可用強(qiáng)一致性的服務(wù)發(fā)現(xiàn)存儲倉庫,主要用來存儲服務(wù)的api和地址對應(yīng)關(guān)系。為了高可用,服務(wù)注冊中心一般為一個集群,并且能夠保證分布式一致性。目前常用的有 ZooKeeper、Etcd 等等。
在我們項目中采用了 ZooKeeper 實現(xiàn)服務(wù)注冊。
public class ServiceRegistry {
private static final Logger logger = LoggerFactory.getLogger(ServiceRegistry.class);
private CountDownLatch latch = new CountDownLatch(1);
private String registryAddress;
public ServiceRegistry(String registryAddress) {
this.registryAddress = registryAddress;
}
public void register(String data) {
if (data != null) {
ZooKeeper zk = connectServer();
if (zk != null) {
createNode(zk, data);
}
}
}
/**
* 連接 zookeeper 服務(wù)器
* @return
*/
private ZooKeeper connectServer() {
ZooKeeper zk = null;
try {
zk = new ZooKeeper(registryAddress, Constants.ZK_SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
latch.countDown();
}
}
});
latch.await();
} catch (IOException | InterruptedException e) {
logger.error("", e);
}
return zk;
}
/**
* 創(chuàng)建節(jié)點
* @param zk
* @param data
*/
private void createNode(ZooKeeper zk, String data) {
try {
byte[] bytes = data.getBytes();
String path = zk.create(Constants.ZK_DATA_PATH, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
logger.debug("create zookeeper node ({} => {})", path, data);
} catch (KeeperException | InterruptedException e) {
logger.error("", e);
}
}
}
有了服務(wù)注冊,在 Netty 服務(wù)端啟動之后,將 Netty 服務(wù)端的 ip 和 port 注冊到 ZooKeeper。
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(new MessageEncoder());
p.addLast(new MessageDecoder());
p.addLast(new PushServerHandler());
}
});
ChannelFuture future = bootstrap.bind(host,port).sync();
if (future.isSuccess()) {
logger.info("server start...");
}
if (serviceRegistry != null) {
serviceRegistry.register(host + ":" + port);
}
3.2 服務(wù)發(fā)現(xiàn)
這里我們采用的是客戶端的服務(wù)發(fā)現(xiàn),即服務(wù)發(fā)現(xiàn)機(jī)制由客戶端實現(xiàn)。
客戶端在和服務(wù)端建立連接之前,通過查詢注冊中心的方式來獲取服務(wù)端的地址。如果存在有多個 Netty 服務(wù)端的話,可以做服務(wù)的負(fù)載均衡。在我們的項目中只采用了簡單的隨機(jī)法進(jìn)行負(fù)載。
public class ServiceDiscovery {
private static final Logger logger = LoggerFactory.getLogger(ServiceDiscovery.class);
private CountDownLatch latch = new CountDownLatch(1);
private volatile List<String> serviceAddressList = new ArrayList<>();
private String registryAddress; // 注冊中心的地址
public ServiceDiscovery(String registryAddress) {
this.registryAddress = registryAddress;
ZooKeeper zk = connectServer();
if (zk != null) {
watchNode(zk);
}
}
/**
* 通過服務(wù)發(fā)現(xiàn),獲取服務(wù)提供方的地址
* @return
*/
public String discover() {
String data = null;
int size = serviceAddressList.size();
if (size > 0) {
if (size == 1) { //只有一個服務(wù)提供方
data = serviceAddressList.get(0);
logger.info("unique service address : {}", data);
} else { //使用隨機(jī)分配法。簡單的負(fù)載均衡法
data = serviceAddressList.get(ThreadLocalRandom.current().nextInt(size));
logger.info("choose an address : {}", data);
}
}
return data;
}
/**
* 連接 zookeeper
* @return
*/
private ZooKeeper connectServer() {
ZooKeeper zk = null;
try {
zk = new ZooKeeper(registryAddress, Constants.ZK_SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
latch.countDown();
}
}
});
latch.await();
} catch (IOException | InterruptedException e) {
logger.error("", e);
}
return zk;
}
/**
* 獲取服務(wù)地址列表
* @param zk
*/
private void watchNode(final ZooKeeper zk) {
try {
//獲取子節(jié)點列表
List<String> nodeList = zk.getChildren(Constants.ZK_REGISTRY_PATH, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeChildrenChanged) {
//發(fā)生子節(jié)點變化時再次調(diào)用此方法更新服務(wù)地址
watchNode(zk);
}
}
});
List<String> dataList = new ArrayList<>();
for (String node : nodeList) {
byte[] bytes = zk.getData(Constants.ZK_REGISTRY_PATH + "/" + node, false, null);
dataList.add(new String(bytes));
}
logger.debug("node data: {}", dataList);
this.serviceAddressList = dataList;
} catch (KeeperException | InterruptedException e) {
logger.error("", e);
}
}
}
Netty 客戶端啟動之后,通過服務(wù)發(fā)現(xiàn)獲取 Netty 服務(wù)端的 ip 和 port。
/**
* 支持通過服務(wù)發(fā)現(xiàn)來獲取 Socket 服務(wù)端的 host、port
* @param discoveryAddress
* @throws InterruptedException
*/
public PushClientBootstrap(String discoveryAddress) throws InterruptedException {
serviceDiscovery = new ServiceDiscovery(discoveryAddress);
serverAddress = serviceDiscovery.discover();
if (serverAddress!=null) {
String[] array = serverAddress.split(":");
if (array!=null && array.length==2) {
String host = array[0];
int port = Integer.parseInt(array[1]);
start(host,port);
}
}
}
四. 總結(jié)
服務(wù)注冊和發(fā)現(xiàn)一直是分布式的核心組件。本文介紹了借助 ZooKeeper 做注冊中心,如何實現(xiàn)一個簡單的服務(wù)注冊和發(fā)現(xiàn)。其實,注冊中心的選擇有很多,例如 Etcd、Eureka 等等。選擇符合我們業(yè)務(wù)需求的才是最重要的。
以上所述是小編給大家介紹的Netty + ZooKeeper 實現(xiàn)簡單的服務(wù)注冊與發(fā)現(xiàn),希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時回復(fù)大家的。在此也非常感謝大家對腳本之家網(wǎng)站的支持!
如果你覺得本文對你有幫助,歡迎轉(zhuǎn)載,煩請注明出處,謝謝!
相關(guān)文章
springboot定時任務(wù)SchedulingConfigurer異步多線程實現(xiàn)方式
這篇文章主要介紹了springboot定時任務(wù)SchedulingConfigurer異步多線程實現(xiàn)方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-04-04
Java實現(xiàn)BP神經(jīng)網(wǎng)絡(luò)MNIST手寫數(shù)字識別的示例詳解
這篇文章主要為大家詳細(xì)介紹了Java實現(xiàn)BP神經(jīng)網(wǎng)絡(luò)MNIST手寫數(shù)字識別的相關(guān)方法,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起了解一下2023-01-01
Java工程使用ffmpeg進(jìn)行音視頻格式轉(zhuǎn)換的實現(xiàn)
FFmpeg是一套可以用來記錄、轉(zhuǎn)換數(shù)字音頻、視頻,并能將其轉(zhuǎn)化為流的開源計算機(jī)程序,本文主要介紹了Java工程使用ffmpeg進(jìn)行音視頻格式轉(zhuǎn)換的實現(xiàn)2024-02-02
使用Java進(jìn)行Json數(shù)據(jù)的解析(對象數(shù)組的相互嵌套)
下面小編就為大家?guī)硪黄褂肑ava進(jìn)行Json數(shù)據(jù)的解析(對象數(shù)組的相互嵌套)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-08-08
詳談java編碼互轉(zhuǎn)(application/x-www-form-urlencoded)
下面小編就為大家?guī)硪黄斦刯ava編碼互轉(zhuǎn)(application/x-www-form-urlencoded)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-07-07
Spring boot按日切分spring boot的nohup.out日志文件的方法
過大的日志文件維護(hù)起來存在諸多問題,所以最好是能夠按日或按大小切分日志文件,下面小編給大家?guī)砹薙pring boot按日切分spring boot的nohup.out日志文件的方法,一起看看吧2018-08-08
淺談SpringBoot在使用測試的時候是否需要@RunWith
本文主要介紹了淺談SpringBoot在使用測試的時候是否需要@RunWith,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-01-01
java正則表達(dá)式匹配網(wǎng)頁所有網(wǎng)址和鏈接文字的示例
這篇文章主要介紹了java正則表達(dá)式匹配網(wǎng)頁所有網(wǎng)址和鏈接文字java正則表達(dá)式匹配,需要的朋友可以參考下2014-03-03

