中臺對外提供的數(shù)據(jù)應(yīng)該是完整的,源端數(shù)據(jù)的 Create、Update 和 Delete 都要能夠被捕獲,不能少也不能多,即數(shù)據(jù)需要有端到端一致性的能力(Exactly Once Semantic,EOS)。 當(dāng)然,EOS 并非在任何業(yè)務(wù)場景下都需要,但從平臺角度必須具備這種能力,并且允許用戶根據(jù)業(yè)務(wù)需求靈活開啟和關(guān)閉。 本文將主要闡述在構(gòu)建實(shí)時數(shù)據(jù)集成平臺時,對一些技術(shù)選型問題需要做哪些考量。 隨著企業(yè)應(yīng)用復(fù)雜性的上升和微服務(wù)架構(gòu)的流行,數(shù)據(jù)正變得越來越以應(yīng)用為中心。 服務(wù)之間僅在必要時以接口或者消息隊(duì)列方式進(jìn)行數(shù)據(jù)交互,從而避免了構(gòu)建單一數(shù)據(jù)庫集群來支撐不斷增長的業(yè)務(wù)需要。以應(yīng)用為中心的數(shù)據(jù)持久化架構(gòu),在帶來可伸縮性好處的同時,也給數(shù)據(jù)的融合計(jì)算帶來了障礙。 由于數(shù)據(jù)散落在不同的數(shù)據(jù)庫、消息隊(duì)列、文件系統(tǒng)中,計(jì)算平臺如果直接訪問這些數(shù)據(jù),會遇到可訪問性和數(shù)據(jù)傳輸延遲等問題。在一些場景下,計(jì)算平臺直接訪問應(yīng)用系統(tǒng)數(shù)據(jù)庫會對系統(tǒng)吞吐造成顯著影響,通常也是不被允許的。 因此,在進(jìn)行跨應(yīng)用的數(shù)據(jù)融合計(jì)算時,首先需要將數(shù)據(jù)從孤立的數(shù)據(jù)源中采集出來,匯集到可被計(jì)算平臺高效訪問的目的地,此過程被稱為 ETL,即數(shù)據(jù)的抽?。‥xtract)、轉(zhuǎn)換(Transform)和加載(Load)。 ETL 并不是什么新鮮事物。 該領(lǐng)域的傳統(tǒng)公司,例如 Informatica,早在 1993 年就已經(jīng)成立,并且提供了成熟的商業(yè)化解決方案。開源工具,例如 Kettle、DataX 等,在很多企業(yè)中也得到了廣泛的應(yīng)用。 傳統(tǒng)上,ETL 是通過批量作業(yè)完成的。即定期從數(shù)據(jù)源加載(增量)數(shù)據(jù),按照轉(zhuǎn)換邏輯進(jìn)行處理,并寫入目的地。根據(jù)業(yè)務(wù)需要和計(jì)算能力的不同,批量處理的延時通常從天到分鐘級不等。在一些應(yīng)用場景下,例如電子商務(wù)網(wǎng)站的商品索引更新,ETL 需要盡可能短的延遲,這就出現(xiàn)了實(shí)時 ETL 的需求。 在實(shí)時 ETL 中,數(shù)據(jù)源和數(shù)據(jù)目的地之間仿佛由管道連接在一起。數(shù)據(jù)從源端產(chǎn)生后,以極低的延遲被采集、加工,并寫入目的地,整個過程沒有明顯的處理批次邊界。 實(shí)時 ETL,又被稱為 Data Pipeline 模式。 阿里在 2018 年提出了“數(shù)據(jù)中臺”的概念。即數(shù)據(jù)被統(tǒng)一采集,規(guī)范數(shù)據(jù)語義和業(yè)務(wù)口徑形成企業(yè)基礎(chǔ)數(shù)據(jù)模型,提供統(tǒng)一的分析查詢和新業(yè)務(wù)的數(shù)據(jù)對接能力。 數(shù)據(jù)中臺并不是新的顛覆式技術(shù),而是一種企業(yè)數(shù)據(jù)資產(chǎn)管理和應(yīng)用方法學(xué),涵蓋了數(shù)據(jù)集成、數(shù)據(jù)質(zhì)量管理、元數(shù)據(jù) + 主數(shù)據(jù)管理、數(shù)倉建模、支持高并發(fā)訪問的數(shù)據(jù)服務(wù)接口層開發(fā)等內(nèi)容。 在數(shù)據(jù)中臺建設(shè)中,結(jié)合企業(yè)自身的業(yè)務(wù)需求特點(diǎn),架構(gòu)和功能可能各不相同,但其中一個最基本的需求是數(shù)據(jù)采集的實(shí)時性和完整性。數(shù)據(jù)從源端產(chǎn)生,到被采集到數(shù)據(jù)匯集層的時間要盡可能短,至少應(yīng)做到秒級延遲,這樣中臺的數(shù)據(jù)模型更新才可能做到近實(shí)時,構(gòu)建在中臺之上依賴實(shí)時數(shù)據(jù)流驅(qū)動的應(yīng)用(例如商品推薦、欺詐檢測等)才能夠滿足業(yè)務(wù)的需求。 以阿里雙十一為例,在極高的并發(fā)情況下,訂單產(chǎn)生到大屏統(tǒng)計(jì)數(shù)據(jù)更新延遲不能超過 5s,一般在 2s 內(nèi)。 中臺對外提供的數(shù)據(jù)應(yīng)該是完整的,源端數(shù)據(jù)的 Create、Update 和 Delete 都要能夠被捕獲,不能少也不能多,即數(shù)據(jù)需要有端到端一致性的能力(Exactly Once Semantic,EOS)。 當(dāng)然,EOS 并非在任何業(yè)務(wù)場景下都需要,但從平臺角度必須具備這種能力,并且允許用戶根據(jù)業(yè)務(wù)需求靈活開啟和關(guān)閉。 在構(gòu)建實(shí)時數(shù)據(jù)集成平臺時,就一些技術(shù)選型問題,建議做以下考量: 源數(shù)據(jù)變化捕獲是數(shù)據(jù)集成的起點(diǎn),獲取數(shù)據(jù)源變化主要有三種方式:?
基于日志的解析模式常用于各種類型的數(shù)據(jù)庫,例如 MySQL 的 Binlog、Oracle 的 Redo&Achieve Log、SQL Server Change Tracking & CDC 等。 不同數(shù)據(jù)庫日志解析的原理差別很大,以 MySQL Binlog 模式為例,解析程序本身是一個 Slave,能夠?qū)崟r收到 MySQL Master 的數(shù)據(jù)流推送,并解析還原成 DDL 和 DML 操作。而 SQL Server 的 CT 模式下,增量是通過定期查詢 Change Tracking 表實(shí)現(xiàn)的。 基于增量條件的查詢模式不依賴于源端開啟日志記錄,但對于數(shù)據(jù)源通常有額外的格式要求。例如,數(shù)據(jù)庫表或文檔對象需要有標(biāo)志更新時間的字段,這在一些業(yè)務(wù)系統(tǒng)中是無法滿足的。 數(shù)據(jù)源主動 Push 模式的常見形式為業(yè)務(wù)插碼,即應(yīng)用系統(tǒng)通過打點(diǎn)或者配置切面的方式,將數(shù)據(jù)變化封裝為事件,額外發(fā)送一份給數(shù)據(jù)集成平臺。這種方式一般需要對源端系統(tǒng)代碼進(jìn)行一定程度的修改。 通常而言,基于數(shù)據(jù)庫的日志進(jìn)行增量捕獲應(yīng)當(dāng)被優(yōu)先考慮。其具備以下幾個顯著優(yōu)點(diǎn):?
當(dāng)然,事物都具有兩面性。開啟數(shù)據(jù)庫日志通常會對源庫性能產(chǎn)生一定的影響,需要額外的存儲空間,甚至一些解析方法也會對源庫資源造成額外消耗。因此,實(shí)施過程中需要在 DBA 的配合下,根據(jù)數(shù)據(jù)庫特點(diǎn)和解析原理進(jìn)行 DB 部署規(guī)劃。 推薦使用數(shù)據(jù)庫的復(fù)制和災(zāi)備能力,在獨(dú)立服務(wù)器對從庫進(jìn)行日志解析。此外,當(dāng)數(shù)據(jù)庫產(chǎn)生批量更新時,會在短時間內(nèi)產(chǎn)生大量日志堆積,如果日志留存策略設(shè)置不當(dāng),容易出現(xiàn)數(shù)據(jù)丟失。這些都需要根據(jù)具體的業(yè)務(wù)數(shù)據(jù)增長特點(diǎn),在前期做好規(guī)劃,并在上線后根據(jù)業(yè)務(wù)變化定期進(jìn)行評估和調(diào)整。 數(shù)據(jù)源主動 push 模式下,由于事件發(fā)送和業(yè)務(wù)處理很難做到事務(wù)一致性,所以當(dāng)出現(xiàn)異常時,數(shù)據(jù)一致性就無從保證,比較適合對于數(shù)據(jù)一致性要求不高的場景,例如用戶行為分析。 無論采用何種數(shù)據(jù)變化捕獲技術(shù),程序必須在一個可靠的平臺運(yùn)行。該平臺需要解決分布式系統(tǒng)的一些共性問題,主要包括:水平擴(kuò)展、容錯、進(jìn)度管理等。 程序必須能夠以分布式 job 的形式在集群中運(yùn)行,從而允許在業(yè)務(wù)增長時通過增加運(yùn)行時節(jié)點(diǎn)的方式實(shí)現(xiàn)擴(kuò)展。 因?yàn)樵谝粋€規(guī)模化的企業(yè)中,通常要同時運(yùn)行成百上千的 job。隨著業(yè)務(wù)的增長,job 的數(shù)量以及 job 的負(fù)載還有可能持續(xù)增長。 分布式運(yùn)行環(huán)境的執(zhí)行節(jié)點(diǎn)可能因?yàn)檫^載、網(wǎng)絡(luò)連通性等原因無法正常工作。 當(dāng)節(jié)點(diǎn)出現(xiàn)問題時,運(yùn)行環(huán)境需要能夠及時監(jiān)測到,并將問題節(jié)點(diǎn)上的 job 分配給健康的節(jié)點(diǎn)繼續(xù)運(yùn)行。 job 需要記錄自身處理的進(jìn)度,避免重復(fù)處理數(shù)據(jù)。另外,job 會因?yàn)樯舷掠蜗到y(tǒng)的問題、網(wǎng)絡(luò)連通性、程序 bug 等各種原因異常中止,當(dāng) job 重啟后,必須能夠從上次記錄的正常進(jìn)度位置開始處理后繼的數(shù)據(jù)。 有許多優(yōu)秀的開源框架都可以滿足上述要求,包括 Kafka Connect、Spark、Flink 等。 Kafka Connect 是一個專注數(shù)據(jù)進(jìn)出 Kafka 的數(shù)據(jù)集成框架。Spark 和 Flink 則更為通用,既可以用于數(shù)據(jù)集成,也適用于更加復(fù)雜的應(yīng)用場景,例如機(jī)器學(xué)習(xí)的模型訓(xùn)練和流式計(jì)算。 就數(shù)據(jù)集成這一應(yīng)用場景而言,不同框架的概念是非常類似的。 首先,框架提供 Source Connector 接口封裝對數(shù)據(jù)源的訪問。應(yīng)用開發(fā)者基于這一接口開發(fā)適配特定數(shù)據(jù)源的 Connector,實(shí)現(xiàn)數(shù)據(jù)抽取邏輯和進(jìn)度(offset)更新邏輯。 其次,框架提供一個分布式的 Connector 運(yùn)行環(huán)境,處理任務(wù)的分發(fā)、容錯和進(jìn)度更新等問題。 不同之處在于,Kafka Connect 總是將數(shù)據(jù)抽取到 Kafka,而對于 Spark 和 Flink,Source Connector 是將數(shù)據(jù)抽取到內(nèi)存中構(gòu)建對象,寫入目的地是由程序邏輯定義的,包括但不限于消息隊(duì)列。 但無論采用何種框架,都建議首先將數(shù)據(jù)寫入一個匯集層,通常是 Kafka 這樣的消息隊(duì)列。 單就數(shù)據(jù)源采集而言,Kafka Connect 這樣專注于數(shù)據(jù)集成的框架是有一定優(yōu)勢的,這主要體現(xiàn)在兩方面: 首先是 Connector 的豐富程度,幾乎所有較為流行的數(shù)據(jù)庫、對象存儲、文件系統(tǒng)都有開源的 Connector 實(shí)現(xiàn)。 尤其在數(shù)據(jù)庫的 CDC 方面,有 Debezium 這樣優(yōu)秀的開源項(xiàng)目存在,降低了應(yīng)用的成本。 其次是開發(fā)的便捷性,專有框架的設(shè)計(jì)相較于通用框架更為簡潔,開發(fā)新的 Connector 門檻較低。Kafka Connect 的 runtime 實(shí)現(xiàn)也較為輕量,出現(xiàn)框架級別問題時 debug 也比較便捷。 盡管目前版本的 Kafka Connect 還不支持?jǐn)?shù)據(jù)采集后進(jìn)入 Kafka 的 EOS 保證,但通過對 runtime 的修改,利用 Kafka 事務(wù)消息也能夠?qū)崿F(xiàn)這一點(diǎn)。相信 Kafka Connect 未來的版本也會很快提供官方的支持。 當(dāng)各類數(shù)據(jù)從源端抽取后,首先應(yīng)當(dāng)被寫入一個數(shù)據(jù)匯集層,然后再進(jìn)行后繼的轉(zhuǎn)換處理,直至將最終結(jié)果寫入目的地。數(shù)據(jù)匯集層的作用主要有兩點(diǎn): 首先,數(shù)據(jù)匯集層將異構(gòu)的數(shù)據(jù)源數(shù)據(jù)存儲為統(tǒng)一的格式,并且為后繼的處理提供一致的訪問接口。這就將處理邏輯和數(shù)據(jù)源解耦開來,同時屏蔽了數(shù)據(jù)抽取過程中可能發(fā)生的異常對后繼作業(yè)的影響。 其次,數(shù)據(jù)匯集層獨(dú)立于數(shù)據(jù)源,可被多次訪問,亦可根據(jù)業(yè)務(wù)需要緩存全部或一定期限的原始數(shù)據(jù),這為轉(zhuǎn)換分析提供了更高的靈活度。當(dāng)業(yè)務(wù)需求發(fā)生變化時,無需重復(fù)讀取源端數(shù)據(jù),直接基于數(shù)據(jù)匯集層就可以開發(fā)新的模型和應(yīng)用。數(shù)據(jù)匯集層可基于任意支持海量 / 高可用的文件系統(tǒng)、數(shù)據(jù)倉庫或者消息隊(duì)列構(gòu)建,常見的方案包括 HDFS、HBase、Kafka 等。 針對實(shí)時 ETL 場景,推薦使用 Kafka 或類似具有海量數(shù)據(jù)持久化能力的消息隊(duì)列來做數(shù)據(jù)匯集層,這會為后繼的流式處理提供便捷。同時,利用 Kafka 的數(shù)據(jù)回收機(jī)制,可以根據(jù)業(yè)務(wù)需要自動保留一定時間或大小的原始數(shù)據(jù)。 數(shù)據(jù)轉(zhuǎn)換是一個業(yè)務(wù)性很強(qiáng)的處理步驟。 當(dāng)數(shù)據(jù)進(jìn)入?yún)R集層后,一般會用于兩個典型的后繼處理場景:數(shù)倉構(gòu)建和數(shù)據(jù)流服務(wù)。 數(shù)倉構(gòu)建包括模型定義和預(yù)計(jì)算兩部分。數(shù)據(jù)工程師根據(jù)業(yè)務(wù)分析需要,使用星型或雪花模型設(shè)計(jì)數(shù)據(jù)倉庫結(jié)構(gòu),利用數(shù)據(jù)倉庫中間件完成模型構(gòu)建和更新。 開源領(lǐng)域,Apache Kylin 是預(yù)聚合模式 OLAP 代表,支持從 HIVE、Kafka、HDFS 等數(shù)據(jù)源加載原始表數(shù)據(jù),并通過 Spark/MR 來完成 CUBE 構(gòu)建和更新。 Druid 則是另一類預(yù)聚合 OLAP 的代表。在 Druid 的表結(jié)構(gòu)模型中,分為時間列、維度列和指標(biāo)列,允許對任意指標(biāo)列進(jìn)行聚合計(jì)算而無需定義維度數(shù)量。Druid 在數(shù)據(jù)存儲時便可對數(shù)據(jù)進(jìn)行聚合操作,這使得其更新延遲可以做到很低。在這些方面,Baidu 開源的 Palo 和 Druid 有類似之處。 一個普遍的共識是,沒有一個 OLAP 引擎能同時在數(shù)據(jù)量,靈活性和性能這三個方面做到完美,用戶需要基于自己的需求進(jìn)行取舍和選型。預(yù)計(jì)算模式的 OLAP 引擎在查詢響應(yīng)時間上相較于 MPP 引擎(Impala、SparkSQL、Presto 等)有一定優(yōu)勢,但相對限制了靈活性。 如前文所述,源端采集的數(shù)據(jù)建議放入一個匯集層,優(yōu)選是類似 Kafka 這樣的消息隊(duì)列。包括 Kylin 和 Druid 在內(nèi)的數(shù)據(jù)倉庫可以直接以流式的方式消費(fèi)數(shù)據(jù)進(jìn)行更新。 一種常見的情形為:原始采集的數(shù)據(jù)格式、粒度不一定滿足數(shù)據(jù)倉庫中表結(jié)構(gòu)的需要,而數(shù)倉提供的配置靈活度可能又不足夠。這種情況下需要在進(jìn)入數(shù)倉前對數(shù)據(jù)做額外的處理。 常見的處理包括過濾、字段替換、嵌套結(jié)構(gòu)一拆多、維度填充等,以上皆為無狀態(tài)的轉(zhuǎn)換。有狀態(tài)的轉(zhuǎn)換,例如 SUM、COUNT 等,在此過程中較少被使用,因?yàn)閿?shù)倉本身就提供了這些聚合能力。 數(shù)據(jù)流服務(wù)的構(gòu)建則是基于流式計(jì)算引擎,對匯集層的數(shù)據(jù)進(jìn)一步加工計(jì)算,并將結(jié)果實(shí)時輸出給下游應(yīng)用系統(tǒng)。這涉及到流式計(jì)算引擎的選擇:Spark Streaming、Flink、還是 Kafka Streams? 關(guān)于三個引擎的對比,網(wǎng)上有很多資料,在此不再贅述。 選型過程中有幾點(diǎn)值得特別關(guān)注: Spark 對流的支持是 MicroBatch,提供的是亞秒級的延遲,相較于 Flink 和 Kafka Streams 在實(shí)時性上要差一些。 Spark 和 Flink 都是將作業(yè)提交到計(jì)算集群上運(yùn)行,需要搭建專屬的運(yùn)行環(huán)境。 Kafka Streams 的作業(yè)是以普通 Java 程序方式運(yùn)行,本質(zhì)上是一個調(diào)用 Kafka Streaming API 的 Kafka Consumer,可以方便地嵌入各種應(yīng)用。 但相應(yīng)的,用戶需要自己解決作業(yè)程序在不同服務(wù)器上的分發(fā)問題,例如通過 K8s 集群方案進(jìn)行應(yīng)用的容器化部署。如果使用 KSQL,還需要部署 KSQL 的集群。 三者都提供 Streaming SQL,但 Flink 的 SQL 支持要更為強(qiáng)大些,可以運(yùn)行更加復(fù)雜的分組聚合操作。 Flink 對于數(shù)據(jù)進(jìn)出計(jì)算集群提供了框架級別的支持,這是通過結(jié)合 CheckPoint 機(jī)制和 Sink Connector 接口封裝的二階段提交協(xié)議實(shí)現(xiàn)的。 Kafka Streams 利用 Kafka 事務(wù)性消息,可以實(shí)現(xiàn)“消費(fèi) - 計(jì)算 - 寫入 Kafka“的 EOS,但當(dāng)結(jié)果需要輸出到 Kafka 以外的目的地時,還需要利用 Kafka Connect 的 Sink Connector。 遺憾的是,Kafka Connect 不提供 Kafka 到其它類型 Sink 的 EOS 保證,需要用戶自己實(shí)現(xiàn)。 Spark Streaming 與 Kafka Streams 類似,在讀取和計(jì)算過程中可以保證 EOS,但將結(jié)果輸出到外部時,依然需要額外做一些工作來確保數(shù)據(jù)一致性。常見的方式包括:利用數(shù)據(jù)庫的事務(wù)寫入機(jī)制將 Offset 持久化到外部、利用主鍵保證冪等寫入、參考二階段提交協(xié)議做分布式事務(wù)等。 本文簡要討論了一些構(gòu)建面向?qū)崟r數(shù)據(jù)的集成平臺在技術(shù)選型方面的考量點(diǎn)。 數(shù)據(jù)源變化捕獲是數(shù)據(jù)集成的起點(diǎn),結(jié)合日志的解析、增量條件查詢模式和數(shù)據(jù)源主動 Push 模式,最終構(gòu)建出一個數(shù)據(jù)匯集層。在這個階段,推薦考慮 Kafka Connect 這類面向數(shù)據(jù)集成的專有框架,可以有效縮短研發(fā)周期和成本。 數(shù)據(jù)匯集層建議構(gòu)建在消息隊(duì)列之上,為后繼的加工處理提供便利。如果需要全量持久化長期保存,建議結(jié)合使用消息隊(duì)列和分布式文件系統(tǒng)分別做實(shí)時數(shù)據(jù)和全量數(shù)據(jù)的存儲。 流式處理能力是實(shí)時數(shù)據(jù)集成平臺必要的組件。結(jié)合企業(yè)技術(shù)棧特點(diǎn),選用包括 Flink、Spark Streaming、Kafka Streams 等流行的引擎在多數(shù)情況下都能夠滿足要求。 端到端數(shù)據(jù)的 EOS 是數(shù)據(jù)集成中的一個難題,需要用戶根據(jù)業(yè)務(wù)實(shí)際需求、數(shù)據(jù)本身的特性、目的地特點(diǎn) case by case 去解決。 |
|