帶你玩轉(zhuǎn)Kafka之初步使用
前言
官方文檔:http://kafka.apache.org/
中文文檔:https://kafka.apachecn.org/
Apache Kafka是分布式發(fā)布-訂閱消息系統(tǒng)。
Apache Kafka與傳統(tǒng)消息系統(tǒng)相比,有以下不同:
- 它被設計為一個分布式系統(tǒng),易于向外擴展;
- 它同時為發(fā)布和訂閱提供高吞吐量;
- 它支持多訂閱者,當失敗時能自動平衡消費者;
- 它將消息持久化到磁盤,因此可用于批量消費,例如ETL,以及實時應用程序。
1 簡單介紹

首先是一些概念:
Kafka作為一個集群,運行在一臺或者多臺服務器上.Kafka 通過 topic 對存儲的流數(shù)據(jù)進行分類。每條記錄中包含一個key,一個value和一個timestamp(時間戳)。
Kafka有四個核心的API:
The Producer API 允許一個應用程序發(fā)布一串流式的數(shù)據(jù)到一個或者多個Kafka topic。
The Consumer API 允許一個應用程序訂閱一個或多個 topic ,并且對發(fā)布給他們的流式數(shù)據(jù)進行處理。
The Streams API 允許一個應用程序作為一個流處理器,消費一個或者多個topic產(chǎn)生的輸入流,然后生產(chǎn)一個輸出流到一個或多個topic中去,在輸入輸出流中進行有效的轉(zhuǎn)換。
The Connector API 允許構(gòu)建并運行可重用的生產(chǎn)者或者消費者,將Kafka topics連接到已存在的應用程序或者數(shù)據(jù)系統(tǒng)。比如,連接到一個關系型數(shù)據(jù)庫,捕捉表(table)的所有變更內(nèi)容。
支持的語言(除了Java之外的):

常見概念:
1 Topics和日志
讓我們首先深入了解下Kafka的核心概念:提供一串流式的記錄— topic
Topic 就是數(shù)據(jù)主題,是數(shù)據(jù)記錄發(fā)布的地方,可以用來區(qū)分業(yè)務系統(tǒng)。Kafka中的Topics總是多訂閱者模式,一個topic可以擁有一個或者多個消費者來訂閱它的數(shù)據(jù)。
對于每一個topic, Kafka集群都會維持一個分區(qū)日志,如下所示:

每個分區(qū)都是有序且順序不可變的記錄集,并且不斷地追加到結(jié)構(gòu)化的commit log文件。分區(qū)中的每一個記錄都會分配一個id號來表示順序,我們稱之為offset,offset用來唯一的標識分區(qū)中每一條記錄。
Kafka 集群保留所有發(fā)布的記錄—無論他們是否已被消費—并通過一個可配置的參數(shù)——保留期限來控制. 舉個例子, 如果保留策略設置為2天,一條記錄發(fā)布后兩天內(nèi),可以隨時被消費,兩天過后這條記錄會被拋棄并釋放磁盤空間。Kafka的性能和數(shù)據(jù)大小無關,所以長時間存儲數(shù)據(jù)沒有什么問題.

