Java使用kafka發(fā)送和生產(chǎn)消息的示例
更新時間:2018年04月24日 15:59:17 作者:will的猜想
本篇文章主要介紹了Java使用kafka發(fā)送和生產(chǎn)消息的示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
1. maven依賴包
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.9.0.1</version> </dependency>
2. 生產(chǎn)者代碼
package com.lnho.example.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class KafkaProducerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "master:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for(int i = 0; i < 100; i++)
producer.send(new ProducerRecord<>("topic1", Integer.toString(i), Integer.toString(i)));
producer.close();
}
}
3. 消費(fèi)者代碼
package com.lnho.example.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "master:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic1"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
}
}
}
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
您可能感興趣的文章:
- kafka調(diào)試中遇到Connection to node -1 could not be established. Broker may not be available.
- Docker + Nodejs + Kafka + Redis + MySQL搭建簡單秒殺環(huán)境
- Kafka 常用命令行詳細(xì)介紹及整理
- docker部署kafka的方法步驟
- Kafka利用Java實(shí)現(xiàn)數(shù)據(jù)的生產(chǎn)和消費(fèi)實(shí)例教程
- kafka監(jiān)控獲取指定topic的消息總量示例
- 詳解使用docker搭建kafka環(huán)境
- node連接kafka2.0實(shí)現(xiàn)方法示例
相關(guān)文章
深入了解Java中Cookie和Session的區(qū)別
會話跟蹤是Web程序中常用的技術(shù),用來跟蹤用戶的整個會話,常用的會話跟蹤技術(shù)是Cookie與Session,本文就詳細(xì)的介紹一下Java中Cookie和Session的區(qū)別,感興趣的可以了解一下2023-06-06
Mybatis實(shí)現(xiàn)自定義的typehandler三步曲
這篇文章主要介紹了Mybatis實(shí)現(xiàn)自定義的typehandler三步曲的相關(guān)資料,非常不錯,具有參考借鑒價值,需要的朋友可以參考下2016-07-07
SpringBoot中注解實(shí)現(xiàn)定時任務(wù)的兩種方式
這篇文章主要介紹了SpringBoot中注解實(shí)現(xiàn)定時任務(wù)的兩種方式,SpringBoot 定時任務(wù)是一種在SpringBoot應(yīng)用中自動執(zhí)行任務(wù)的機(jī)制,通過使用Spring框架提供的@Scheduled注解,我們可以輕松地創(chuàng)建定時任務(wù),需要的朋友可以參考下2023-10-10
Java實(shí)現(xiàn)數(shù)組翻轉(zhuǎn)的實(shí)現(xiàn)代碼
這篇文章主要介紹了Java實(shí)現(xiàn)數(shù)組翻轉(zhuǎn)的實(shí)現(xiàn)代碼,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-09-09
Java開發(fā)SSM框架微信退款的實(shí)現(xiàn)
這篇文章是Java微信退款的教程,退款之前用戶需要先進(jìn)行支付,支付之后才可以使用退款,非常具有實(shí)用價值,感興趣的小伙伴們可以參考一下2018-10-10
劍指Offer之Java算法習(xí)題精講二叉樹專項(xiàng)解析
跟著思路走,之后從簡單題入手,反復(fù)去看,做過之后可能會忘記,之后再做一次,記不住就反復(fù)做,反復(fù)尋求思路和規(guī)律,慢慢積累就會發(fā)現(xiàn)質(zhì)的變化2022-03-03

