Java API方式調(diào)用Kafka各種協(xié)議的方法
眾所周知,Kafka自己實(shí)現(xiàn)了一套二進(jìn)制協(xié)議(binary protocol)用于各種功能的實(shí)現(xiàn),比如發(fā)送消息,獲取消息,提交位移以及創(chuàng)建topic等。具體協(xié)議規(guī)范參見(jiàn):Kafka協(xié)議 這套協(xié)議的具體使用流程為:
1.客戶端創(chuàng)建對(duì)應(yīng)協(xié)議的請(qǐng)求
2.客戶端發(fā)送請(qǐng)求給對(duì)應(yīng)的broker
3.broker處理請(qǐng)求,并發(fā)送response給客戶端
雖然Kafka提供的大量的腳本工具用于各種功能的實(shí)現(xiàn),但很多時(shí)候我們還是希望可以把某些功能以編程的方式嵌入到另一個(gè)系統(tǒng)中。這時(shí)使用Java API的方式就顯得異常地靈活了。本文我將嘗試給出Java API底層框架的一個(gè)范例,同時(shí)也會(huì)針對(duì)“創(chuàng)建topic”和“查看位移”這兩個(gè)主要功能給出對(duì)應(yīng)的例子。 需要提前說(shuō)明的是,本文給出的范例并沒(méi)有考慮Kafka集群開(kāi)啟安全的情況。另外Kafka的KIP4應(yīng)該一直在優(yōu)化命令行工具以及各種管理操作,有興趣的讀者可以關(guān)注這個(gè)KIP。
本文中用到的API依賴于kafka-clients,所以如果你使用Maven構(gòu)建的話,請(qǐng)加上:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.2.0</version> </dependency>
如果是gradle,請(qǐng)加上:
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.10.2.0'
底層框架
/**
* 發(fā)送請(qǐng)求主方法
* @param host 目標(biāo)broker的主機(jī)名
* @param port 目標(biāo)broker的端口
* @param request 請(qǐng)求對(duì)象
* @param apiKey 請(qǐng)求類型
* @return 序列化后的response
* @throws IOException
*/
public ByteBuffer send(String host, int port, AbstractRequest request, ApiKeys apiKey) throws IOException {
Socket socket = connect(host, port);
try {
return send(request, apiKey, socket);
} finally {
socket.close();
}
}
/**
* 發(fā)送序列化請(qǐng)求并等待response返回
* @param socket 連向目標(biāo)broker的socket
* @param request 序列化后的請(qǐng)求
* @return 序列化后的response
* @throws IOException
*/
private byte[] issueRequestAndWaitForResponse(Socket socket, byte[] request) throws IOException {
sendRequest(socket, request);
return getResponse(socket);
}
/**
* 發(fā)送序列化請(qǐng)求給socket
* @param socket 連向目標(biāo)broker的socket
* @param request 序列化后的請(qǐng)求
* @throws IOException
*/
private void sendRequest(Socket socket, byte[] request) throws IOException {
DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
dos.writeInt(request.length);
dos.write(request);
dos.flush();
}
/**
* 從給定socket處獲取response
* @param socket 連向目標(biāo)broker的socket
* @return 獲取到的序列化后的response
* @throws IOException
*/
private byte[] getResponse(Socket socket) throws IOException {
DataInputStream dis = null;
try {
dis = new DataInputStream(socket.getInputStream());
byte[] response = new byte[dis.readInt()];
dis.readFully(response);
return response;
} finally {
if (dis != null) {
dis.close();
}
}
}
/**
* 創(chuàng)建Socket連接
* @param hostName 目標(biāo)broker主機(jī)名
* @param port 目標(biāo)broker服務(wù)端口, 比如9092
* @return 創(chuàng)建的Socket連接
* @throws IOException
*/
private Socket connect(String hostName, int port) throws IOException {
return new Socket(hostName, port);
}
/**
* 向給定socket發(fā)送請(qǐng)求
* @param request 請(qǐng)求對(duì)象
* @param apiKey 請(qǐng)求類型, 即屬于哪種請(qǐng)求
* @param socket 連向目標(biāo)broker的socket
* @return 序列化后的response
* @throws IOException
*/
private ByteBuffer send(AbstractRequest request, ApiKeys apiKey, Socket socket) throws IOException {
RequestHeader header = new RequestHeader(apiKey.id, request.version(), "client-id", 0);
ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + request.sizeOf());
header.writeTo(buffer);
request.writeTo(buffer);
byte[] serializedRequest = buffer.array();
byte[] response = issueRequestAndWaitForResponse(socket, serializedRequest);
ByteBuffer responseBuffer = ByteBuffer.wrap(response);
ResponseHeader.parse(responseBuffer);
return responseBuffer;
}
有了這些方法的鋪墊,我們就可以創(chuàng)建具體的請(qǐng)求了。
創(chuàng)建topic
/**
* 創(chuàng)建topic
* 由于只是樣例代碼,有些東西就硬編碼寫到程序里面了(比如主機(jī)名和端口),各位看官自行修改即可
* @param topicName topic名
* @param partitions 分區(qū)數(shù)
* @param replicationFactor 副本數(shù)
* @throws IOException
*/
public void createTopics(String topicName, int partitions, short replicationFactor) throws IOException {
Map<String, CreateTopicsRequest.TopicDetails> topics = new HashMap<>();
// 插入多個(gè)元素便可同時(shí)創(chuàng)建多個(gè)topic
topics.put(topicName, new CreateTopicsRequest.TopicDetails(partitions, replicationFactor));
int creationTimeoutMs = 60000;
CreateTopicsRequest request = new CreateTopicsRequest.Builder(topics, creationTimeoutMs).build();
ByteBuffer response = send("localhost", 9092, request, ApiKeys.CREATE_TOPICS);
CreateTopicsResponse.parse(response, request.version());
}
查看位移
/**
* 獲取某個(gè)consumer group下的某個(gè)topic分區(qū)的位移
* @param groupID group id
* @param topic topic名
* @param parititon 分區(qū)號(hào)
* @throws IOException
*/
public void getOffsetForPartition(String groupID, String topic, int parititon) throws IOException {
TopicPartition tp = new TopicPartition(topic, parititon);
OffsetFetchRequest request = new OffsetFetchRequest.Builder(groupID, singletonList(tp))
.setVersion((short)2).build();
ByteBuffer response = send("localhost", 9092, request, ApiKeys.OFFSET_FETCH);
OffsetFetchResponse resp = OffsetFetchResponse.parse(response, request.version());
OffsetFetchResponse.PartitionData partitionData = resp.responseData().get(tp);
System.out.println(partitionData.offset);
}
/**
* 獲取某個(gè)consumer group下所有topic分區(qū)的位移信息
* @param groupID group id
* @return (topic分區(qū) --> 分區(qū)信息)的map
* @throws IOException
*/
public Map<TopicPartition, OffsetFetchResponse.PartitionData> getAllOffsetsForGroup(String groupID) throws IOException {
OffsetFetchRequest request = new OffsetFetchRequest.Builder(groupID, null).setVersion((short)2).build();
ByteBuffer response = send("localhost", 9092, request, ApiKeys.OFFSET_FETCH);
OffsetFetchResponse resp = OffsetFetchResponse.parse(response, request.version());
return resp.responseData();
}
okay, 上面就是“創(chuàng)建topic”和“查看位移”的樣例代碼,各位看官可以參考著這兩個(gè)例子構(gòu)建其他類型的請(qǐng)求。
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Java cglib為實(shí)體類(javabean)動(dòng)態(tài)添加屬性方式
這篇文章主要介紹了Java cglib為實(shí)體類(javabean)動(dòng)態(tài)添加屬性方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-02-02
Java中實(shí)現(xiàn)多重排序的幾種方法小結(jié)
Java中的多重排序通常指的是同時(shí)對(duì)一個(gè)集合中的兩個(gè)或更多列或多維度的數(shù)據(jù)進(jìn)行排序,這通常通過(guò)自定義Comparator實(shí)現(xiàn),可以結(jié)合Arrays.sort()或Collections.sort()方法,當(dāng)需要進(jìn)行多重排序時(shí),即根據(jù)多個(gè)字段進(jìn)行排序,我們可以采用以下幾種方法2024-10-10
JAVA編程實(shí)現(xiàn)TCP網(wǎng)絡(luò)通訊的方法示例
這篇文章主要介紹了JAVA編程實(shí)現(xiàn)TCP網(wǎng)絡(luò)通訊的方法,簡(jiǎn)單說(shuō)明了TCP通訊的原理并結(jié)合具體實(shí)例形式分析了java實(shí)現(xiàn)TCP通訊的步驟與相關(guān)操作技巧,需要的朋友可以參考下2017-08-08
詳解Spring注解驅(qū)動(dòng)開(kāi)發(fā)之屬性賦值
今天帶大家學(xué)習(xí)Spring注解驅(qū)動(dòng)開(kāi)發(fā)的相關(guān)知識(shí),文中有非常詳細(xì)的代碼示例,對(duì)正在學(xué)習(xí)Java的小伙伴們很有幫助,需要的朋友可以參考下2021-05-05
SpringBoot之Order注解啟動(dòng)順序說(shuō)明
這篇文章主要介紹了SpringBoot之Order注解啟動(dòng)順序說(shuō)明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09
如何解決java中遇到的for input string: "" 報(bào)錯(cuò)問(wèn)題
在本篇文章里小編給大家整理的是一篇關(guān)于如何解決java中遇到的(for input string: "")報(bào)錯(cuò)內(nèi)容,需要的朋友們可以學(xué)習(xí)下。2020-02-02