日志中的 partition(分區(qū))有以下幾個用途。第一,當日志大小超過了單臺服務器的限制,允許日志進行擴展。每個單獨的分區(qū)都必須受限于主機的文件限制,不過一個主題可能有多個分區(qū),因此可以處理無限量的數(shù)據(jù)。第二,可以作為并行的單元集—關于這一點,更多細節(jié)如下
2 分布式
日志的分區(qū)partition (分布)在Kafka集群的服務器上。每個服務器在處理數(shù)據(jù)和請求時,共享這些分區(qū)。每一個分區(qū)都會在已配置的服務器上進行備份,確保容錯性.
每個分區(qū)都有一臺 server 作為 “l(fā)eader”,零臺或者多臺server作為 follwers 。leader server 處理一切對 partition (分區(qū))的讀寫請求,而follwers只需被動的同步leader上的數(shù)據(jù)。當leader宕機了,followers 中的一臺服務器會自動成為新的 leader。每臺 server 都會成為某些分區(qū)的 leader 和某些分區(qū)的 follower,因此集群的負載是平衡的。
3 生產(chǎn)者
生產(chǎn)者可以將數(shù)據(jù)發(fā)布到所選擇的topic中。生產(chǎn)者負責將記錄分配到topic的哪一個 partition(分區(qū))中??梢允褂醚h(huán)的方式來簡單地實現(xiàn)負載均衡,也可以根據(jù)某些語義分區(qū)函數(shù)(例如:記錄中的key)來完成。下面會介紹更多關于分區(qū)的使用。
4 消費者
消費者使用一個 消費組 名稱來進行標識,發(fā)布到topic中的每條記錄被分配給訂閱消費組中的一個消費者實例.消費者實例可以分布在多個進程中或者多個機器上。
如果所有的消費者實例在同一消費組中,消息記錄會負載平衡到每一個消費者實例.
如果所有的消費者實例在不同的消費組中,每條消息記錄會廣播到所有的消費者進程.

