Kafka使用Java客戶端進(jìn)行訪問(wèn)的示例代碼
本文環(huán)境如下:
操作系統(tǒng):CentOS 6 32位
JDK版本:1.8.0_77 32位
Kafka版本:0.9.0.1(Scala 2.11)
1. maven依賴包
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.9.0.1</version> </dependency>
2. 生產(chǎn)者代碼
package com.lnho.example.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class KafkaProducerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "master:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for(int i = 0; i < 100; i++)
producer.send(new ProducerRecord<>("topic1", Integer.toString(i), Integer.toString(i)));
producer.close();
}
}
3. 消費(fèi)者代碼
package com.lnho.example.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "master:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic1"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
}
}
}
4. 執(zhí)行程序
lib底下需要有:kafka-clients-0.9.0.1.jar log4j-1.2.17.jar slf4j-api-1.7.6.jar slf4j-log4j12-1.7.6.jar
生產(chǎn)者:
java -classpath kafka-example-1.0-SNAPSHOT.jar:lib/* com.lnho.example.kafka.KafkaProducerExample
消費(fèi)者:
java -classpath kafka-example-1.0-SNAPSHOT.jar:lib/* com.lnho.example.kafka.KafkaConsumerExample
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
ElasticSearch學(xué)習(xí)之多條件組合查詢驗(yàn)證及示例分析
這篇文章主要為大家介紹了ElasticSearch 多條件組合查詢驗(yàn)證及示例分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-02-02
使用SpringAop動(dòng)態(tài)獲取mapper執(zhí)行的SQL,并保存SQL到Log表中
這篇文章主要介紹了使用SpringAop動(dòng)態(tài)獲取mapper執(zhí)行的SQL,并保存SQL到Log表中問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-03-03
SpringBoot項(xiàng)目啟動(dòng)報(bào)錯(cuò)"找不到或無(wú)法加載主類"的解決方法
在使用 IntelliJ IDEA 開(kāi)發(fā)基于 Spring Boot 框架的 Java 程序時(shí),可能會(huì)出現(xiàn)找不到或無(wú)法加載主類 com.example.springboot.SpringbootApplication的錯(cuò)誤提示,下面我們來(lái)看看如何解決吧2025-03-03
spring中定時(shí)任務(wù)taskScheduler的詳細(xì)介紹
這篇文章主要介紹了spring中定時(shí)任務(wù)taskScheduler的相關(guān)資料,文中通過(guò)示例代碼介紹的很詳細(xì),相信對(duì)大家具有一定的參考價(jià)值,有需要的朋友們下面來(lái)一起看看吧。2017-02-02
Java內(nèi)存結(jié)構(gòu)和數(shù)據(jù)類型
本文重點(diǎn)給大家介紹java內(nèi)存結(jié)構(gòu)和數(shù)據(jù)類型知識(shí),非常不錯(cuò),具有參考借鑒價(jià)值,需要的朋友參考下2016-12-12
SpringBoot?+?MyBatis-Plus構(gòu)建樹(shù)形結(jié)構(gòu)的幾種方式
在實(shí)際開(kāi)發(fā)中,很多數(shù)據(jù)都是樹(shù)形結(jié)構(gòu),本文主要介紹了SpringBoot?+?MyBatis-Plus構(gòu)建樹(shù)形結(jié)構(gòu)的幾種方式,對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-08-08

