Java中RabbitMQ隊(duì)列實(shí)現(xiàn)RPC詳解
RabbitMQ實(shí)現(xiàn)RPC
如果我們需要在遠(yuǎn)程計(jì)算機(jī)上運(yùn)行一個(gè)函數(shù)并等待結(jié)果,這種模式通常被稱為遠(yuǎn)程過(guò)程調(diào)用或RPC。
在本教程中,我們將使用RabbitMQ構(gòu)建一個(gè)RPC系統(tǒng):
- 一個(gè)客戶端和一個(gè)RPC服務(wù)器。
- 我們將創(chuàng)建一個(gè)返回斐波那契數(shù)字的模擬RPC服務(wù)。
整個(gè)過(guò)程示意圖如下:

客戶端將請(qǐng)求發(fā)送至rpc_queue(我們定義的消息隊(duì)列),然后等待響應(yīng);服務(wù)端獲取請(qǐng)求,并處理請(qǐng)求,然后將請(qǐng)求結(jié)果返回給隊(duì)列,客戶端得知請(qǐng)求被響應(yīng)后獲取結(jié)果。
在結(jié)果被響應(yīng)之前,客戶端是被阻塞的,主線程會(huì)等待RPC響應(yīng)
如果每個(gè)RPC請(qǐng)求都創(chuàng)建一個(gè)回調(diào)隊(duì)列。這是非常低效,我們創(chuàng)建一個(gè)單一的客戶端回調(diào)隊(duì)列。
這引發(fā)了一個(gè)新的問(wèn)題,在該隊(duì)列中收到回復(fù)時(shí),不清楚回復(fù)屬于哪個(gè)請(qǐng)求。這就需要用到 correlationId屬性。
我們?yōu)闆](méi)有請(qǐng)求設(shè)置唯一的correlationId值。
然后,當(dāng)我們?cè)诨卣{(diào)隊(duì)列中收到一條消息時(shí),我們將獲取這個(gè)值,將響應(yīng)與請(qǐng)求的進(jìn)行correlationId匹配。
如果我們一致就是我們需要的結(jié)果,否則就不是。
客戶端代RPCClient
代碼如下:
package com.adtec.rabbitmq;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
public class RPCClient {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
public RPCClient() throws IOException, TimeoutException {
//建立一個(gè)連接和一個(gè)通道,并為回調(diào)聲明一個(gè)唯一的'回調(diào)'隊(duì)列
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
//定義一個(gè)臨時(shí)變量的接受隊(duì)列名
replyQueueName = channel.queueDeclare().getQueue();
}
//發(fā)送RPC請(qǐng)求
public String call(String message) throws IOException, InterruptedException {
//生成一個(gè)唯一的字符串作為回調(diào)隊(duì)列的編號(hào)
String corrId = UUID.randomUUID().toString();
//發(fā)送請(qǐng)求消息,消息使用了兩個(gè)屬性:replyto和correlationId
//服務(wù)端根據(jù)replyto返回結(jié)果,客戶端根據(jù)correlationId判斷響應(yīng)是不是給自己的
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName)
.build();
//發(fā)布一個(gè)消息,requestQueueName路由規(guī)則
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
//由于我們的消費(fèi)者交易處理是在單獨(dú)的線程中進(jìn)行的,因此我們需要在響應(yīng)到達(dá)之前暫停主線程。
//這里我們創(chuàng)建的 容量為1的阻塞隊(duì)列ArrayBlockingQueue,因?yàn)槲覀冎恍枰却粋€(gè)響應(yīng)。
final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
// String basicConsume(String queue, boolean autoAck, Consumer callback)
channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
//檢查它的correlationId是否是我們所要找的那個(gè)
if (properties.getCorrelationId().equals(corrId)) {
//如果是,則響應(yīng)BlockingQueue
response.offer(new String(body, "UTF-8"));
}
}
});
return response.take();
}
public void close() throws IOException {
connection.close();
}
public static void main(String[] argv) {
RPCClient fibonacciRpc = null;
String response = null;
try {
fibonacciRpc = new RPCClient();
System.out.println(" [x] Requesting fib(30)");
response = fibonacciRpc.call("30");
System.out.println(" [.] Got '" + response + "'");
} catch (IOException | TimeoutException | InterruptedException e) {
e.printStackTrace();
} finally {
if (fibonacciRpc != null) {
try {
fibonacciRpc.close();
} catch (IOException _ignore) {
}
}
}
}
}上面的代碼中用到了阻塞隊(duì)列ArrayBlockingQueue
服務(wù)端代RPCServer
代碼如下:
package rabbitmq;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
//具體處理方法
private static int fib(int n) {
if (n == 0)
return 0;
if (n == 1)
return 1;
return fib(n - 1) + fib(n - 2);
}
public static void main(String[] argv) {
//建立連接、通道,并聲明隊(duì)列
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = null;
try {
connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
System.out.println(" [x] Awaiting RPC requests");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
.correlationId(properties.getCorrelationId()).build();
String response = "";
try {
String message = new String(body, "UTF-8");
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
response += fib(n);
} catch (RuntimeException e) {
System.out.println(" [.] " + e.toString());
} finally {
// 返回處理結(jié)果隊(duì)列
channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
// 確認(rèn)消息,已經(jīng)收到后面參數(shù) multiple:是否批量.true:將一次性確認(rèn)所有小于envelope.getDeliveryTag()的消息。
channel.basicAck(envelope.getDeliveryTag(), false);
// RabbitMq consumer worker thread notifies the RPC
// server owner thread
synchronized (this) {
this.notify();
}
}
}
};
//取消自動(dòng)確認(rèn)
boolean autoAck = false ;
channel.basicConsume(RPC_QUEUE_NAME, autoAck, consumer);
// Wait and be prepared to consume the message from RPC client.
while (true) {
synchronized (consumer) {
try {
consumer.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
} finally {
if (connection != null)
try {
connection.close();
} catch (IOException _ignore) {
}
}
}
}測(cè)試時(shí)先運(yùn)行服務(wù)端,再運(yùn)行客戶端 為了方便觀察結(jié)果,最好將客戶端和服務(wù)端在不同workspace實(shí)現(xiàn)
客戶端結(jié)果