如圖,這個 Kafka 集群有兩臺 server 的,四個分區(qū)(p0-p3)和兩個消費者組。消費組A有兩個消費者,消費組B有四個消費者。
通常情況下,每個 topic 都會有一些消費組,一個消費組對應一個"邏輯訂閱者"。一個消費組由許多消費者實例組成,便于擴展和容錯。這就是發(fā)布和訂閱的概念,只不過訂閱者是一組消費者而不是單個的進程。
在Kafka中實現(xiàn)消費的方式是將日志中的分區(qū)劃分到每一個消費者實例上,以便在任何時間,每個實例都是分區(qū)唯一的消費者。維護消費組中的消費關系由Kafka協(xié)議動態(tài)處理。如果新的實例加入組,他們將從組中其他成員處接管一些 partition 分區(qū);如果一個實例消失,擁有的分區(qū)將被分發(fā)到剩余的實例。
Kafka 只保證分區(qū)內(nèi)的記錄是有序的,而不保證主題中不同分區(qū)的順序。每個 partition 分區(qū)按照key值排序足以滿足大多數(shù)應用程序的需求。但如果你需要總記錄在所有記錄的上面,可使用僅有一個分區(qū)的主題來實現(xiàn),這意味著每個消費者組只有一個消費者進程。
保證
high-level Kafka給予以下保證:
生產(chǎn)者發(fā)送到特定topic partition 的消息將按照發(fā)送的順序處理。 也就是說,如果記錄M1和記錄M2由相同的生產(chǎn)者發(fā)送,并先發(fā)送M1記錄,那么M1的偏移比M2小,并在日志中較早出現(xiàn)一個消費者實例按照日志中的順序查看記錄.對于具有N個副本的主題,我們最多容忍N-1個服務器故障,從而保證不會丟失任何提交到日志中的記錄.
關于保證的更多細節(jié)可以看文檔的設計部分。
2 下載安裝
Kafka依賴于Zookeeper,而Zookeeper又依賴于Java,因此在使用Kafka之前要安裝jdk1.8的環(huán)境和啟動zookeeper服務器。
下載或安裝地址:
JDK1.8://www.dhdzp.com/article/229780.htm:
http://www.dhdzp.com/article/229783.htm:
https://kafka.apachecn.org/downloads.html
好,下面我們開始進行安裝
[root@iZ2ze4m2ri7irkf6h6n8zoZ local]# tar -zxf kafka_2.11-1.0.0.tgz [root@iZ2ze4m2ri7irkf6h6n8zoZ local]# mv kafka_2.11-1.0.0 kafka-2.11
3 基本使用
3.1 啟動Kafka
首先檢查下自己的jdk 是否安裝:
[root@iZ2ze4m2ri7irkf6h6n8zoZ local]# java -version java version "1.8.0_144" Java(TM) SE Runtime Environment (build 1.8.0_144-b01) Java HotSpot(TM) 64-Bit Server VM (build 25.144-b01, mixed mode)
啟動Zookeeper:
[root@iZ2ze4m2ri7irkf6h6n8zoZ zookeeper-3.5.9]# ls bin conf docs lib LICENSE.txt NOTICE.txt README.md README_packaging.txt [root@iZ2ze4m2ri7irkf6h6n8zoZ zookeeper-3.5.9]# cd conf/ [root@iZ2ze4m2ri7irkf6h6n8zoZ conf]# ls configuration.xsl log4j.properties zoo_sample.cfg [root@iZ2ze4m2ri7irkf6h6n8zoZ conf]# cp zoo_sample.cfg zoo.cfg [root@iZ2ze4m2ri7irkf6h6n8zoZ conf]# cd ../bin/ [root@iZ2ze4m2ri7irkf6h6n8zoZ bin]# ls README.txt zkCli.cmd zkEnv.cmd zkServer.cmd zkServer.sh zkTxnLogToolkit.sh zkCleanup.sh zkCli.sh zkEnv.sh zkServer-initialize.sh zkTxnLogToolkit.cmd [root@iZ2ze4m2ri7irkf6h6n8zoZ bin]# ./zkServer. zkServer.cmd zkServer.sh [root@iZ2ze4m2ri7irkf6h6n8zoZ bin]# ./zkServer.sh start ZooKeeper JMX enabled by default Using config: /usr/local/zookeeper-3.5.9/bin/../conf/zoo.cfg Starting zookeeper ... STARTED
啟動Kafka:
[root@iZ2ze4m2ri7irkf6h6n8zoZ kafka-2.11]# ls bin config libs LICENSE NOTICE site-docs [root@iZ2ze4m2ri7irkf6h6n8zoZ kafka-2.11]# cd config/ [root@iZ2ze4m2ri7irkf6h6n8zoZ config]# ls connect-console-sink.properties connect-file-source.properties log4j.properties zookeeper.properties connect-console-source.properties connect-log4j.properties producer.properties connect-distributed.properties connect-standalone.properties server.properties connect-file-sink.properties consumer.properties tools-log4j.properties [root@iZ2ze4m2ri7irkf6h6n8zoZ config]# cd ../bin/ [root@iZ2ze4m2ri7irkf6h6n8zoZ bin]# ./kafka-server-start.sh ../config/server.properties [2021-11-20 10:21:10,326] INFO KafkaConfig values: ...... [2021-11-20 10:21:12,423] INFO Kafka version : 1.0.0 (org.apache.kafka.common.utils.AppInfoParser) [2021-11-20 10:21:12,423] INFO Kafka commitId : aaa7af6d4a11b29d (org.apache.kafka.common.utils.AppInfoParser) [2021-11-20 10:21:12,424] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
3.2 簡單測試使用
新建和查看topic
[root@iZ2ze4m2ri7irkf6h6n8zoZ bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic ymx Created topic "ymx". [root@iZ2ze4m2ri7irkf6h6n8zoZ bin]# ./kafka-topics.sh --list --zookeeper localhost:2181 ymx
生產(chǎn)者發(fā)送消息:
[root@iZ2ze4m2ri7irkf6h6n8zoZ bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic ymx >Hello Kafka! >Hello Ymx! >Hello Kafka and Ymx! >
消費者消費消息:
[root@iZ2ze4m2ri7irkf6h6n8zoZ bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ymx --from-beginning Hello Kafka! Hello Ymx! Hello Kafka and Ymx!
3.3 搭建多代理集群
3.3.1 開始搭建
首先要copy下配置文件
[root@iZ2ze4m2ri7irkf6h6n8zoZ config]# cp server.properties server-01.properties [root@iZ2ze4m2ri7irkf6h6n8zoZ config]# cp server.properties server-02.properties [root@iZ2ze4m2ri7irkf6h6n8zoZ config]# vim server-01.properties #### 內(nèi)容開始 #### broker.id=1 # 21行左右,broker的唯一標識(同一個集群中) listeners=PLAINTEXT://:9093 # 31行左右,放開,代表kafka的端口號 log.dirs=/tmp/kafka-logs-01 # 60行左右,用逗號分隔的目錄列表,在其中存儲日志文件 #### 內(nèi)容結(jié)束 #### [root@iZ2ze4m2ri7irkf6h6n8zoZ config]# vim server-02.properties #### 內(nèi)容開始 #### broker.id=2 # 21行左右,broker的唯一標識(同一個集群中) listeners=PLAINTEXT://:9094 # 31行左右,放開,代表kafka的端口號 log.dirs=/tmp/kafka-logs-02 # 60行左右,用逗號分隔的目錄列表,在其中存儲日志文件 #### 內(nèi)容結(jié)束 ####
根據(jù)配置文件啟動Kafka(同一主機下)
[root@iZ2ze4m2ri7irkf6h6n8zoZ bin]# ./kafka-server-start.sh ../config/server-01.properties
報錯信息:
[root@iZ2ze4m2ri7irkf6h6n8zoZ bin]# ./kafka-server-start.sh ../config/server-01.properties Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0000000, 1073741824, 0) failed; error='Cannot allocate memory' (errno=12) # # There is insufficient memory for the Java Runtime Environment to continue. # Native memory allocation (mmap) failed to map 1073741824 bytes for committing reserved memory. # An error report file with more information is saved as: # /usr/local/kafka-2.11/bin/hs_err_pid4036.log
原因:物理機或虛擬機內(nèi)存不足,不足以保證Kafka啟動或運行時需要的內(nèi)容容量
解決方式:
增加物理機或虛擬機的內(nèi)存
減少Kafka啟動所需內(nèi)容的配置,將要修改的文件為kafka-server-start.sh
export KAFKA_HEAP_OPTS="-Xmx512M -Xms256M" #29行左右
3.3.2 使用
解決好之后我們開始啟動:
[root@iZ2ze4m2ri7irkf6h6n8zoZ bin]# ./kafka-server-start.sh ../config/server-01.properties [2021-11-20 10:58:33,138] INFO KafkaConfig values:
[root@iZ2ze4m2ri7irkf6h6n8zoZ bin]# ./kafka-server-start.sh ../config/server-02.properties [2021-11-20 10:59:04,187] INFO KafkaConfig values:
ps:看下我們的阿里云服務器的狀況

[root@iZ2ze4m2ri7irkf6h6n8zoZ bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic mr-yan
Created topic "mr-yan".
[root@iZ2ze4m2ri7irkf6h6n8zoZ bin]# ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic mr-yan
Topic:mr-yan PartitionCount:1 ReplicationFactor:3 Configs:
Topic: mr-yan Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
- PartitionCount:主題分區(qū)數(shù)。
- ReplicationFactor:用來設置主題的副本數(shù)。
- leader:是負責給定分區(qū)所有讀寫操作的節(jié)點。每個節(jié)點都是隨機選擇的部分分區(qū)的領導者。
- replicas:是復制分區(qū)日志的節(jié)點列表,不管這些節(jié)點是leader還是僅僅活著。
- isr:是一組“同步”replicas,是replicas列表的子集,它活著并被指到leader。
進行集群環(huán)境下的使用:
[root@iZ2ze4m2ri7irkf6h6n8zoZ bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic mr-yan >Hello Kafkas! >Hello Mr.Yan >
[root@iZ2ze4m2ri7irkf6h6n8zoZ bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic mr-yan Hello Kafkas! Hello Mr.Yan
3.3.3 驗證容錯性
首先我們停掉一個Kafka的Broker:
[root@iZ2ze4m2ri7irkf6h6n8zoZ ~]# ps -ef|grep server-01.properties
root 19859 28247 1 10:58 pts/3 ../config/server-01.properties
root 23934 16569 0 11:12 pts/11 00:00:00 grep --color=auto server-01.properties
[root@iZ2ze4m2ri7irkf6h6n8zoZ ~]# kill -9 28247
[root@iZ2ze4m2ri7irkf6h6n8zoZ ~]# ps -ef|grep server-01.properties
root 32604 16569 0 11:13 pts/11 00:00:00 grep --color=auto server-01.properties
[root@iZ2ze4m2ri7irkf6h6n8zoZ ~]# cd /usr/local/kafka-2.11/bin/
[root@iZ2ze4m2ri7irkf6h6n8zoZ bin]# ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic mr-yan
Topic:mr-yan PartitionCount:1 ReplicationFactor:3 Configs:
Topic: mr-yan Partition: 0 Leader: 0 Replicas: 1,0,2 Isr: 0,2
查看生產(chǎn)者和消費者的變化,并再次使用,發(fā)現(xiàn)仍可以進行使用
[root@iZ2ze4m2ri7irkf6h6n8zoZ bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic mr-yan >Hello Kafkas! >Hello Mr.Yan >[2021-11-20 11:12:28,881] WARN [Producer clientId=console-producer] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) >Hello Kafkas too! >Hello Mr.Yan too! >
[root@iZ2ze4m2ri7irkf6h6n8zoZ bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic mr-yan Hello Kafkas! Hello Mr.Yan [2021-11-20 11:12:28,812] WARN [Consumer clientId=consumer-1, groupId=console-consumer-22158] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) [2021-11-20 11:12:29,165] WARN [Consumer clientId=consumer-1, groupId=console-consumer-22158] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) Hello Kafkas too! Hello Mr.Yan too!
4 小總結(jié)
主題,分區(qū),副本的概念
Kafka是根據(jù)主題(topic)進行消息的傳遞,但是又有分區(qū)和副本的概念,下面來分別解釋下:
分區(qū):kafka對每一條消息的key做一個hashcode運算,然后將得到的數(shù)值對分區(qū)數(shù)量進行模運算就得到了這條消息所在分區(qū)的數(shù)字。副本:同一分區(qū)的幾個副本之間保存的是相同的數(shù)據(jù),副本之間的關系是“一主多從”,其中的主(leader)則負責對外提供讀寫操作的服務,而從(follower)則負責與主節(jié)點同步數(shù)據(jù),當主節(jié)點宕機,從節(jié)點之間能重新選舉leader進行對外服務。
kafka會保證同一個分區(qū)內(nèi)的消息有序,但是不保證主題內(nèi)的消息有序。

參考:https://kafka.apachecn.org/quickstart.html
總結(jié)
到此這篇關于帶你玩轉(zhuǎn)Kafka之初步使用的文章就介紹到這了,更多相關Kafka初步使用內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Java編程實現(xiàn)統(tǒng)計一個字符串中各個字符出現(xiàn)次數(shù)的方法
這篇文章主要介紹了Java編程實現(xiàn)統(tǒng)計一個字符串中各個字符出現(xiàn)次數(shù)的方法,涉及java針對字符串的遍歷、判斷、運算等相關操作技巧,需要的朋友可以參考下2017-12-12
SpringCloud通過Feign傳遞List類型參數(shù)方式
這篇文章主要介紹了SpringCloud通過Feign傳遞List類型參數(shù)方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-03-03
springboot項目配置logback日志系統(tǒng)的實現(xiàn)
這篇文章主要介紹了springboot項目配置logback日志系統(tǒng)的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2021-04-04
linux中nohup?java?-jar啟動java項目的步驟
nohup是一個Unix和Linux命令,用于運行關閉時不會被終止的進程,這篇文章主要給大家介紹了關于linux中nohup?java?-jar啟動java項目的相關資料,文中通過代碼介紹的非常詳細,需要的朋友可以參考下2024-08-08
rabbitmq的消息持久化處理開啟,再關閉后,消費者啟動報錯問題
這篇文章主要介紹了rabbitmq的消息持久化處理開啟,再關閉后,消費者啟動報錯問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-11-11
java如何接收和發(fā)送ASCII數(shù)據(jù)
這篇文章主要介紹了java如何接收和發(fā)送ASCII數(shù)據(jù)問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-09-09

