springboot 1.5.2 集成kafka的簡單例子
本文介紹了springboot 1.5.2 集成kafka的簡單例子 ,分享給大家,具體如下:
隨著spring boot 1.5版本的發(fā)布,在spring項目中與kafka集成更為簡便。
添加依賴
compile("org.springframework.kafka:spring-kafka:1.1.2.RELEASE")
添加application.properties
#kafka # 指定kafka 代理地址,可以多個 spring.kafka.bootstrap-servers=192.168.59.130:9092,192.168.59.131:9092,192.168.59.132:9092 # 指定默認消費者group id spring.kafka.consumer.group-id=myGroup # 指定默認topic id spring.kafka.template.default-topic= my-replicated-topic # 指定listener 容器中的線程數(shù),用于提高并發(fā)量 spring.kafka.listener.concurrency= 3 # 每次批量發(fā)送消息的數(shù)量 spring.kafka.producer.batch-size= 1000
configuration 啟用kafka
package cn.xiaojf.today.data.kafka.configuration;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
/**
* kafka 配置
* @author xiaojf 2017/3/24 14:09
*/
@Configuration
@EnableKafka
public class KafkaConfiguration {
}
消息生產者
package cn.xiaojf.today.data.kafka.producer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.stereotype.Component;
/**
* 消息生產者
* @author xiaojf 2017/3/24 14:36
*/
@Component
public class MsgProducer {
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
public void send() {
kafkaTemplate.send("my-replicated-topic","xiaojf");
kafkaTemplate.send("my-replicated-topic","xiaojf");
kafkaTemplate.metrics();
kafkaTemplate.execute(new KafkaOperations.ProducerCallback<String, String, Object>() {
@Override
public Object doInKafka(Producer<String, String> producer) {
//這里可以編寫kafka原生的api操作
return null;
}
});
//消息發(fā)送的監(jiān)聽器,用于回調返回信息
kafkaTemplate.setProducerListener(new ProducerListener<String, String>() {
@Override
public void onSuccess(String topic, Integer partition, String key, String value, RecordMetadata recordMetadata) {
}
@Override
public void onError(String topic, Integer partition, String key, String value, Exception exception) {
}
@Override
public boolean isInterestedInSuccess() {
return false;
}
});
}
}
消息消費者
package cn.xiaojf.today.data.kafka.consumer;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
* 消息消費者
* @author xiaojf 2017/3/24 14:36
*/
@Component
public class MsgConsumer {
@KafkaListener(topics = {"my-replicated-topic","my-replicated-topic2"})
public void processMessage(String content) {
System.out.println(content);
}
}
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
相關文章
Springboot整合Thymeleaf引入公共的CSS和JS文件的方法及注意點
有時候很多css文件是公共的,我們必須要在每個html文件中引入它們,下面這篇文章主要給大家介紹了關于Springboot整合Thymeleaf引入公共的CSS和JS文件的方法及注意點,需要的朋友可以參考下2024-06-06
微服務Redis-Session共享登錄狀態(tài)的過程詳解
這篇文章主要介紹了微服務Redis-Session共享登錄狀態(tài),本文采取Spring security做登錄校驗,用redis做session共享,實現(xiàn)單服務登錄可靠性,微服務之間調用的可靠性與通用性,需要的朋友可以參考下2023-12-12
json如何解析混合數(shù)組對象到實體類的list集合里去
這篇文章主要介紹了json解析混合數(shù)組對象到實體類的list集合里去的操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-06-06
SpringBoot報錯Invalid?bound?statement?(not?found)問題排查和解決方案
這篇文章主要介紹了SpringBoot報錯Invalid?bound?statement?(not?found)問題排查和解決方案,文中通過圖文結合的方式講解的非常詳細,對大家的學習或工作有一定的幫助,需要的朋友可以參考下2024-03-03
springboot集成spark并使用spark-sql的示例詳解
這篇文章主要介紹了spring-boot集成spark并使用spark-sql的方法,本文通過示例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-02-02

