亞馬遜云科技Amazon MSK是Amazon云平臺提供的托管Kafka服務(wù)。在系統(tǒng)升級或遷移時(shí),用戶常常需要將一個(gè)Amazon MSK集群中的數(shù)據(jù)導(dǎo)出(備份),然后在新集群或另一個(gè)集群中再將數(shù)據(jù)導(dǎo)入(還原)。通常,Kafka集群間的數(shù)據(jù)復(fù)制和同步多采用Kafka MirrorMaker,但是,在某些場景中,受環(huán)境限制,兩個(gè)于Kafka集群之間的網(wǎng)絡(luò)可能無法連通,或者兩個(gè)亞馬遜云科技賬號相互隔離,亦或是需要將Kafka的數(shù)據(jù)沉淀為文件存儲以備他用。此時(shí),基于Kafka Connect S3 Source/Sink Connector的方案會是一種較為合適的選擇,本文就將介紹一下這一方案的具體實(shí)現(xiàn)。
?數(shù)據(jù)的導(dǎo)出、導(dǎo)入、備份、還原通常都是一次性操作,為此搭建完備持久的基礎(chǔ)設(shè)施并無太大必要,省時(shí)省力,簡單便捷才是優(yōu)先的考量因素。為此,本文將提供一套開箱即用的解決方案,方案使用Docker搭建Kafka Connect,所有操作均配備自動化Shell腳本,用戶只需設(shè)置一些環(huán)境變量并執(zhí)行相應(yīng)腳本即可完成全部工作。這種基于Docker的單體模式可以應(yīng)對中小型規(guī)模的數(shù)據(jù)同步和遷移,如果要尋求穩(wěn)定、健壯的解決方案,可以考慮將Docker版本的Kafka Connect遷移到Kubernetes或Amazon MSK Connect,實(shí)現(xiàn)集群化部署。
?整體架構(gòu)
?首先介紹一下方案的整體架構(gòu)。導(dǎo)出/導(dǎo)入和備份/還原其實(shí)是兩種高度類似的場景,但為了描述清晰,我們還是分開討論。先看一下導(dǎo)出/導(dǎo)入的架構(gòu)示意圖:
?在這個(gè)架構(gòu)中,Source端的MSK是數(shù)據(jù)流的起點(diǎn),安裝了S3 Sink Connector的Kafka Connect會從Source端的MSK中提取指定Topic的數(shù)據(jù),然后以Json或Avro文件的形式存儲到S3上;同時(shí),另一個(gè)安裝了S3 Source Connector的Kafka Connect會從S3上讀取這些Json或Avro文件,然后寫入到Sink端MSK的對應(yīng)Topic中。如果Source端和Sink端的MSK集群不在同一個(gè)Region,可以在各自的Region分別完成導(dǎo)入和導(dǎo)出,然后在兩個(gè)Region之間使用S3的Cross-Rejion Replication進(jìn)行數(shù)據(jù)同步。
?該架構(gòu)只需進(jìn)行簡單的調(diào)整,即可用于MSK集群的備份/還原,如下圖所示:先將MSK集群的數(shù)據(jù)備份到S3上,待完成集群的升級、遷移或重建工作后,再從S3上將數(shù)據(jù)恢復(fù)到新建集群即可。
?預(yù)設(shè)條件
?本文聚焦于Kafka Connect的數(shù)據(jù)導(dǎo)出/導(dǎo)入和備份/還原操作,需要提前準(zhǔn)備:
?一臺基于Amazon Linux2的EC2實(shí)例(建議新建純凈實(shí)例),本文所有的實(shí)操腳本都將在該實(shí)例上執(zhí)行,該實(shí)例也是運(yùn)行Kafka Connect Docker Container的宿主機(jī)。
?兩個(gè)MSK集群,一個(gè)作為Source,一個(gè)作為Sink;如果只有一個(gè)MSK集群也可完成驗(yàn)證,該集群將既作Source又作Sink。
?為聚焦Kafka Connect S3 Source/Sink Connector的核心配置,預(yù)設(shè)MSK集群沒有開啟身份認(rèn)證(即認(rèn)證類型為Unauthenticated),數(shù)據(jù)傳輸方式為PLAINTEXT,以便簡化Kafka Connect的連接配置。
?網(wǎng)絡(luò)連通性上要求EC2實(shí)例能訪問S3、Source端MSK集群、Sink端MSK集群。如果在實(shí)際環(huán)境中無法同時(shí)連通Source端和Sink端,則可以在兩臺分屬于不同網(wǎng)絡(luò)的EC2上進(jìn)行操作,但它們必須都能訪問S3。如果是跨Region或賬號隔離,則另需配置S3 Cross-Region Replication或手動拷貝數(shù)據(jù)文件。
?全局配置
?由于實(shí)際操作將不可避免地依賴到具體的亞馬遜云科技賬號以及本地環(huán)境里的各項(xiàng)信息(如AKSK,服務(wù)地址,各類路徑,Topic名稱等),為了保證本文給出的操作腳本具有良好的可移植性,將所有與環(huán)境相關(guān)的信息抽離出來,以全局變量的形式在實(shí)操前集中配置。以下就是全局變量的配置腳本,讀者需要根據(jù)個(gè)人環(huán)境設(shè)定這些變量的取值:
?為了便于演示和解讀,本文將使用下面的全局配置,其中前6項(xiàng)配置與賬號和環(huán)境強(qiáng)相關(guān),仍需用戶自行修改,腳本中給出的僅為示意值,而后5項(xiàng)配置與MSK數(shù)據(jù)的導(dǎo)入導(dǎo)出息息相關(guān),不建議修改,因?yàn)楹罄m(xù)的解讀將基于這里設(shè)定的值展開,待完成驗(yàn)證后,您可再根據(jù)需要靈活修改后5項(xiàng)配置以完成實(shí)際的導(dǎo)入導(dǎo)出工作。
?回到操作流程,登錄準(zhǔn)備好的EC2實(shí)例,修改下面腳本中與賬號和環(huán)境相關(guān)的前6項(xiàng)配置,然后執(zhí)行修改后的腳本。此外,需要提醒注意的是:在后續(xù)操作中,部分腳本執(zhí)行后將不再返回,而是持續(xù)占用當(dāng)前窗口輸出日志或Kafka消息,因此需要新開命令行窗口,每次新開窗口都需要執(zhí)行一次這里的全局配置腳本。
?關(guān)于上述腳本中的后5項(xiàng)配置,有如下詳細(xì)說明:
?我們就以腳本中設(shè)定的值為例,解讀一下這5項(xiàng)配置聯(lián)合起來將要實(shí)現(xiàn)的功能,同時(shí)也是本文將演示的主要內(nèi)容:
?在Source端的MSK集群上存在兩個(gè)名為source-topic-1和source-topic-2的Topic,通過安裝有S3 Sink Connector的Kafka Connect(Docker容器)將兩個(gè)Topic的數(shù)據(jù)導(dǎo)出到S3的指定存儲桶中,然后再通過安裝有S3 Source Connector的Kafka Connect(Docker容器,可以和S3 Source Connector共存為一個(gè)Docker容器)將S3存儲桶中的數(shù)據(jù)寫入到Sink端的MSK集群上,其中原source-topic-1的數(shù)據(jù)將被寫入sink-topic-1,原source-topic-2的數(shù)據(jù)將被寫入sink-topic-2。
?特別地,如果是備份/還原場景,需要保持導(dǎo)出/導(dǎo)入的Topic名稱一致,此時(shí),可直接刪除S3 Source Connector中以transforms開頭的4項(xiàng)配置(將在下文中出現(xiàn)),或者將下面兩項(xiàng)改為:
?如果只有一個(gè)MSK集群,同樣可以完成本文的驗(yàn)證工作,只需將SOURCE_KAFKA_BOOTSTRAP_SEVERS和SINK_KAFKA_BOOTSTRAP_SEVERS同時(shí)設(shè)置為該集群即可,這樣,該集群既是Source端又是Sink端,由于配置中的Source Topics和Sink Topics并不同名,所以不會產(chǎn)生沖突。
?環(huán)境準(zhǔn)備
?安裝工具包
?在EC2上執(zhí)行以下腳本,安裝并配置jq,yq,docker,jdk,kafka-console-client五個(gè)必須的軟件包,可以根據(jù)自身EC2的情況酌情選擇安裝全部或部分軟件。建議使用純凈的EC2實(shí)例,完成全部的軟件安裝:
?創(chuàng)建S3存儲桶
?整個(gè)方案以S3作為數(shù)據(jù)轉(zhuǎn)儲媒介,為此需要在S3上創(chuàng)建一個(gè)存儲桶。Source端MSK集群的數(shù)據(jù)將會導(dǎo)出到該桶中并以Json文件形式保存,向Sink端MSK集群導(dǎo)入數(shù)據(jù)時(shí),讀取的也是存儲在該桶中的Json文件。
?在源MSK上創(chuàng)建Source Topics
?為了確保Topics數(shù)據(jù)能完整備份和還原,S3 Source Connector建議Sink Topics的分區(qū)數(shù)最好與Source Topics保持一致,如果讓MSK自動創(chuàng)建Topic,則很有可能會導(dǎo)致Source Topics和Sink Topics的分區(qū)數(shù)不對等,所以,選擇手動創(chuàng)建Source Topics和Sink Topics,并確保它們的分區(qū)數(shù)一致。以下腳本將創(chuàng)建source-topic-1和source-topic-2兩個(gè)Topic,各含9個(gè)分區(qū):
?在目標(biāo)MSK上創(chuàng)建Sink Topics
?原因同上,以下腳本將創(chuàng)建:sink-topic-1和sink-topic-2兩個(gè)Topic,各含9個(gè)分區(qū):
?制作Kafka Connect鏡像
?接下來是制作帶S3 Sink Connector和S3 Source Connector的Kafka Connect鏡像,鏡像和容器均以kafka-s3-syncer命名,以下是具體操作:
?配置并啟動Kafka Connect
?鏡像制作完成后,就可以啟動了Kafka Connect了。Kafka Connect有很多配置項(xiàng),需要提醒注意的是:在下面的配置中,使用的是Kafka Connect內(nèi)置的消息轉(zhuǎn)換器:JsonConverter,如果你的輸入/輸出格式是Avro或Parquet,則需要另行安裝對應(yīng)插件并設(shè)置正確的Converter Class。
?上述腳本執(zhí)行后,命令窗口將不再返回,而是會持續(xù)輸出容器日志,因此下一步操作需要新開一個(gè)命令行窗口。
?
?配置并啟動S3 Sink Connector
?在第5節(jié)的操作中,已經(jīng)將S3 Sink Connector安裝到了Kafka Connect的Docker鏡像中,但是還需要顯式地配置并啟動它。新開一個(gè)命令行窗口,先執(zhí)行一遍《實(shí)操步驟(1):全局配置》,聲明全局變量,然后執(zhí)行以下腳本:
?配置并啟動S3 Source Connector
?同上,在第5節(jié)的操作中,已經(jīng)將S3 Source Connector安裝到了Kafka Connect的Docker鏡像中,同樣需要顯式地配置并啟動它:
?至此,整個(gè)環(huán)境搭建完畢,一個(gè)以S3作為中轉(zhuǎn)媒介的MSK數(shù)據(jù)導(dǎo)出、導(dǎo)入、備份、還原鏈路已經(jīng)處于運(yùn)行狀態(tài)。
?
?測試
?現(xiàn)在,來驗(yàn)證一下整個(gè)鏈路是否能正常工作。首先,使用kafka-console-consumer.sh監(jiān)控source-topic-1和sink-topic-1兩個(gè)Topic,然后使用腳本向source-topic-1持續(xù)寫入數(shù)據(jù),如果在sink-topic-1看到了相同的數(shù)據(jù)輸出,就說明數(shù)據(jù)成功地從source-topic-1導(dǎo)出然后又導(dǎo)入到了sink-topic-1中,相應(yīng)的,在S3存儲桶中也能看到“沉淀”的數(shù)據(jù)文件。
?打開Source Topic
?新開一個(gè)命令行窗口,先執(zhí)行一遍《實(shí)操步驟(1):全局配置》,聲明全局變量,然后使用如下命令持續(xù)監(jiān)控source-topic-1中的數(shù)據(jù):
?打開Sink Topic
?新開一個(gè)命令行窗口,先執(zhí)行一遍《實(shí)操步驟(1):全局配置》,聲明全局變量,然后使用如下命令持續(xù)監(jiān)控sink-topic-1中的數(shù)據(jù):
?向Source Topic寫入數(shù)據(jù)
?新開一個(gè)命令行窗口,先執(zhí)行一遍《實(shí)操步驟(1:全局配置》,聲明全局變量,然后使用如下命令向source-topic-1中寫入數(shù)據(jù):
?現(xiàn)象與結(jié)論
?執(zhí)行上述寫入操作后,從監(jiān)控source-topic-1的命令行窗口中可以很快看到寫入的數(shù)據(jù),這說明Source端MSK已經(jīng)開始持續(xù)產(chǎn)生數(shù)據(jù)了,隨后(約1分鐘),即可在監(jiān)控sink-topic-1的命令行窗口中看到相同的輸出數(shù)據(jù),這說明目標(biāo)端的數(shù)據(jù)同步也已開始正常工作。此時(shí),打開S3的存儲桶會發(fā)現(xiàn)大量Json文件,這些Json是由S3 Sink Connector從source-topic-1導(dǎo)出并存放到S3上的,然后S3 Source Connector又讀取了這些Json并寫入到了sink-topic-1中,至此,整個(gè)方案的演示與驗(yàn)證工作全部結(jié)束。
??清理
?在驗(yàn)證過程中,可能需要多次調(diào)整并重試,每次重試最好恢復(fù)到初始狀態(tài),以下腳本會幫助清理所有已創(chuàng)建的資源:
?小結(jié)
?本方案主要定位于輕便易用,在S3 Sink Connector和S3 Source Connector中還有很多與性能、吞吐量相關(guān)的配置,例如:s3.part.size, flush.size, s3.poll.interval.ms, tasks.max等,可以在實(shí)際需要自行調(diào)整,此外,Kafka Connect也可以方便地遷移到Kubernetes或Amazon MSK Connect中以實(shí)現(xiàn)集群化部署。