Kafka簡單客戶端編程實(shí)例
今天,我們給大家?guī)硪黄绾卫肒afka的API進(jìn)行客戶端編程的文章,這篇文章很簡單,就是利用Kafka的API創(chuàng)建一個(gè)生產(chǎn)者和消費(fèi)者,生產(chǎn)者不斷向Kafka寫入消息,消費(fèi)者則不斷消費(fèi)Kafka的消息。下面是具體的實(shí)例代碼。
一、創(chuàng)建配置類Config
這個(gè)類很簡單,只是存放了兩個(gè)常量,一個(gè)是話題TOPIC,一個(gè)是線程數(shù)THREADS
package com.lya.kafka;
/**
* 配置項(xiàng)
* @author liuyazhuang
*
*/
public class Config {
/**
* 話題
*/
public static final String TOPIC = "wordcount";
/**
* 線程數(shù)
*/
public static final Integer THREADS = 1;
}
二、編程生產(chǎn)者類ProducerDemo
這個(gè)類的主要作用就是向Kafka寫入相應(yīng)的消息,并且將消息寫入wordcount話題。
package com.lya.kafka;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
* 生產(chǎn)者實(shí)例
* @author liuyazhuang
*
*/
public class ProducerDemo {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put("zk.connect", "192.168.209.121:2181");
props.put("metadata.broker.list","192.168.209.121:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("zk.connectiontimeout.ms", "15000");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
// 發(fā)送業(yè)務(wù)消息
// 讀取文件 讀取內(nèi)存數(shù)據(jù)庫 讀socket端口
for (int i = 1; i <= 100; i++) {
Thread.sleep(500);
producer.send(new KeyedMessage<String, String>(Config.TOPIC,
"this number ===>>> " + i));
}
}
}
三、編寫消息者類ConsumerDemo
這個(gè)類的主要作用就是消費(fèi)Kafka中wordcount話題的消息。
package com.lya.kafka;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
/**
* 消費(fèi)者實(shí)例
* @author liuyazhuang
*
*/
public class ConsumerDemo {
public static void main(String[] args) {
Properties props = new Properties();
props.put("zookeeper.connect", "192.168.209.121:2181");
props.put("group.id", "1111");
props.put("auto.offset.reset", "smallest");
props.put("zk.connectiontimeout.ms", "15000");
ConsumerConfig config = new ConsumerConfig(props);
ConsumerConnector consumer =Consumer.createJavaConsumerConnector(config);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(Config.TOPIC, Config.THREADS);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(Config.TOPIC);
for(final KafkaStream<byte[], byte[]> kafkaStream : streams){
new Thread(new Runnable() {
@Override
public void run() {
for(MessageAndMetadata<byte[], byte[]> mm : kafkaStream){
String msg = new String(mm.message());
System.out.println(msg);
}
}
}).start();
}
}
}
四、運(yùn)行實(shí)例
首先,運(yùn)行消費(fèi)者類ConsumerDemo
運(yùn)行結(jié)果如下:

沒有打印任何信息。
此時(shí),我們運(yùn)行生產(chǎn)者類ProducerDemo
我們?cè)俅未蜷_消費(fèi)者的控制臺(tái)查看如下:

打印出了生產(chǎn)者生產(chǎn)的消息。
至此,Kafka簡單客戶端編程實(shí)例結(jié)束。
以上就是本文的全部內(nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
基于Java創(chuàng)建XML(無中文亂碼)過程解析
這篇文章主要介紹了基于Java創(chuàng)建XML(無中文亂碼)過程解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-10-10
jpa實(shí)體@ManyToOne @OneToMany無限遞歸方式
這篇文章主要介紹了jpa實(shí)體@ManyToOne @OneToMany無限遞歸方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-10-10
Java?數(shù)據(jù)結(jié)構(gòu)與算法系列精講之單向鏈表
單向鏈表特點(diǎn)是鏈表的鏈接方向是單向的,訪問要通過順序讀取從頭部開始。鏈表是使用指針構(gòu)造的列表,是由一個(gè)個(gè)結(jié)點(diǎn)組裝起來的,又稱為結(jié)點(diǎn)列表。其中每個(gè)結(jié)點(diǎn)都有指針成員變量指向列表中的下一個(gè)結(jié)點(diǎn),head指針指向第一個(gè)結(jié)點(diǎn)稱為表頭,而終止于最后一個(gè)指向nuLL的指針2022-02-02
java后臺(tái)利用Apache poi 生成excel文檔提供前臺(tái)下載示例
本篇文章主要介紹了java后臺(tái)利用Apache poi 生成excel文檔提供前臺(tái)下載示例,非常具有實(shí)用價(jià)值,需要的朋友可以參考下2017-05-05
Java實(shí)現(xiàn)畫圖的詳細(xì)步驟(完整代碼)
今天給大家?guī)淼氖顷P(guān)于Java的相關(guān)知識(shí),文章圍繞著Java實(shí)現(xiàn)畫圖的詳細(xì)步驟展開,文中有非常詳細(xì)的介紹及代碼示例,需要的朋友可以參考下2021-06-06
詳解SpringBoot的jar為什么可以直接運(yùn)行
SpringBoot提供了一個(gè)插件spring-boot-maven-plugin用于把程序打包成一個(gè)可執(zhí)行的jar包,本文給大家介紹了為什么SpringBoot的jar可以直接運(yùn)行,文中有相關(guān)的代碼示例供大家參考,感興趣的朋友可以參考下2024-02-02

