Kafka安裝部署+go整合過程
1、Kafka的安裝
1、下載與安裝Kafka
Kafka官網(wǎng)https://Kafka.apache.org/downloads

所以這里推薦的版本是 : https://archive.apache.org/dist/kafka/2.7.2/kafka_2.12-2.7.2.tgz
將下載下來的安裝包直接解壓到一個路徑下即可完成Kafka的安裝,這里統(tǒng)一將Kafka安裝到/usr/local目錄下
基本操作過程如下:
mkdir -p /www/kuangstudy cd /www/kuangstudy wget https://archive.apache.org/dist/kafka/2.7.2/kafka_2.12-2.7.2.tgz tar -zxvf kafka_2.12-2.7.2.tgz -C /usr/local/ mv /usr/local/kafka_2.12-2.7.2 /usr/local/kafka #新建存放日志和數(shù)據(jù)的文件夾 mkdir /usr/local/kafka/logs
這里我們將Kafka安裝到了/usr/local目錄下。
2、配置Kafka
這里將Kafka安裝到/usr/local目錄下
因此,Kafka的主配置文件為/usr/local/Kafka/config/server.properties,這里以節(jié)點Kafkazk1為例,重點介紹一些常用配置項的含義:
broker.id=1 listeners=PLAINTEXT://127.0.0.1:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/usr/local/Kafka/logs num.partitions=6 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=localhost:2181 #不是集群,所以可以寫成localhost #zookeeper.connect=127.0.0.1:2181,10.0.0.7:2181,10.0.0.8:2181 zookeeper.connection.timeout.ms=18000 group.initial.rebalance.delay.ms=0 auto.create.topics.enable=true delete.topic.enable=true
每個配置項含義如下:
broker.id:每一個broker在集群中的唯一表示,要求是正數(shù)。當該服務(wù)器的IP地址發(fā)生改變時,broker.id沒有變化,則不會影響consumers的消息情況。listeners:設(shè)置Kafka的監(jiān)聽地址與端口,可以將監(jiān)聽地址設(shè)置為主機名或IP地址,這里將監(jiān)聽地址設(shè)置為IP地址。log.dirs:這個參數(shù)用于配置Kafka保存數(shù)據(jù)的位置,Kafka中所有的消息都會存在這個目錄下。可以通過逗號來指定多個路徑, Kafka會根據(jù)最少被使用的原則選擇目錄分配新的parition。需要注意的是,Kafka在分配parition的時候選擇的規(guī)則不是按照磁盤的空間大小來定的,而是根據(jù)分配的 parition的個數(shù)多小而定。num.partitions:這個參數(shù)用于設(shè)置新創(chuàng)建的topic有多少個分區(qū),可以根據(jù)消費者實際情況配置,配置過小會影響消費性能。這里配置6個。log.retention.hours:這個參數(shù)用于配置Kafka中消息保存的時間,還支持log.retention.minutes和 log.retention.ms配置項。這三個參數(shù)都會控制刪除過期數(shù)據(jù)的時間,推薦使用log.retention.ms。如果多個同時設(shè)置,那么會選擇最小的那個。log.segment.bytes:配置partition中每個segment數(shù)據(jù)文件的大小,默認是1GB,超過這個大小會自動創(chuàng)建一個新的segment file。
zookeeper.connect
:這個參數(shù)用于指定zookeeper所在的地址,它存儲了broker的元信息。 這個值可以通過逗號設(shè)置多個值,每個值的格式均為:hostname:port/path,每個部分的含義如下:
- hostname:表示zookeeper服務(wù)器的主機名或者IP地址,這里設(shè)置為IP地址。
- port: 表示是zookeeper服務(wù)器監(jiān)聽連接的端口號。
- /path:表示Kafka在zookeeper上的根目錄。如果不設(shè)置,會使用根目錄。
auto.create.topics.enable:這個參數(shù)用于設(shè)置是否自動創(chuàng)建topic,如果請求一個topic時發(fā)現(xiàn)還沒有創(chuàng)建, Kafka會在broker上自動創(chuàng)建一個topic,如果需要嚴格的控制topic的創(chuàng)建,那么可以設(shè)置auto.create.topics.enable為false,禁止自動創(chuàng)建topic。
delete.topic.enable:在0.8.2版本之后,Kafka提供了刪除topic的功能,但是默認并不會直接將topic數(shù)據(jù)物理刪除。如果要從物理上刪除(即刪除topic后,數(shù)據(jù)文件也會一同刪除),就需要設(shè)置此配置項為true。
3、添加環(huán)境變量
$ vim /etc/profile export kafka_HOME=/usr/local/kafka export PATH=$PATH:$kafka_HOME/bin #生效 $ source /etc/profile
zookeeper服務(wù)的啟動
cd /usr/local/kafka/bin # 占用啟動 ./zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties & # 后臺啟動 nohup ./zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &
4、Kafka啟動腳本
$ vim /usr/lib/systemd/system/kafka.service [Unit] Description=Apache kafka server (broker) After=network.target zookeeper.service [Service] Type=simple User=root Group=root ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh Restart=on-failure [Install] WantedBy=multi-user.target
systemctl daemon-reload
5、啟動Kafka
在啟動Kafka集群前,需要確保ZooKeeper集群已經(jīng)正常啟動。接著,依次在Kafka各個節(jié)點上執(zhí)行如下命令即可:
$ cd /usr/local/kafka $ nohup bin/kafka-server-start.sh config/server.properties & # 或者 $ systemctl start kafka $ jps 21840 kafka 15593 Jps 15789 QuorumPeerMain
這里將Kafka放到后臺運行,啟動后,會在啟動Kafka的當前目錄下生成一個nohup.out文件,可通過此文件查看Kafka的啟動和運行狀態(tài)。通過jps指令,可以看到有個Kafka標識,這是Kafka進程成功啟動的標志。
6、測試Kafka基本命令操作
kefka提供了多個命令用于查看、創(chuàng)建、修改、刪除topic信息,也可以通過命令測試如何生產(chǎn)消息、消費消息等,這些命令位于Kafka安裝目錄的bin目錄下,這里是/usr/local/Kafka/bin。
登錄任意一臺Kafka集群節(jié)點,切換到此目錄下,即可進行命令操作。
下面列舉Kafka的一些常用命令的使用方法。
(1)顯示topic列表
#kafka-topics.sh --zookeeper 127.0.0.1:2181,10.0.0.7:2181,10.0.0.8:2181 --list $ kafka-topics.sh --zookeeper 127.0.0.1:2181 --list topic123
(2)創(chuàng)建一個topic,并指定topic屬性(副本數(shù)、分區(qū)數(shù)等)
#kafka-topics.sh --create --zookeeper 127.0.0.1:2181,10.0.0.7:2181,10.0.0.8:2181 --replication-factor 1 --partitions 3 --topic topic123 $ kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 3 --topic topic123 Created topic topic123. #--replication-factor表示指定副本的個數(shù)
(3)查看某個topic的狀態(tài)
#kafka-topics.sh --describe --zookeeper 127.0.0.1:2181,10.0.0.7:2181,10.0.0.8:2181 --topic topic123 $ kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic topic123 Topic: topic123 PartitionCount: 3 ReplicationFactor: 1 Configs: Topic: topic123 Partition: 0 Leader: 1 Replicas: 1 Isr: 1 Topic: topic123 Partition: 1 Leader: 1 Replicas: 1 Isr: 1 Topic: topic123 Partition: 2 Leader: 1 Replicas: 1 Isr: 1
(4)生產(chǎn)消息 阻塞狀態(tài)
#kafka-console-producer.sh --broker-list 127.0.0.1:9092,10.0.0.7:9092,10.0.0.8:9092 --topic topic123 $ kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic topic123
(5)消費消息 阻塞狀態(tài)
#kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092,10.0.0.7:9092,10.0.0.8:9092 --topic topic123 $ kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic topic123 #從頭開始消費消息 #Kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic topic123 --from-beginning $ kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092,10.0.0.7:9092,10.0.0.8:9092 --topic topic123 --from-beginning
(6)刪除topic
#kafka-topics.sh --delete --zookeeper 127.0.0.1:2181,10.0.0.7:2181,10.0.0.8:2181 --topic topic123 $ kafka-topics.sh --delete --zookeeper 127.0.0.1:2181 --topic topic_
2、GO整合Kafka實現(xiàn)消息發(fā)送和訂閱
2.1 消息生產(chǎn)代碼示例
package main
import (
"fmt"
"github.com/IBM/sarama"
)
func main() {
// 配置生產(chǎn)者信息
conf := sarama.NewConfig()
conf.Producer.RequiredAcks = sarama.WaitForAll // 生產(chǎn)者等待所有分區(qū)副本成功提交消息
conf.Producer.Return.Successes = true // 成功消息寫入返回
client, err := sarama.NewSyncProducer([]string{"47.115.230.36:9092"}, conf)
if nil != err {
fmt.Println("create Kafka sync producer failed", err)
return
}
defer client.Close()
msg := &sarama.ProducerMessage{
Topic: "topic123", // 指定消息主題
Value: sarama.StringEncoder("hello world"), // 構(gòu)造消息
}
// 發(fā)送消息
_, _, err = client.SendMessage(msg)
if nil != err {
fmt.Println("send message to Kafka failed", err)
return
}
fmt.Println("send message success")
}2.2 消息消費代碼示例
package main
import (
"fmt"
"github.com/IBM/sarama"
)
/**
* @desc 生產(chǎn)者
* @author feige
* @date 2023-11-15
* @version 1.0
*/
func main() {
// 創(chuàng)建一個消費者
consumer, err := sarama.NewConsumer([]string{"47.115.230.36:9092"}, nil)
if err != nil {
fmt.Println("消費者kafka連接服務(wù)失敗,失敗的原因:", err)
return
}
// 從topic123這個主題去獲取消息
partitions, err := consumer.Partitions("topic123")
if err != nil {
fmt.Println("主題獲取失敗,失敗的原因:", err)
return
}
fmt.Println(partitions)
// 開始遍歷分區(qū)中的消息,開始進行消費
for _, partition := range partitions {
pc, err := consumer.ConsumePartition("topic123", int32(partition), sarama.OffsetNewest)
if err != nil {
fmt.Println("分區(qū)數(shù)據(jù)獲取失敗,失敗的原因:", err)
return
}
defer pc.AsyncClose()
// 開始異步獲取消息
go func(sarama.PartitionConsumer) {
for message := range pc.Messages() {
fmt.Printf("當前消費的分區(qū)是:%d,offset:%d,key:%v,消息的內(nèi)容是:%v", message.Partition,
message.Offset, message.Key, string(message.Value))
fmt.Println("")
}
}(pc)
}
// 阻塞讓消費一直處于監(jiān)聽狀態(tài)
select {}
}2.3 創(chuàng)建主題代碼示例
package main
import (
"fmt"
"github.com/Shopify/sarama"
)
func CreateTopic(addrs []string, topic string) bool {
config := sarama.NewConfig()
config.Version = sarama.V2_0_0_0 // 設(shè)置客戶端版本
config.Admin.Timeout = 3 * time.Second // 設(shè)置Admin請求超時時間
admin, err := sarama.NewClusterAdmin(addrs, config)
if err!= nil {
return false
}
defer admin.Close()
err = admin.CreateTopic(topic, &sarama.TopicDetail{NumPartitions: 3, ReplicationFactor: 2}, false)
if err == nil {
fmt.Println("success create topic:", topic)
} else {
fmt.Println("failed create topic:", topic)
}
return err == nil
}2.4 性能測試結(jié)果
Kafka目前已經(jīng)成為云計算領(lǐng)域中的“事件驅(qū)動”架構(gòu)、微服務(wù)架構(gòu)中的主要消息隊列,隨著越來越多的公司和組織開始采用Kafka作為基礎(chǔ)消息隊列技術(shù),越來越多的性能測試報告也陸續(xù)出來。筆者提前做了一輪性能測試,并發(fā)現(xiàn)它的消費性能比其它消息隊列還要好,甚至更好些。下面是測試結(jié)果:
測試環(huán)境:
- 操作系統(tǒng):Ubuntu 16.04
- CPU:Intel® Xeon® Gold 6148 CPU @ 2.40GHz
- 內(nèi)存:128G DDR4 ECC
- Kafka集群:3節(jié)點,每節(jié)點配置6個CPU、32G內(nèi)存、SSD
- 測試用例:生產(chǎn)者每秒鐘發(fā)送2萬條消息,消費者每秒鐘消費100條消息。
測試結(jié)果:
Kafka消費者
每秒消費100條消息,平均耗時:67毫秒
每秒消費1000條消息,平均耗時:6.7毫秒
RabbitMQ消費者
每秒消費100條消息,平均耗時:1038毫秒
每秒消費1000條消息,平均耗時:10.38毫秒
3、參考
github.com/Shopify/sarama github.com/bsm/sarama-cluster
生產(chǎn)者
import (
"fmt"
"math/rand"
"os"
"strconv"
"strings"
"time"
"github.com/Shopify/sarama"
"github.com/golang/glog"
)
//同步生產(chǎn)者
func Produce() {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll //賦值為-1:這意味著producer在follower副本確認接收到數(shù)據(jù)后才算一次發(fā)送完成。
config.Producer.Partitioner = sarama.NewRandomPartitioner //寫到隨機分區(qū)中,默認設(shè)置8個分區(qū)
config.Producer.Return.Successes = true
msg := &sarama.ProducerMessage{}
msg.Topic = `test0`
msg.Value = sarama.StringEncoder("Hello World!")
client, err := sarama.NewSyncProducer([]string{"Kafka_master:9092"}, config)
if err != nil {
fmt.Println("producer close err, ", err)
return
}
defer client.Close()
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send message failed, ", err)
return
}
fmt.Printf("分區(qū)ID:%v, offset:%v \n", pid, offset)
}
//異步生產(chǎn)者
func AsyncProducer() {
var topics = "test0"
config := sarama.NewConfig()
config.Producer.Return.Successes = true //必須有這個選項
config.Producer.Timeout = 5 * time.Second
p, err := sarama.NewAsyncProducer(strings.Split("Kafka_master:9092", ","), config)
defer p.Close()
if err != nil {
return
}
//這個部分一定要寫,不然通道會被堵塞
go func(p sarama.AsyncProducer) {
errors := p.Errors()
success := p.Successes()
for {
select {
case err := <-errors:
if err != nil {
glog.Errorln(err)
}
case <-success:
}
}
}(p)
for {
v := "async: " + strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000))
fmt.Fprintln(os.Stdout, v)
msg := &sarama.ProducerMessage{
Topic: topics,
Value: sarama.ByteEncoder(v),
}
p.Input() <- msg
time.Sleep(time.Second * 1)
}
}消費者
package consumer
import (
"fmt"
"strings"
"sync"
"time"
"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
"github.com/golang/glog"
)
//單個消費者
func Consumer() {
var wg sync.WaitGroup
consumer, err := sarama.NewConsumer([]string{"Kafka_master:9092"}, nil)
if err != nil {
fmt.Println("Failed to start consumer: %s", err)
return
}
partitionList, err := consumer.Partitions("test0") //獲得該topic所有的分區(qū)
if err != nil {
fmt.Println("Failed to get the list of partition:, ", err)
return
}
for partition := range partitionList {
pc, err := consumer.ConsumePartition("test0", int32(partition), sarama.OffsetNewest)
if err != nil {
fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err)
return
}
wg.Add(1)
go func(sarama.PartitionConsumer) { //為每個分區(qū)開一個go協(xié)程去取值
for msg := range pc.Messages() { //阻塞直到有值發(fā)送過來,然后再繼續(xù)等待
fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
}
defer pc.AsyncClose()
wg.Done()
}(pc)
}
wg.Wait()
}
//消費組
func ConsumerGroup() {
groupID := "test-consumer-group"
config := cluster.NewConfig()
config.Group.Return.Notifications = true
config.Consumer.Offsets.CommitInterval = 1 * time.Second
config.Consumer.Offsets.Initial = sarama.OffsetNewest //初始從最新的offset開始
c, err := cluster.NewConsumer(strings.Split("Kafka_master:9092", ","), groupID, strings.Split("test0", ","), config)
if err != nil {
glog.Errorf("Failed open consumer: %v", err)
return
}
defer c.Close()
go func(c *cluster.Consumer) {
errors := c.Errors()
noti := c.Notifications()
for {
select {
case err := <-errors:
glog.Errorln(err)
case <-noti:
}
}
}(c)
for msg := range c.Messages() {
fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
c.MarkOffset(msg, "") //MarkOffset 并不是實時寫入Kafka,有可能在程序crash時丟掉未提交的offset
}
}主函數(shù)
package main
import (
"strom-huang-go/go_Kafka/consumer"
)
func main() {
// produce.AsyncProducer()
consumer.Consumer()
}到此這篇關(guān)于Kafka安裝部署+go整合的文章就介紹到這了,更多相關(guān)Kafka安裝部署內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
go語言map與string的相互轉(zhuǎn)換的實現(xiàn)
這篇文章主要介紹了go語言map與string的相互轉(zhuǎn)換的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2021-04-04
go實現(xiàn)grpc四種數(shù)據(jù)流模式
這篇文章主要為大家介紹了go實現(xiàn)grpc四種數(shù)據(jù)流模式,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步早日升職加薪2022-04-04
Go語言中結(jié)構(gòu)體方法副本傳參與指針傳參的區(qū)別介紹
這篇文章主要給大家介紹了關(guān)于Go語言中結(jié)構(gòu)體方法副本傳參與指針傳參的區(qū)別的相關(guān)資料,文中先對GO語言結(jié)構(gòu)體方法跟結(jié)構(gòu)體指針方法的區(qū)別進行了一些簡單的介紹,來幫助大家理解學習,需要的朋友可以參考下。2017-12-12