服務(wù)端結(jié)果

到此這篇關(guān)于Java中RabbitMQ隊(duì)列實(shí)現(xiàn)RPC詳解的文章就介紹到這了,更多相關(guān)RabbitMQ實(shí)現(xiàn)RPC內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
使用CXF和Jersey框架來(lái)進(jìn)行Java的WebService編程
這篇文章主要介紹了使用CXF和Jersey框架來(lái)進(jìn)行Java的WebService編程,Web service是一個(gè)平臺(tái)獨(dú)立的低耦合的自包含的基于可編程的web的應(yīng)用程序,需要的朋友可以參考下2015-12-12
JAVA基礎(chǔ)之注解與反射的使用方法和場(chǎng)景
這篇文章主要給大家介紹了關(guān)于JAVA基礎(chǔ)之注解與反射的使用方法和場(chǎng)景的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-03-03
java多線程累加計(jì)數(shù)的實(shí)現(xiàn)方法
在多線程協(xié)作任務(wù)中,如何計(jì)算也是很重的,這篇文章主要介紹了java多線程累加計(jì)數(shù)的實(shí)現(xiàn)方法,感興趣的朋友可以了解一下2021-05-05
使用@CachePut?更新數(shù)據(jù)庫(kù)和更新緩存
這篇文章主要介紹了使用@CachePut?更新數(shù)據(jù)庫(kù)和更新緩存方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12
spring,mybatis事務(wù)管理配置與@Transactional注解使用詳解
這篇文章主要介紹了spring,mybatis事務(wù)管理配置與@Transactional注解使用,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-07-07
如何用120行Java代碼寫一個(gè)自己的區(qū)塊鏈
這篇文章就是幫助你使用 Java 語(yǔ)言來(lái)實(shí)現(xiàn)一個(gè)簡(jiǎn)單的區(qū)塊鏈,用不到 120 行代碼來(lái)揭示區(qū)塊鏈的原理,感興趣的就一起來(lái)了解一下2019-06-06
Spring七大事務(wù)傳遞機(jī)制深入分析實(shí)現(xiàn)原理
實(shí)際項(xiàng)目開(kāi)發(fā)中,如果涉及到多張表操作時(shí),為了保證業(yè)務(wù)數(shù)據(jù)的一致性,大家一般都會(huì)采用事務(wù)機(jī)制,好多小伙伴可能只是簡(jiǎn)單了解一下,遇到事務(wù)失效的情況,便會(huì)無(wú)從下手,下面這篇文章主要給大家介紹了關(guān)于Spring事務(wù)傳遞機(jī)制的相關(guān)資料,需要的朋友可以參考下2023-03-03

