第七周,大數(shù)據(jù)建模 scala 復(fù)習(xí) , KaFka 回顧 , 繼續(xù)講解
第七周,10-13 , scala 復(fù)習(xí) , KaFka 回顧
KAFAKA 消息中介,
比如說(shuō), 淘寶的訂單交易系統(tǒng), 產(chǎn)生訂單的信息, 對(duì)這些訂單要實(shí)時(shí)分析, 對(duì)他關(guān)心的, 進(jìn)行推送, 這時(shí)要用 KAFKA 進(jìn)行
推送, 再?gòu)腒AFKA中讀取出來(lái), 持久化, 7天可以存儲(chǔ), 可以高存儲(chǔ)量, 百萬(wàn)級(jí)別。
生產(chǎn)者發(fā)布一個(gè)消息, 或者就是客戶(hù)下一個(gè)訂單, 這個(gè)消息推送到 KAFKA的主題TOPIC中, 消費(fèi)者要訂閱這個(gè)主題, 不同的
TOPIC 要進(jìn)行不同分析。 等于不同的消費(fèi)者會(huì)訂閱不同的主題, 再?gòu)腒AFKA的集群中獲得。
KAFKA 集群 由 ZOOKEPPER管理。 做一些搜索引擎的事情, KAFKA的數(shù)量, 可以和 瀏覽器對(duì)接, ELESHCHE , 輸入
關(guān)鍵詞, 進(jìn)行 TOPIC 主題的創(chuàng)建。
KAFKA就是一個(gè) 高吞吐量的一個(gè)集群。
淘寶數(shù)據(jù)要出現(xiàn)顯示大屏 , 把實(shí)時(shí)處理的數(shù)據(jù), 可以放在 Redis 里面,
序列號(hào), 也就是偏移量, 這個(gè)就是由 ZOOKEPPER 管理, 消費(fèi)者要從 KAFKA進(jìn)行 消費(fèi), 也需要進(jìn)行記錄。
在 不同的 TOPIC 也由 ZOOKEPPER 管理, 這2個(gè)集群都要建立。
分區(qū) partion , 可以設(shè)置在 TOPIC下 。
Broker 就是一個(gè) 緩存代理。
日志類(lèi)、訂單類(lèi)屬于不同的 分區(qū) PARTION , OFFSET 就是序號(hào)或者偏移量。
接下來(lái)講: KAFKA的 MASSAGE ,通訊的基本單位, 每個(gè)生產(chǎn)者可以向一個(gè) TOPIC 發(fā)布一些消息。
KAFKA 中的MASSAGE是以 TOPIC 為基本單位組織的,
MASSAGE 是如何進(jìn)行存儲(chǔ)的, TOPIC 對(duì)應(yīng)一個(gè) 偏移量, 也就是 ID , 也就是指針,
總之, 幾十年來(lái), 計(jì)算機(jī)還是進(jìn)行表管理。
放入 TOPIC ,變成一個(gè)字符串, 然后就是用 空格進(jìn)行確認(rèn), 總之, 把生產(chǎn)者的數(shù)據(jù), 存儲(chǔ)到 KAFKA
消費(fèi)者再?gòu)倪@個(gè) MASSAGE中取得數(shù)據(jù)。
11:10 開(kāi)始上課, KAFKA的 消息處理機(jī)制。
1、發(fā)送到 PARTITION 中的消息, 自動(dòng)追加到日志中, 順序是一至的,
2、對(duì)于消費(fèi)者 , 消費(fèi)消息的順序也是一至的。
3、如果 topic 的 replication factor 為 n , 那么允許n-1 個(gè) kafka 的實(shí)例失效
4、kafka 對(duì)消息的重復(fù)、丟失、錯(cuò)誤以及順序沒(méi)有嚴(yán)格的要求。
5、kafka 提供 at-least-once delivery , 當(dāng)消費(fèi)者宕機(jī)后, 有些消息可能會(huì)被重復(fù) 發(fā)送 delivry
6、 因每個(gè) partition只會(huì)被 消費(fèi)者組內(nèi)部的一個(gè)消費(fèi)者消費(fèi)。 KAFKA是保證每個(gè) PARTITION 內(nèi)的消息會(huì)被順序訂閱。
7、Kafka 為每條消息計(jì)算 CRC檢驗(yàn), 用于錯(cuò)誤檢測(cè), CRC檢驗(yàn)不通過(guò)的消息會(huì)直接被丟棄掉
ACK校驗(yàn), 當(dāng)消費(fèi)者消費(fèi)成功, 返回ACK消息。
KAFKA數(shù)據(jù)傳輸?shù)臋C(jī)制又是什么?
1、at most once: 最多一次, 這個(gè)和 JMS 中的非持久化消息類(lèi)似, 無(wú)論成敗, 將不會(huì)重發(fā)。
2、at least once : 消息至少發(fā)送一次, 如果消息美未能接受成功, 可能進(jìn)行重發(fā), 直到接受成功。
3、exactly once : 消息只會(huì)發(fā)送一次 。
對(duì)于 這 3點(diǎn), 做詳細(xì)描述。
KafAKA的存儲(chǔ)策略,
生產(chǎn)者生產(chǎn)的消息, 然后在 kaFka 存儲(chǔ) , 是順序產(chǎn)生的, offset 不一致
一、kafka的存儲(chǔ)機(jī)制
kafka通過(guò)topic來(lái)分主題存放數(shù)據(jù),主題內(nèi)有分區(qū),分區(qū)可以有多個(gè)副本,分區(qū)的內(nèi)部還細(xì)分為若干個(gè)segment。
所謂的分區(qū)其實(shí)就是在kafka對(duì)應(yīng)存儲(chǔ)目錄下創(chuàng)建的文件夾,文件夾的名字是主題名加上分區(qū)編號(hào),編號(hào)從0開(kāi)始。
1、segment
所謂的segment其實(shí)就是在分區(qū)對(duì)應(yīng)的文件夾下產(chǎn)生的文件。
一個(gè)分區(qū)會(huì)被劃分成大小相等的若干segment,這樣一方面保證了分區(qū)的數(shù)據(jù)被劃分到多個(gè)文件中保證不會(huì)產(chǎn)生體積過(guò)大的文件;
另一方面可以基于這些segment文件進(jìn)行歷史數(shù)據(jù)的刪除,提高效率。
一個(gè)segment又由一個(gè).log和一個(gè).index文件組成。
1..log
.log文件為數(shù)據(jù)文件用來(lái)存放數(shù)據(jù)分段數(shù)據(jù)。
2..index
.index為索引文件保存對(duì)對(duì)應(yīng)的.log文件的索引信息。
在.index文件中,保存了對(duì)對(duì)應(yīng).log文件的索引信息,通過(guò)查找.index文件可以獲知每個(gè)存儲(chǔ)在當(dāng)前segment中的offset在.log文件中的開(kāi)始位置,
而每條日志有其固定格式,保存了包括offset編號(hào)、日志長(zhǎng)度、key的長(zhǎng)度等相關(guān)信息,通過(guò)這個(gè)固定格式中的數(shù)據(jù)可以確定出當(dāng)前offset的結(jié)束位置,
從而對(duì)數(shù)據(jù)進(jìn)行讀取。
3.命名規(guī)則
這兩個(gè)文件的命名規(guī)則為:
partition全局的第一個(gè)segment從0開(kāi)始,后續(xù)每個(gè)segment文件名為上一個(gè)segment文件最后一條消息的offset值,數(shù)值大小為64位,
20位數(shù)字字符長(zhǎng)度,沒(méi)有數(shù)字用0填充。
2、讀取數(shù)據(jù)
開(kāi)始讀取指定分區(qū)中某個(gè)offset對(duì)應(yīng)的數(shù)據(jù)時(shí),先根據(jù)offset和當(dāng)前分區(qū)的所有segment的名稱(chēng)做比較,確定出數(shù)據(jù)在哪個(gè)segment中,
再查找該segment的索引文件,確定當(dāng)前offset在數(shù)據(jù)文件中的開(kāi)始位置,最后從該位置開(kāi)始讀取數(shù)據(jù)文件,在根據(jù)數(shù)據(jù)格式判斷結(jié)果,
獲取完整數(shù)據(jù)。
二、可靠性保證
1、AR
在Kafka中維護(hù)了一個(gè)AR列表,包括所有的分區(qū)的副本。AR又分為ISR和OSR。
AR = ISR + OSR。
AR、ISR、OSR、LEO、HW這些信息都被保存在Zookeeper中。
1.ISR
ISR中的副本都要同步leader中的數(shù)據(jù),只有都同步完成了數(shù)據(jù)才認(rèn)為是成功提交了,成功提交之后才能供外界訪(fǎng)問(wèn)。
在這個(gè)同步的過(guò)程中,數(shù)據(jù)即使已經(jīng)寫(xiě)入也不能被外界訪(fǎng)問(wèn),這個(gè)過(guò)程是通過(guò)LEO-HW機(jī)制來(lái)實(shí)現(xiàn)的。
2.OSR
OSR內(nèi)的副本是否同步了leader的數(shù)據(jù),不影響數(shù)據(jù)的提交,OSR內(nèi)的follower盡力的去同步leader,可能數(shù)據(jù)版本會(huì)落后。
最開(kāi)始所有的副本都在ISR中,在kafka工作的過(guò)程中,如果某個(gè)副本同步速度慢于replica.lag.time.max.ms指定的閾值,
則被踢出ISR存入OSR,如果后續(xù)速度恢復(fù)可以回到ISR中。
3.LEO
LogEndOffset:分區(qū)的最新的數(shù)據(jù)的offset,當(dāng)數(shù)據(jù)寫(xiě)入leader后,LEO就立即執(zhí)行該最新數(shù)據(jù)。相當(dāng)于最新數(shù)據(jù)標(biāo)識(shí)位。
4.HW
HighWatermark:只有寫(xiě)入的數(shù)據(jù)被同步到所有的ISR中的副本后,數(shù)據(jù)才認(rèn)為已提交,HW更新到該位置,HW之前的數(shù)據(jù)才可以
被消費(fèi)者訪(fǎng)問(wèn),保證沒(méi)有同步完成的數(shù)據(jù)不會(huì)被消費(fèi)者訪(fǎng)問(wèn)到。相當(dāng)于所有副本同步數(shù)據(jù)標(biāo)識(shí)位。
在leader宕機(jī)后,只能從ISR列表中選取新的leader,無(wú)論ISR中哪個(gè)副本被選為新的leader,它都知道HW之前的數(shù)據(jù),
可以保證在切換了leader后,消費(fèi)者可以繼續(xù)看到HW之前已經(jīng)提交的數(shù)據(jù)。
所以L(fǎng)EO代表已經(jīng)寫(xiě)入的最新數(shù)據(jù)位置,而HW表示已經(jīng)同步完成的數(shù)據(jù),只有HW之前的數(shù)據(jù)才能被外界訪(fǎng)問(wèn)。
5.HW截?cái)鄼C(jī)制
如果leader宕機(jī),選出了新的leader,而新的leader并不能保證已經(jīng)完全同步了之前l(fā)eader的所有數(shù)據(jù),只能保證HW之前的數(shù)據(jù)是同步過(guò)的,此時(shí)所有的follower都要將數(shù)據(jù)截?cái)嗟紿W的位置,再和新的leader同步數(shù)據(jù),來(lái)保證數(shù)據(jù)一致。
當(dāng)宕機(jī)的leader恢復(fù),發(fā)現(xiàn)新的leader中的數(shù)據(jù)和自己持有的數(shù)據(jù)不一致,此時(shí)宕機(jī)的leader會(huì)將自己的數(shù)據(jù)截?cái)嗟藉礄C(jī)之前的hw位置,然后同步新leader的數(shù)據(jù)。宕機(jī)的leader活過(guò)來(lái)也像follower一樣同步數(shù)據(jù),來(lái)保證數(shù)據(jù)的一致性。
2、生產(chǎn)者可靠性級(jí)別
通過(guò)以上的講解,已經(jīng)可以保證kafka集群內(nèi)部的可靠性,但是在生產(chǎn)者向kafka集群發(fā)送時(shí),數(shù)據(jù)經(jīng)過(guò)網(wǎng)絡(luò)傳輸,也是不可靠的,可能因?yàn)榫W(wǎng)絡(luò)延遲、閃斷等原因造成數(shù)據(jù)的丟失。
kafka為生產(chǎn)者提供了如下的三種可靠性級(jí)別,通過(guò)不同策略保證不同的可靠性保障。
其實(shí)此策略配置的就是leader將成功接收消息信息響應(yīng)給客戶(hù)端的時(shí)機(jī)。
通過(guò)request.required.acks參數(shù)配置:
1:生產(chǎn)者發(fā)送數(shù)據(jù)給leader,leader收到數(shù)據(jù)后發(fā)送成功信息,生產(chǎn)者收到后認(rèn)為發(fā)送數(shù)據(jù)成功,如果一直收不到成功消息,則生產(chǎn)者認(rèn)為發(fā)送數(shù)據(jù)失敗會(huì)自動(dòng)重發(fā)數(shù)據(jù)。
當(dāng)leader宕機(jī)時(shí),可能丟失數(shù)據(jù)。
0:生產(chǎn)者不停向leader發(fā)送數(shù)據(jù),而不需要leader反饋成功消息。
這種模式效率最高,可靠性最低??赡茉诎l(fā)送過(guò)程中丟失數(shù)據(jù),也可能在leader宕機(jī)時(shí)丟失數(shù)據(jù)。
-1:生產(chǎn)者發(fā)送數(shù)據(jù)給leader,leader收到數(shù)據(jù)后要等到ISR列表中的所有副本都同步數(shù)據(jù)完成后,才向生產(chǎn)者發(fā)送成功消息,如果一只收不到成功消息,則認(rèn)為發(fā)送數(shù)據(jù)失敗會(huì)自動(dòng)重發(fā)數(shù)據(jù)。
這種模式下可靠性很高,但是當(dāng)ISR列表中只剩下leader時(shí),當(dāng)leader宕機(jī)讓然有可能丟數(shù)據(jù)。
此時(shí)可以配置min.insync.replicas指定要求觀察ISR中至少要有指定數(shù)量的副本,默認(rèn)該值為1,需要改為大于等于2的值
這樣當(dāng)生產(chǎn)者發(fā)送數(shù)據(jù)給leader但是發(fā)現(xiàn)ISR中只有l(wèi)eader自己時(shí),會(huì)收到異常表明數(shù)據(jù)寫(xiě)入失敗,此時(shí)無(wú)法寫(xiě)入數(shù)據(jù),保證了數(shù)據(jù)絕對(duì)不丟。
雖然不丟但是可能會(huì)產(chǎn)生冗余數(shù)據(jù),例如生產(chǎn)者發(fā)送數(shù)據(jù)給leader,leader同步數(shù)據(jù)給ISR中的follower,同步到一半leader宕機(jī),此時(shí)選出新的leader,可能具有部分此次提交的數(shù)據(jù),而生產(chǎn)者收到失敗消息重發(fā)數(shù)據(jù),新的leader接受數(shù)據(jù)則數(shù)據(jù)重復(fù)了。
3、leader選舉
當(dāng)leader宕機(jī)時(shí)會(huì)選擇ISR中的一個(gè)follower成為新的leader,如果ISR中的所有副本都宕機(jī),怎么辦?
有如下配置可以解決此問(wèn)題:
unclean.leader.election.enable=false
策略1:必須等待ISR列表中的副本活過(guò)來(lái)才選擇其成為leader繼續(xù)工作。
unclean.leader.election.enable=true
策略2:選擇任何一個(gè)活過(guò)來(lái)的副本,成為leader繼續(xù)工作,此follower可能不在ISR中。
策略1,可靠性有保證,但是可用性低,只有最后掛了leader活過(guò)來(lái)kafka才能恢復(fù)。
策略2,可用性高,可靠性沒(méi)有保證,任何一個(gè)副本活過(guò)來(lái)就可以繼續(xù)工作,但是有可能存在數(shù)據(jù)不一致的情況。
4、kafka可靠性的保證
At most once:消息可能會(huì)丟,但絕不會(huì)重復(fù)傳輸。
At least once:消息絕不會(huì)丟,但可能會(huì)重復(fù)傳輸。
Exactly once:每條消息肯定會(huì)被傳輸一次且僅傳輸一次。
kafka最多保證At least once,可以保證不丟,但是可能會(huì)重復(fù),為了解決重復(fù)需要引入唯一標(biāo)識(shí)和去重機(jī)制,kafka提供了GUID實(shí)現(xiàn)了唯一標(biāo)識(shí),但是并沒(méi)有提供自帶的去重機(jī)制,需要開(kāi)發(fā)人員基于業(yè)務(wù)規(guī)則自己去重。
生產(chǎn)者產(chǎn)生第一個(gè)消息, 會(huì)在 segment 中記錄第一個(gè)偏移量, 一致追加, 如果打了閥值, 會(huì)存儲(chǔ)到磁盤(pán)上去。
KAFKA 的數(shù)據(jù)傳輸
KAFKA 消息發(fā)布流程
消息處理的優(yōu)勢(shì):
KAFKA的設(shè)計(jì)原理 ,無(wú)論做不做大數(shù)據(jù), 還是僅僅是配合 JAVA, 都需要了解 KAFKA
KAFKA 的 通訊協(xié)議
通訊過(guò)程
應(yīng)用層
與其它計(jì)算機(jī)進(jìn)行通訊的一個(gè)應(yīng)用,它是對(duì)應(yīng)應(yīng)用程序的通信服務(wù)的。
例如,一個(gè)沒(méi)有通信功能的字處理程序就不能執(zhí)行通信的代碼,從事字處理工作的程序員也不關(guān)心OSI的第7層。
但是,如果添加了一個(gè)傳輸文件的選項(xiàng),那么字處理器的程序就需要實(shí)現(xiàn)OSI的第7層。
表示層
這一層的主要功能是定義數(shù)據(jù)格式及加密。
例如,F(xiàn)TP允許你選擇以二進(jìn)制或ASCII格式傳輸。
如果選擇二進(jìn)制,那么發(fā)送方和接收方不改變文件的內(nèi)容。
如果選擇ASCII格式,發(fā)送方將把文本從發(fā)送方的字符集轉(zhuǎn)換成標(biāo)準(zhǔn)的ASCII后發(fā)送數(shù)據(jù)。
在接收方將標(biāo)準(zhǔn)的ASCII轉(zhuǎn)換成接收方計(jì)算機(jī)的字符集。示例:加密,ASCII等。
會(huì)話(huà)層
它定義了如何開(kāi)始、控制和結(jié)束一個(gè)會(huì)話(huà),包括對(duì)多個(gè)雙向消息的控制和管理,
以便在只完成連續(xù)消息的一部分時(shí)可以通知應(yīng)用,從而使表示層看到的數(shù)據(jù)是連續(xù)的,
在某些情況下,如果表示層收到了所有的數(shù)據(jù),則用數(shù)據(jù)代表表示層。
示例:RPC,SQL等。
傳輸層
這層的功能包括是否選擇差錯(cuò)恢復(fù)協(xié)議還是無(wú)差錯(cuò)恢復(fù)協(xié)議,及在同一主機(jī)上對(duì)不同應(yīng)用的數(shù)據(jù)流的輸入
進(jìn)行復(fù)用,還包括對(duì)收到的順序不對(duì)的數(shù)據(jù)包的重新排序功能。示例:TCP,UDP,SPX。
網(wǎng)絡(luò)層
這層對(duì)端到端的包傳輸進(jìn)行定義,它定義了能夠標(biāo)識(shí)所有結(jié)點(diǎn)的邏輯地址,還定義了路由實(shí)現(xiàn)的方式和學(xué)習(xí)的方式。
為了適應(yīng)最大傳輸單元長(zhǎng)度小于包長(zhǎng)度的傳輸介質(zhì),網(wǎng)絡(luò)層還定義了如何將一個(gè)包分解成更小的包的分段方法。
示例:IP,IPX等。
數(shù)據(jù)鏈路層
物理層
OSI的物理層規(guī)范是有關(guān)傳輸介質(zhì)的特性,這些規(guī)范通常也參考了其他組織制定的標(biāo)準(zhǔn)。
連接頭、幀、幀的使用、電流、編碼及光調(diào)制等都屬于各種物理層規(guī)范中的內(nèi)容。
物理層常用多個(gè)規(guī)范完成對(duì)所有細(xì)節(jié)的定義。示例:Rj45,802.3等。
KAFKA 集群的安裝部署:
1、下載kafka.tgz 架包
2、解壓
3、配置 zookepper,配置producer,配置consumer
4、啟動(dòng)服務(wù)
a 啟動(dòng)zooKEPPER ,
b 啟動(dòng)kafka
./bin/zkserver.sh start
./bin/kafka-server-start.sh /config/server.properties
創(chuàng)建 topic
./bin/kafka-topics.sh --create--zookepper hadoop1:2181,hadoop2:2181,hadoop3:2181
--replication -factor 1 --partition 1 -topic mytopic
配置消費(fèi)者信息
查看是不是有 kafka 的進(jìn)程, 使用 # ps 命令
------------ 日志文件, 實(shí)時(shí)的推送到 Kafka 里面, 做一個(gè) topic - from -beginning
把消費(fèi)者的端口啟動(dòng)了, 消費(fèi) test 的數(shù)據(jù)。
剛才的日志文件, 用 for循環(huán) 做了一個(gè) 50000個(gè)日志文件, 灌入到kafka 里面去, 然后, 就可以消費(fèi)了。
生產(chǎn)者 產(chǎn)生了數(shù)據(jù), 放入到 KafKA里面, 然后消費(fèi)者就可以消費(fèi)了。
生產(chǎn)者對(duì)應(yīng)的參數(shù)。 消費(fèi)者對(duì)應(yīng)的參數(shù)。
做一下回顧: KAFKA Message 不斷記錄,offset 偏移量, 到達(dá)閥值, flash到硬盤(pán)
P
----------------------------------------------------------------------------以下是原始筆記
Kafka
1、Kafka是什么
2、Kafka體系結(jié)構(gòu)
3、Kafka設(shè)計(jì)理念
4、Kafka通信協(xié)議
5、Kafka集群
6、Kafka相關(guān)操作:kafka的shell操作及java操作
7、kafka的producer和consumer開(kāi)發(fā)
Kafka產(chǎn)生的背景:
Kafka是分布式發(fā)布-訂閱消息系統(tǒng),它最初由LInkedin公司開(kāi)發(fā),使用scala語(yǔ)言編寫(xiě)之后成為Apache項(xiàng)目的一部分,kafka是一個(gè)分布式的,可劃分的,多訂閱者,冗余備份持久性的日志服務(wù),它主要用于處理活
躍的流式數(shù)據(jù)。
kafka可以起到兩個(gè)作用:
1、降低系統(tǒng)組網(wǎng)的復(fù)雜度
2、降低編程的復(fù)雜度,各個(gè)子系統(tǒng)不在是相互協(xié)商接口,各個(gè)子系統(tǒng)類(lèi)似插口插在插座上,kafka承載高速數(shù)據(jù)總線(xiàn)的作用。
kafka簡(jiǎn)介:
1、同時(shí)為發(fā)布和訂閱提供高吞吐量,kafka每秒可以生產(chǎn)為25萬(wàn)消息(50MB),每秒可以處理55萬(wàn)條數(shù)據(jù)(110MB)。
2、可以進(jìn)行持久化操作,將消息持久化到磁盤(pán),因此可用于批量消費(fèi),如ETL,以及實(shí)時(shí)應(yīng)用程序。通過(guò)將數(shù)據(jù)持久化到磁盤(pán)以及replication防止數(shù)據(jù)丟失。
3、分布式系統(tǒng),易于向外擴(kuò)展,所有的producer、broker、consumer都會(huì)有多個(gè),均為分布式的,無(wú)需停機(jī)即可擴(kuò)展機(jī)器。
4、消息被處理的狀態(tài)是在consumer端維護(hù),而不是在server端維護(hù),當(dāng)失敗時(shí)能自動(dòng)平衡。
5、支持onlin和offline的場(chǎng)景。
性能測(cè)試:
虛擬機(jī):CPU雙核、內(nèi)存:2GB、硬盤(pán):60GB
測(cè)試指標(biāo):
消息推積壓力測(cè)試:
單個(gè)kafka broker節(jié)點(diǎn)測(cè)試,啟動(dòng)一個(gè)kafka broker和producer,producer不斷向broker發(fā)送消息
直到broker堆積數(shù)據(jù)為18GB為停止producer,接著啟動(dòng)consumer,不斷從broker獲取數(shù)據(jù)
直到全部數(shù)據(jù)讀取完停止,最后檢查producer==consumer數(shù)據(jù),沒(méi)有出現(xiàn)卡死不響應(yīng)現(xiàn)象。
結(jié)論:數(shù)據(jù)大量堆積不會(huì)出現(xiàn)broker卡死或不影響現(xiàn)象。
生產(chǎn)者速率:
1萬(wàn)左右。
結(jié)論:性能上完全滿(mǎn)足要求,其性能主要由磁盤(pán)決定
消費(fèi)者速率
1萬(wàn)左右
結(jié)論:性能上完全滿(mǎn)足要求,其性能主要由磁盤(pán)決定。
Kafka的基本概念:
1、Topic:特指kafka的消息源的不同分類(lèi)
2、Partion: Topic物理上的分組,一個(gè)topic可以分為多個(gè)partion,每個(gè)partion是一個(gè)有序的隊(duì)列,partion中的每條消息都會(huì)被分配一個(gè)有序的id,也叫offset。
3、Message: 消息,是通信的基本單位,每個(gè)producer可以向一個(gè)topic發(fā)布一些消息。
4、Producers:消息和數(shù)據(jù)的生產(chǎn)者,向kafka的一個(gè)topic發(fā)布消息的過(guò)程叫做producers
5、Consumers:消息和數(shù)據(jù)的消費(fèi)者,訂閱Topic并處理其發(fā)布的消息的過(guò)程叫做consumers。
6、Broker:緩存代理,kafka集群中的一臺(tái)或多臺(tái)服務(wù)器統(tǒng)稱(chēng)為Broker.
kafka設(shè)計(jì)關(guān)注的重點(diǎn):
1、為生產(chǎn)者和消費(fèi)者提供一個(gè)通用的API
2、消息的持久化
3、高吞吐量,可以滿(mǎn)足百萬(wàn)級(jí)別的消息處理。
4、對(duì)分布式和高擴(kuò)展的支持。
kafka最基本的架構(gòu)是生產(chǎn)者發(fā)布一個(gè)消息到kafka的一個(gè)主題topic,這個(gè)主題topic即是由扮演kafkaServer角色的broker提供,消費(fèi)者訂閱這個(gè)主題,然后從中獲取信息。
kafka的兩大法寶:
1、提供文件的分段
2、提供文件索引
索引優(yōu)化:稀疏存儲(chǔ),每隔一定字節(jié)的數(shù)據(jù)建立一條索引
kafka消息隊(duì)列分類(lèi):
1、點(diǎn)對(duì)點(diǎn)
消息生產(chǎn)者生產(chǎn)消息發(fā)送到queue中,然后消息消費(fèi)者從queue中取出消息,并且消費(fèi)消息。
注意:
消息被消費(fèi)后,queue中不再有存儲(chǔ),所以消息消費(fèi)者不可能消費(fèi)到已經(jīng)被消費(fèi)的消息。
queue支持存在多個(gè)消費(fèi)者,但是對(duì)一個(gè)消息而言,只會(huì)有一個(gè)消費(fèi)者可以消費(fèi)。
2、發(fā)布訂閱
消息生產(chǎn)者(發(fā)布)將消息發(fā)布到topic中,同時(shí)有多個(gè)消息消費(fèi)者(訂閱)消費(fèi)該消息,和點(diǎn)對(duì)點(diǎn)不同,發(fā)布到topic的消息會(huì)被所有訂閱者消費(fèi)。
消息隊(duì)列MQ對(duì)比:
1、RabbitMQ:支持的協(xié)議多,非常重量級(jí)消息隊(duì)列,對(duì)路由(Routing),負(fù)載均衡(Load balance)或者數(shù)據(jù)持久化有很好的支持。
2、ZeroMQ:號(hào)稱(chēng)最快的消息隊(duì)列系統(tǒng),尤其針對(duì)大吞吐量的需求場(chǎng)景,擅長(zhǎng)的高級(jí)、復(fù)雜的隊(duì)列。但技術(shù)也復(fù)雜,并且只提供非持久性的隊(duì)列。
3、ActiveMQ:是Apache下的一個(gè)子項(xiàng)目,類(lèi)似于ZeroMQ,能夠以代理人和點(diǎn)對(duì)點(diǎn)的技術(shù)實(shí)現(xiàn)隊(duì)列。
4、Redis:是一個(gè)Key-Value的Nosql數(shù)據(jù)庫(kù),但也支持MQ功能,數(shù)據(jù)量小,性能優(yōu)于RabbitMQ,數(shù)據(jù)超過(guò)10k就慢得無(wú)法接受。
Kafka部署架構(gòu):
(Producer、Broker、Consumer、Zookeeper)
producer --(push)--> kafka(broker) <---(pull)---Consumer
|
|
|
|
Zookeeper
Kafka集群架構(gòu)
(Broker--Master、Slave <------Zookeeper)
Kafka的Producers
Producer將消息發(fā)布到指定的topic中,同時(shí)prodeucer也能決定將此消息歸屬于哪個(gè)partion,比如基于round-robin方式或者通過(guò)其它的一些算法等。
消息和數(shù)據(jù)的生產(chǎn)者,向kafka的一個(gè)topic發(fā)布消息的過(guò)程叫做producers
異步發(fā)送
批量發(fā)送可以很有效的提高發(fā)送效率,kafka producer的異步發(fā)送模式允許進(jìn)行批量發(fā)送,先將消息緩存在內(nèi)存中,然后一次請(qǐng)批量發(fā)送出去。
Kafka的Broker
Broker:緩存代理,為了減少磁盤(pán)寫(xiě)入的次數(shù),Broker會(huì)將消息暫時(shí)buffer起來(lái),當(dāng)消息的個(gè)數(shù)達(dá)到一定閥值時(shí),再flush到磁盤(pán),這樣就減少了磁盤(pán)io調(diào)用的次數(shù)。
kafka的Consumers
注意:kafka的設(shè)計(jì)原理決定對(duì)于一個(gè)topic同一個(gè)group不能有多于partions個(gè)數(shù)的consumer同時(shí)消費(fèi)者,否則將意味著某些xonsumers將無(wú)法得到消息。
Kafka的broker無(wú)狀態(tài)機(jī)制
1、Broker沒(méi)有副本機(jī)制,一但broker宕機(jī),該broker的消息將都不可用
2、Broker不保存訂閱者的狀態(tài),由訂閱者自己保存
3、無(wú)狀態(tài)導(dǎo)致消息的刪除成為難道,kafka采用基于時(shí)間的sla,消息保存一定時(shí)間后會(huì)被刪除。
4、消息訂閱者可以rewind back到任意位置重新進(jìn)行消費(fèi),當(dāng)訂閱者故障時(shí),可以選擇最小的offset進(jìn)行重新讀取消費(fèi)消息。
kafka的consumer group
1、允許consumer group對(duì)一個(gè)topic進(jìn)行消費(fèi),不同的consumer group之間獨(dú)立訂閱
2、為了對(duì)減小一個(gè)consumer group中不同的consumer之間的分布式協(xié)調(diào)開(kāi)銷(xiāo),指定partion為最小的并行消費(fèi)單位,即一個(gè)group內(nèi)的consumer只能消費(fèi)不同的partion
Kafka的Topic/Log
一個(gè)topic可以認(rèn)為是一類(lèi)消息,每個(gè)topic將被分成多個(gè)partion分區(qū),每個(gè)partion在存儲(chǔ)層面是append log文件,任何發(fā)布到此partion的消息都會(huì)被追加到Log文件的尾部,每條消息在文件中的位置稱(chēng)為offset,也叫做偏移量,partion是以文件的形式存儲(chǔ)在文件系統(tǒng)中。
logs文件根據(jù)broker中的配置來(lái)保存一定時(shí)間后刪除來(lái)釋放磁盤(pán)空間。
Kafka的partion
1、kafka基于文件存儲(chǔ),通過(guò)分區(qū),可以將日志內(nèi)容分散到多個(gè)server上,來(lái)避免文件尺寸達(dá)到單機(jī)磁盤(pán)的上限,每個(gè)partion都會(huì)被當(dāng)前的server保存
2、可以將一個(gè)topic切分多任意多個(gè)partion,來(lái)消息保存消費(fèi)的效率
3、越多的partion意味著可以容納更多的consumer,有效提升并發(fā)消費(fèi)的能力。
kafka的Message
Message消息:是通信的基本單位,每個(gè)producer可以向一個(gè)topic發(fā)布一些消息。
Kafka中的Message是以topic為基本單位組織的,不同的topic之間是相互獨(dú)立的,每個(gè)topic以可以為每個(gè)partion存儲(chǔ)一部分message。
partiion中的每條message包含了三個(gè)屬性:
1、offset 對(duì)應(yīng)類(lèi)型:long
2、MessageSize: 對(duì)應(yīng)類(lèi)型:int32
3、data: 是Message的具體內(nèi)容。
Kafka的Offset
每條消息在文件中的位置稱(chēng)為:offset,也叫偏移量,offset為一個(gè)long型數(shù)字,字是唯一標(biāo)記一條消息,kafka并沒(méi)有提供其他額外的索引機(jī)制來(lái)存儲(chǔ)offset,因?yàn)樵趉afka中不允許對(duì)消息進(jìn)行隨即讀寫(xiě)。
partition中的每條消息message由offset來(lái)表示它在這個(gè)partition中的偏移量,這個(gè)offset不是這個(gè)message在partition數(shù)據(jù)文件中的實(shí)際存儲(chǔ)的位置,而是邏輯上一個(gè)值,它唯一確定了partition中的一條message,因此可以認(rèn)為offset是partition中message的id.
kafka的消息處理機(jī)制
1、發(fā)送到partitions中的消息將會(huì)按照它接收的順序追加到日志中
2、對(duì)于消費(fèi)者,它們消費(fèi)消息的順序和日志中消息順序一致
3、如果topic的replication factor為n,那么允許n-1個(gè)kafka實(shí)例失效
4、kafka對(duì)消息的重復(fù)、丟失、錯(cuò)誤以及順序沒(méi)有嚴(yán)格的要求。
5、kafka提供at-least-once delivery,當(dāng)consumers宕機(jī)后,有些消息可能會(huì)被重復(fù)delivery
6、因每個(gè)partition只會(huì)被consumergroup內(nèi)的一個(gè)consumer消費(fèi),所以kafka保證每個(gè)partition內(nèi)的消息會(huì)被順序訂閱。
7、kafka為每條消息計(jì)算CRC檢驗(yàn),用于錯(cuò)誤檢測(cè),CRC檢驗(yàn)不通過(guò)的消息會(huì)直接被丟棄掉
ACK校驗(yàn),當(dāng)消費(fèi)者消費(fèi)成功,返回ACK消息。
數(shù)據(jù)傳輸?shù)氖聞?wù)定義:
1、at most once: 最多一次,這個(gè)和jms中非持久化消息類(lèi)似,無(wú)論成敗,將不會(huì)重發(fā)。
2、at least once: 消息至少發(fā)送一次,如果消息未能接受成功,可能會(huì)進(jìn)行重發(fā),直到接受成功。
3、exactly once: 消息只會(huì)發(fā)送一次。
at most once: 消費(fèi)者fetch消息,然后保存offset,然后處理消息,當(dāng)client保存offset之后,但是在消息處理過(guò)程中出現(xiàn)了異常,導(dǎo)致部分消息未能繼續(xù)處理,那么此后未處理的消息都不能被fetch到,這就是at most once。
at least once: 消費(fèi)者fetch消息,然后處理消息,然后打開(kāi)offset,如果消息處理成功之后,但是在保存offset階段zookeeper異常,導(dǎo)致保存操作未能執(zhí)行成功,這就導(dǎo)致接下來(lái)再次fetch時(shí)可能獲得上次已經(jīng)處理過(guò)的消息,這就是at least once,原因offset沒(méi)有即時(shí)的提交到zookeeper,zookeeper恢復(fù)正常還是之前的offset狀態(tài)。
exactly once: kafka中并沒(méi)有嚴(yán)格的去實(shí)現(xiàn)基于2階段提交事務(wù),我們認(rèn)為這種策略在kafka中沒(méi)有必要。
注意:
通常情況下:at least once是我們的首選,相比at most once,重復(fù)接受消息總比丟失數(shù)據(jù)要好。
kafka的儲(chǔ)存策略:
1、kafka以topic來(lái)進(jìn)行消息管理,每個(gè)topic包含多個(gè)partition,每個(gè)partition對(duì)應(yīng)一個(gè)邏輯log,有多個(gè)segment組成。
2、每個(gè)segment中存儲(chǔ)多條消息,消息id由其邏輯位置決定,從消息id可直接定位到消息的存儲(chǔ)位置,避免id到位置的額外映射。
3、broker收到發(fā)布消息往對(duì)應(yīng)的partion的最后一個(gè)segment上添加消息。
4、每個(gè)partition在內(nèi)存中對(duì)應(yīng)一個(gè)index,記錄每個(gè)segment中的第一條消息偏移。
5、發(fā)布者發(fā)送到某個(gè)topic的消息會(huì)被 均勻的分布到多個(gè)partition上(隨機(jī)或者根據(jù)用戶(hù)指定的回調(diào)函數(shù)進(jìn)行分布),broker收到發(fā)布消息往對(duì)應(yīng)的partition的最后一個(gè)segment上進(jìn)行添加該消息,當(dāng)某個(gè)segment上的消息條數(shù)達(dá)到配置值或消息發(fā)布時(shí)間超過(guò)閥值時(shí),segment上的消息會(huì)被flush到磁盤(pán),只有flush到磁盤(pán)上的消息訂閱者才能訂閱到,segment達(dá)到一定的大小后將不會(huì)再往該segment寫(xiě)數(shù)據(jù),broker會(huì)創(chuàng)建新的segment。
kafka的數(shù)據(jù)傳輸:
1、發(fā)布者每次可發(fā)布多條消息(將消息加到一個(gè)消息集合中發(fā)布),sub每次迭代一條消息。
2、不創(chuàng)建單獨(dú)的cache,使用系統(tǒng)的page cache。發(fā)布者順序發(fā)布,訂閱者通常比發(fā)布者滯后一點(diǎn)點(diǎn),直接使用Linux的page cache效果也比較后,同時(shí)減少了cache管理及垃圾收集和開(kāi)銷(xiāo)。
3、使用sendfile優(yōu)化網(wǎng)絡(luò)傳輸,減少一次內(nèi)存拷貝。
kafka的消息發(fā)送的流程:
1、由于kafka broker會(huì)持久化數(shù)據(jù),broker沒(méi)有內(nèi)存壓力,因此,consumer非常適合采取pull的方式消費(fèi)數(shù)據(jù)。
2、producer向kafka(push)推數(shù)據(jù)
3、consumer從kafka拉(pull)數(shù)據(jù)
消息處理的優(yōu)勢(shì):
1、簡(jiǎn)化kafka設(shè)計(jì)
2、consumer根據(jù)消費(fèi)能力自主控制消息拉取速度。
3、consumer根據(jù)自身情況自主選擇消費(fèi)模式,例如:批量、重復(fù)消費(fèi),從尾端開(kāi)始消費(fèi)等。
4、kafka集群接收到producer發(fā)過(guò)來(lái)的消息后,將其持久化到硬盤(pán),并保留消息指定時(shí)長(zhǎng),而不關(guān)注消息是否被消費(fèi)。
kafka設(shè)計(jì)原理實(shí)現(xiàn):
1、kafka以topic來(lái)進(jìn)行消息管理,發(fā)布者發(fā)到某個(gè)topic的消息會(huì)被均勻的分布到多個(gè)partition上
2、每個(gè)topic包含多個(gè)partition,每個(gè)partition對(duì)應(yīng)一個(gè)邏輯log,有多個(gè)segment組成
3、每個(gè)segment中存儲(chǔ)多條消息,消息id由其邏輯位置決定,即從消息id可直接定位到消息的存儲(chǔ) 位置,避免id到位置的額外映射。
4、每個(gè)partition在內(nèi)存中有一個(gè)Index,記錄每個(gè)segment中的第一條消息的偏移量
5、當(dāng)某個(gè)segment上的消息數(shù)據(jù)達(dá)到一定閥值,會(huì)flush到磁盤(pán),進(jìn)行訂閱,broker此時(shí)會(huì)重新創(chuàng)建新的segment。
kafka的通訊協(xié)議:
kafka通訊的基本單位是request/response
基本結(jié)構(gòu):messagesize、requestmessage、responsemessage
通訊過(guò)程:
客戶(hù)端打開(kāi)與服務(wù)器的socket
往socket寫(xiě)入一個(gè)Int32的數(shù)字
服務(wù)端先讀取出一個(gè)int32的整數(shù)從而獲取這次requests的大小
然后讀取對(duì)應(yīng)字節(jié)數(shù)的數(shù)據(jù)從而得到requests的具體內(nèi)容
服務(wù)器端處理了請(qǐng)求后,也用同樣的方式來(lái)發(fā)送響應(yīng)。
kafka的通訊協(xié)議組件關(guān)系:
Request/Response是通訊層的結(jié)構(gòu),和網(wǎng)絡(luò)的7層模型對(duì)比的話(huà),它類(lèi)似于TCP層
Message、MessageSet定義的是業(yè)務(wù)層的結(jié)構(gòu),類(lèi)似于網(wǎng)絡(luò)7層模型中的HTTP層,Message、MessageSet只是Request、Response的payload中的一種數(shù)據(jù)結(jié)構(gòu)。
說(shuō)明:
kafka的通訊協(xié)議中不包含schema,格式也比較簡(jiǎn)單,這樣設(shè)計(jì)的好處是協(xié)議自身的overhead小,再加上把多條message放在一起做壓縮,提高壓縮比率,從而在網(wǎng)絡(luò)上傳輸?shù)臄?shù)據(jù)量會(huì)少一些。
kafka的分布式實(shí)現(xiàn):
1、一個(gè)topic的多個(gè)partition被分布在kafka集群中的多個(gè)server(kafka實(shí)例)上,每個(gè)server負(fù)責(zé)partition中消息的讀寫(xiě)操作。
2、此外kafka還可以配置partition需要備份的個(gè)數(shù)replicas,每個(gè)partition將會(huì)被備份到多臺(tái)機(jī)器上,以提高可用性。
3、基于replicated方案,那么就意味著需要對(duì)多備份從進(jìn)行調(diào)整。
4、每個(gè)partition都有一個(gè)server為leader,leader負(fù)責(zé)所有的讀寫(xiě)操作,如果leader失效,那么將會(huì)有其它的follower來(lái)接管,成為新的leader。
5、follower只是單調(diào)的和leader跟進(jìn),同步消息即可,由此可見(jiàn)作為leader的server承載了全部的請(qǐng)求壓力,因此從集群的整體考慮,有多少個(gè)partitions就意味著有多少個(gè)leader
6、kafka會(huì)將leader均衡的分散在每個(gè)實(shí)例上,來(lái)確保整體的性能穩(wěn)定。
kafka數(shù)據(jù)持久化:
1、發(fā)現(xiàn)線(xiàn)性的訪(fǎng)問(wèn)磁盤(pán),很多時(shí)候比隨機(jī)的內(nèi)存訪(fǎng)問(wèn)快得多
2、傳統(tǒng)的使用內(nèi)存做為磁盤(pán)緩存
3、kafka直接將數(shù)據(jù)寫(xiě)入到日志文件中
kafka安裝:
1、下載kafka.tgz包
2、解壓
3、配置zookeeper,配置producer,配置consumer
4、啟動(dòng)服務(wù)
a、啟動(dòng)zookeeper服務(wù),b、啟動(dòng)kafka
./bin/zkServer.sh start /stop /status
./bin/kafka-server-start.sh config/server.properties
創(chuàng)建topic:
./bin/kafka-topics.sh --create --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --replication-factor 1 --partitions 1 --topic mytopic
查看topic:
./bin/kafka-topics.sh --list --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181
查看topic詳細(xì)信息:
./bin/kafka-topics.sh --describe --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --topic mytopic
刪除topic
./bin/kafka-topics.sh --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --delete --topic mytopic
創(chuàng)建生產(chǎn)者producer
./bin/kafka-console-producer.sh --broker--list hadoop1:9092,hadoop2:9092,hadoop3:9092 --topic mytopic
創(chuàng)建消費(fèi)者consumer
./bin/kafka-console-consumer.sh --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --topic mytopic --from-beginning
生產(chǎn)者參數(shù)查看:
./bin/kafka-console-producer.sh
消費(fèi)者參數(shù)查看:
./bin/kafka-console-consumer.sh