日韩黑丝制服一区视频播放|日韩欧美人妻丝袜视频在线观看|九九影院一级蜜桃|亚洲中文在线导航|青草草视频在线观看|婷婷五月色伊人网站|日本一区二区在线|国产AV一二三四区毛片|正在播放久草视频|亚洲色图精品一区

分享

如何構(gòu)建批流一體數(shù)據(jù)融合平臺的一致性語義保證?

 昵稱66099232 2019-10-10

本文根據(jù)陳肅老師在 Apache Kafka x Flink Meetup 深圳站的分享整理而成,文章首先將從數(shù)據(jù)融合角度,談一下 DataPipeline 對批流一體架構(gòu)的看法,以及如何設計和使用一個基礎框架。其次,數(shù)據(jù)的一致性是進行數(shù)據(jù)融合時最基礎的問題。如果數(shù)據(jù)無法實現(xiàn)一致,即使同步再快,支持的功能再豐富,都沒有意義。

另外,DataPipeline 目前使用的基礎框架為 Kafka Connect。為實現(xiàn)一致性的語義保證,我們做了一些額外工作,希望對大家有一定的參考意義。

最后,會提一些我們在應用 Kafka Connect 框架時,遇到的一些現(xiàn)實的工程問題,以及應對方法。盡管大家的場景、環(huán)境和數(shù)據(jù)量級不同,但也有可能會遇到這些問題。希望對大家的工作有所幫助。

一、批流一體架構(gòu)

批和流是數(shù)據(jù)融合的兩種應用形態(tài)

下圖來自 Flink 官網(wǎng)。傳統(tǒng)的數(shù)據(jù)融合通常基于批模式。在批的模式下,我們會通過一些周期性運行的 ETL JOB,將數(shù)據(jù)從關系型數(shù)據(jù)庫、文件存儲向下游的目標數(shù)據(jù)庫進行同步,中間可能有各種類型的轉(zhuǎn)換。

另一種是 Data Pipeline 模式。與批模式相比相比, 其最核心的區(qū)別是將批量變?yōu)閷崟r:輸入的數(shù)據(jù)不再是周期性的去獲取,而是源源不斷的來自于數(shù)據(jù)庫的日志、消息隊列的消息。進而通過一個實時計算引擎,進行各種聚合運算,產(chǎn)生輸出結(jié)果,并且寫入下游。

現(xiàn)代的一些處理框架,包括 Flink、Kafka Streams、Spark,或多或少都能夠支持批和流兩種概念。只不過像 Kafka,其原生就是為流而生,所以如果基于 Kafka Connect 做批流一體,你可能需要對批量的數(shù)據(jù)處理做一些額外工作,這是我今天重點要介紹的。

數(shù)據(jù)融合的基本問題

如果問題簡化到你只有一張表,可能是一張 MySQL 的表,里面只有幾百萬行數(shù)據(jù),你可能想將其同步到一張 Hive 表中?;谶@種情況,大部分問題都不會遇到。因為結(jié)構(gòu)是確定的,數(shù)據(jù)量很小,且沒有所謂的并行化問題。

但在一個實際的企業(yè)場景下,如果做一個數(shù)據(jù)融合系統(tǒng),就不可避免要面臨幾方面的挑戰(zhàn):

第一,“動態(tài)性”

數(shù)據(jù)源會不斷地發(fā)生變化,主要歸因于:表結(jié)構(gòu)的變化,表的增減。針對這些情況,你需要有一些相應的策略進行處理。

第二,“可伸縮性”

任何一個分布式系統(tǒng),必須要提供可伸縮性。因為你不是只同步一張表,通常會有大量數(shù)據(jù)同步任務在進行著。如何在一個集群或多個集群中進行統(tǒng)一的調(diào)度,保證任務并行執(zhí)行的效率,這是一個要解決的基本問題。

第三,“容錯性”

在任何環(huán)境里你都不能假定服務器是永遠在正常運行的,網(wǎng)絡、磁盤、內(nèi)存都有可能發(fā)生故障。這種情況下一個 Job 可能會失敗,之后如何進行恢復?狀態(tài)能否延續(xù)?是否會產(chǎn)生數(shù)據(jù)的丟失和重復?這都是要考慮的問題。

第四,“異構(gòu)性”

當我們做一個數(shù)據(jù)融合項目時,由于源和目的地是不一樣的,比如,源是 MySQL,目的地是 Oracle,可能它們對于一個字段類型定義的標準是有差別的。在同步時,如果忽略這些差異,就會造成一系列的問題。

第五,“一致性”

一致性是數(shù)據(jù)融合中最基本的問題,即使不考慮數(shù)據(jù)同步的速度,也要保證數(shù)據(jù)一致。數(shù)據(jù)一致性的底線為:數(shù)據(jù)先不丟,如果丟了一部分,通常會導致業(yè)務無法使用;在此基礎上更好的情況是:源和目的地的數(shù)據(jù)要完全一致,即所謂的端到端一致性,如何做到呢?

Lambda 架構(gòu)是批流一體化的必然要求

目前在做這樣的平臺時,業(yè)界比較公認的有兩種架構(gòu):一種是 Lambda 架構(gòu),Lambda 架構(gòu)的核心是按需使用批量和流式的處理框架,分別針對批式和流式數(shù)據(jù)提供相應的處理邏輯。最終通過一個服務層進行對外服務的輸出。

為什么我們認為 Lambda 架構(gòu)是批流一體化的必然要求?這好像看起來是矛盾的(與之相對,還有一種架構(gòu)叫 Kappa 架構(gòu),即用一個流式處理引擎解決所有問題)。

實際上,這在很大程度來自于現(xiàn)實中用戶的需求。DataPipeline 在剛剛成立時只有一種模式,只支持實時流同步,在我們看來這是未來的一種趨勢。

但后來發(fā)現(xiàn),很多客戶實際上有批量同步的需求。比如,銀行在每天晚上可能會有一些月結(jié)、日結(jié),證券公司也有類似的結(jié)算服務。基于一些歷史原因,或出于對性能、數(shù)據(jù)庫配置的考慮,可能有的數(shù)據(jù)庫本身不能開 change log。所以實際上并不是所有情況下都能從源端獲取實時的流數(shù)據(jù)。

考慮到上述問題,我們認為一個產(chǎn)品在支撐數(shù)據(jù)融合過程中,必須能同時支撐批量和流式兩種處理模式,且在產(chǎn)品里面出于性能和穩(wěn)定性考慮提供不同的處理策略,這才是一個相對來說比較合理的基礎架構(gòu)。

數(shù)據(jù)融合的 Ad-Hoc 模式

具體到做這件事,還可以有兩種基礎的應用模式。假如我需要將數(shù)據(jù)從 MySQL 同步到 Hive,可以直接建立一個 ETL 的 JOB(例如基于 Flink),其中封裝所有的處理邏輯,包括從源端讀取數(shù)據(jù),然后進行變換寫入目的地。在將代碼編譯好以后,就可以放到 Flink 集群上運行,得到想要的結(jié)果。這個集群環(huán)境可以提供所需要的基礎能力,剛才提到的包括分布式,容錯等。

數(shù)據(jù)融合的 MQ 模式

另一種模式是 ETL JOB 本身輸入輸出實際上都是面對消息隊列的,實際上這是現(xiàn)在最常使用的一種模式。在這種模式下,需要通過一些獨立的數(shù)據(jù)源和目的地連接器,來完成數(shù)據(jù)到消息隊列的輸入和輸出。ETL JOB 可以用多種框架實現(xiàn),包括 Flink、Kafka Streams 等,ETL JOB 只和消息隊列發(fā)生數(shù)據(jù)交換。

DP 選擇 MQ 模式的理由

DataPipeline 選擇 MQ 模式,主要有幾點考慮:

第一,在我們產(chǎn)品應用中有一個非常常見的場景:要做數(shù)據(jù)的一對多分發(fā)。數(shù)據(jù)要進行一次讀取,然后分發(fā)到各種不同的目的地,這是一個非常適合消息隊列使用的分發(fā)模型。

第二,有時會對一次讀取的數(shù)據(jù)加不同的處理邏輯,我們希望這種處理不要重新對源端產(chǎn)生一次讀取。所以在多數(shù)情況下,都需將數(shù)據(jù)先讀到消息隊列,然后再配置相應的處理邏輯。

第三,Kafka Connect 就是基于 MQ 模式的,它有大量的開源連接器。基于 Kafka Connect 框架,我們可以重用這些連接器,節(jié)省研發(fā)的投入。

第四,當你把數(shù)據(jù)抽取跟寫入目的地,從處理邏輯中獨立出來之后,便可以提供更強大的集成能力。因為你可以在消息隊列上集成更多的處理邏輯,而無需考慮重新寫整個 Job。

相應而言,如果你選擇將 MQ 作為所有 JOB 的傳輸通道,就必須要克服幾個缺點:

第一,所有數(shù)據(jù)的吞吐都經(jīng)過 MQ,所以 MQ 會成為一個吞吐瓶頸。

第二,因為是一個完全的流式架構(gòu),所以針對批量同步,你需要引入一些邊界消息來實現(xiàn)一些批量控制。

第三,Kafka 是一個有持久化能力的消息隊列,這意味著數(shù)據(jù)留存是有極限的。比如,你將源端的讀到 Kafka Topic 里面,Topic 不會無限的大,有可能會造成數(shù)據(jù)容量超限,導致一些數(shù)據(jù)丟失。

第四,當批量同步在中間因為某種原因被打斷,無法做續(xù)傳時,你需要進行重傳。在重傳過程中,首先要將數(shù)據(jù)進行清理,如果基于消息隊列模式,清理過程就會帶來額外的工作。你會面臨兩個困境:要么清空原有的消息隊列,要么你創(chuàng)造新的消息隊列。這肯定不如像直接使用一些批量同步框架那樣來的直接。

二、一致性語義保證

用戶需求

先簡單介紹一下用戶對于數(shù)據(jù)同步方面的一些基本要求:

第一種需求,批量同步需要以一種事務性的方式完成同步

無論是同步一整塊的歷史數(shù)據(jù),還是同步某一天的增量,該部分數(shù)據(jù)到目的地,必須是以事務性的方式出現(xiàn)的。而不是在同步一半時,數(shù)據(jù)就已經(jīng)在目的地出現(xiàn)了,這可能會影響下游的一些計算邏輯。

第二種需求,流式數(shù)據(jù)盡可能快的完成同步

大家都希望越快越好,但相應的,同步的越快,吞吐量有可能因為你的參數(shù)設置出現(xiàn)相應的下降,這可能需要有一個權衡。

第三種需求,批量和流式可能共存于一個 JOB

作為一個數(shù)據(jù)融合產(chǎn)品,當用戶在使用DataPipeline時,通常需要將存量數(shù)據(jù)同步完,后面緊接著去接增量。然后存量與增量之間需要進行一個無縫切換,中間的數(shù)據(jù)不要丟、也不要多。

第四種需求,按需靈活選擇一致性語義保證

DataPipeline 作為一個產(chǎn)品,在客戶的環(huán)境中,我們無法對客戶數(shù)據(jù)本身的特性提出強制要求。我們不能要求客戶數(shù)據(jù)一定要有主鍵或者有唯一性的索引。所以在不同場景下,對于一致性語義保證,用戶的要求也不一樣的:

比如在有主鍵的場景下,一般我們做到至少有一次就夠了,因為在下游如果對方也是一個類似于關系型數(shù)據(jù)庫這樣的目的地,其本身就有去重能力,不需要在過程中間做一個強一致的保證。但是,如果其本身沒有主鍵,或者其下游是一個文件系統(tǒng),如果不在過程中間做額外的一致性保證,就有可能在目的地產(chǎn)生多余的數(shù)據(jù),這部分數(shù)據(jù)對于下游可能會造成非常嚴重的影響。

數(shù)據(jù)一致性的鏈路視角

如果要解決端到端的數(shù)據(jù)一致性,我們要處理好幾個基本環(huán)節(jié):

第一,在源端做一個一致性抽取

一致性抽取是什么含義?即當數(shù)據(jù)從通過數(shù)據(jù)連接器寫入到 MQ 時,和與其對應的 offset 必須是以事務方式進入 MQ 的。

第二,一致性處理

如果大家用過 Flink,F(xiàn)link 提供了一個端到端一致性處理的能力,它是內(nèi)部通過 checkpoint 機制,并結(jié)合 Sink 端的二階段提交協(xié)議,實現(xiàn)從數(shù)據(jù)讀取處理到寫入的一個端到端事務一致性。其它框架,例如 Spark Streaming 和 Kafka Streams 也有各自的機制來實現(xiàn)一致性處理。網(wǎng)站優(yōu)化

第三,一致性寫入

在 MQ 模式下,一致性寫入,即 consumer offset 跟實際的數(shù)據(jù)寫入目的時,必須是同時持久化的,要么全都成功,要么全部失敗。

第四,一致性銜接

在 DataPipeline 的產(chǎn)品應用中,歷史數(shù)據(jù)與實時數(shù)據(jù)的傳輸有時需要在一個任務中共同完成。所以產(chǎn)品本身需要有這種一致性銜接的能力,即歷史數(shù)據(jù)和流式數(shù)據(jù),必須能夠在一個任務中,由程序自動完成它們之間的切換。

Kafka Connect 的一致性保證

Kafka Connect 如何保證數(shù)據(jù)同步的一致性?就目前版本,Kafka Connect 只能支持端到端的 at least once,核心原因在于,在 Kafka Connect 里面,其 offset 的持久化與數(shù)據(jù)發(fā)送本身是異步完成的。這在很大程度上是為了提高其吞吐量考慮,但相應產(chǎn)生的問題是,如果使用 Kafka Connect,框架本身只能為你提供 at least once 的語義保證。

在該模式下,如果沒有通過主鍵或下游應用進行額外地去重,同步過程當中的數(shù)據(jù)會在極端情況下出現(xiàn)重復,比如源端發(fā)送出一批數(shù)據(jù)已經(jīng)成功,但 offset 持久化失敗了,這樣在任務恢復之后,之前已經(jīng)發(fā)送成功的數(shù)據(jù)會再次重新發(fā)送一批,而下游對這種現(xiàn)象完全是不知情的。目的端也是如此,因為 consumer 的 offset 也是異步持久化,就會到導致有可能數(shù)據(jù)已經(jīng)持久化到 Sink,但實際上 consumer offset 還沒有推進。這是我們在應用原生的 Kafka Connect 框架里遇到最大的兩個問題。

三、DP 的解決之道

二階段提交協(xié)議

DataPipeline 如何解決上述問題?首先,需要用協(xié)議的方式保證每一步都做成事務。一旦做成事務,由于每個環(huán)節(jié)都是解耦的,其最終數(shù)據(jù)就可以保證一致性。下圖為二階段提交協(xié)議的最基礎版本,接下來為大家簡單介紹一下。

首先,在二階段提交協(xié)議中,對于分布式事務的參與方,在 DataPipeline 的場景下為數(shù)據(jù)寫入與 offset 寫入,這是兩個獨立組件。兩者之間的寫入操作由 Coordinator 進行協(xié)調(diào)。第一步是一個 prepare 階段,每一個參與方會將數(shù)據(jù)寫入到自己的目的地,具體持久化的位置取決于具體應用的實現(xiàn)。

第二步,當 prepare 階段完成之后,Coordinator 會向所有參與者發(fā)出 commit 指令,所有參與者在完成 commit 之后,會發(fā)出一個 ack,Coordinator 收到 ack 之后,事務就完成了。如果出現(xiàn)失敗,再進行相應的回滾操作。其實在分布式數(shù)據(jù)庫的設計領域中,單純應用一個二階段提交協(xié)議會出現(xiàn)非常多的問題,例如 Coordinator 本身如果不是高可用的,在過程當中就有可能出現(xiàn)事務不一致的問題。

所以應用二階段提交協(xié)議,最核心的問題是如何保證 Coordinator 高可用。所幸在大家耳熟能詳?shù)母鞣N框架里,包括 Kafka 和 Flink,都能夠通過分布式一致協(xié)議實現(xiàn) Coordinator 高可用,這也是為什么我們能夠使用二階段提交來保證事務性。

Kafka 事務消息原理

關于 Kafka 事務消息原理,網(wǎng)上有很多資料,在此簡單說一下能夠達到的效果。Kafka 通過二階段提交協(xié)議,最終實現(xiàn)了兩個最核心的功能。

第一,一致性抽取

上文提到數(shù)據(jù)要被發(fā)送進 Kafka,同時 offset 要被持久化到 Kafka,這是對兩個不同 Topic 的寫入。通過利用 Kafka 事務性消息,我們能夠保證 offset 的寫入和數(shù)據(jù)的發(fā)送是一個事務。如果 offset 沒有持久化成功,下游是看不到這批數(shù)據(jù)的,這批數(shù)據(jù)實際上最終會被丟棄掉。

所以對于源端的發(fā)送,我們對 Kafka Connect 的 Source Worker 做了一些改造,讓其能夠提供兩種模式,如果用戶的數(shù)據(jù)本身是具備主鍵去重能力的,就可以繼續(xù)使用 Kafka Connect 原生的模式。

如果用戶需要強一致時,首先要開啟一個源端的事務發(fā)送功能,這就實現(xiàn)了源端的一致性抽取。其可以保證數(shù)據(jù)進 Kafka 一端不會出現(xiàn)數(shù)據(jù)重復。這里有一個限制,即一旦要開啟一致性抽取,根據(jù) Kafka 必須要將 ack 設置成 all,這意味著一批數(shù)據(jù)有多少個副本,其必須能夠在所有的副本所在的 broker 都已經(jīng)應答的情況下,才可以開始下一批數(shù)據(jù)的寫入。盡管會造成一些性能上的損失,但為了實現(xiàn)強一致,你必須要接受這一事實。

第二,一致性處理

事務性消息最早就是為 Kafka Streams 設計和準備的??梢詫懸欢?Kafka Streams 應用,從 Kafka 里讀取數(shù)據(jù),然后完成轉(zhuǎn)化邏輯,進而將結(jié)果再輸出回 Kafka。Sink 端再從 Kafka 中消費數(shù)據(jù),寫入目的地。

數(shù)據(jù)一致性寫入

之前簡要談了一下二階段提交協(xié)議的原理,DataPipeline 實現(xiàn)的方式不算很深奧,基本是業(yè)界的一種統(tǒng)一方式。其中最核心的點是,我們將 consumer offset 管理從 Kafka Connect 框架中獨立出來,實現(xiàn)事務一致性提交。另外,在 Sink 端封裝了一個類似于 Flink 的 TwoPhaseCommitSinkFunction 方式,其定義了 Sink 若要實現(xiàn)一個二階段提交所必須要實現(xiàn)的一些功能。

DataPipeline 將 Sink Connector 分為兩類,一類是 Connector 本身具備了事務能力,比如絕大部分的關系型數(shù)據(jù)庫,只需將 offset 跟數(shù)據(jù)同時持久化到目的地即可。額外的可能需要有一張 offset 表來記錄提交的 offset。還有一類 Sink 不具備事務性能力,類似像 FTP、OSS 這些對象存儲,我們需要去實現(xiàn)一個二階段提交協(xié)議,最終才能保證 Sink 端的數(shù)據(jù)能夠達到一致性寫入。

數(shù)據(jù)一致性銜接

關于批量數(shù)據(jù)與實時數(shù)據(jù)如何銜接的問題,主要有兩個關鍵點:

第一,當開始進行一個批量數(shù)據(jù)同步時,以關系型數(shù)據(jù)庫為例,你應該拿到當時一個整體數(shù)據(jù)的 Snapshot,并在一個事務中同時記錄當時對應的日志起始值。以 MySQL 為例,當要獲取一個 Binlog 起始偏移量時,需要開啟一個 START TRANSACTION WITH CONSISTENT SNAPSHOT,這樣才能保證完成全量之后,后期的讀取增量日志同步不會產(chǎn)生重復數(shù)據(jù)。

第二,如果采用增量同步模式,則必須根據(jù)實際的數(shù)據(jù)業(yè)務領域,采用一種比較靈活的增量表達式,才能避免讀到寫到一半的數(shù)據(jù)。比如在你的數(shù)據(jù)中,其 ID 是一個完全自增,沒有任何重復的可能,此時只需每次單純的大于上一次同步的最后一條記錄即可。

但如果是一個時間戳,無論精度多高,都有可能在數(shù)據(jù)庫產(chǎn)生相同的時間戳,所以安全的做法是每次迭代時,取比當前時間稍微少一點,保證留出一個安全時間,比如五秒甚至一分鐘,這樣你永遠不會讀到一些時間戳可能會產(chǎn)生沖突的這部分數(shù)據(jù),避免遺漏數(shù)據(jù)。這是一個小技巧,但如果沒有注意,在使用過程中就會產(chǎn)生各種各樣的問題。

還有一點是上面提及的,如何能夠在一個流式框架實現(xiàn)批量同步的一致性,對于所有的流式框架,需要引入一些邊界條件來標志著一次批量同步的開始和結(jié)束。DataPipeline 在每次批量發(fā)送開始和結(jié)束后,會引入一些控制量信號,然后在 Sink端進行相應處理。同樣為了保證事務一致性,在 Sink 端處理這種批量同步時,依然要做一些類似于二階段提交這樣的方式,避免在一些極端情況下出現(xiàn)數(shù)據(jù)不一致的問題。

四、問題和思考

上文介紹的是 DataPipeline 如何基于 Kafka Connect 做事務同步一致性的方案。

DataPipeline 在使用 Kafka Connect 過程中遇到過一些問題,目前大部分已經(jīng)有一些解決方案,還有少量問題,可能需要未來采用新的方法/框架才能夠更好的解決。

第一,反壓的問題

Kafka Connect 設計的邏輯是希望實現(xiàn)源端和目的端完全解耦,這種解偶本身是一個很好的特性。但也帶來一些問題,源和目的地的 task 完全不知道彼此的存在。剛才我提到 Kafka 有容量限制,不能假定在一個客戶環(huán)境里面,會給你無限的磁盤來做緩沖。通常我們在客戶那邊默認 Topic 為 100G 的容量。如果源端讀的過快,大量數(shù)據(jù)會在 Kafka 里堆積,目的端沒有及時消費,就有可能出現(xiàn)數(shù)據(jù)丟失,這是一個非常容易出現(xiàn)的問題。

怎么解決?DataPipeline 作為一個產(chǎn)品,在 Kafka Connect 之上,做了控制層,控制層中有像 Manager 這樣的邏輯組件,會監(jiān)控每一個 Topic 消費的 lag,當達到一定閾值時,會對源端進行限速,保證源和目的地盡可能匹配。

第二,資源隔離

Connect Worker 集群無法對 task 進行資源預留,多個 task 并行運行會相互影響。Worker 的 rest 接口是隊列式的,單個集群任務過多會導致啟停緩慢。

我們正在考慮利用外部的資源調(diào)度框架,例如 K8s 進行 worker 節(jié)點管理;以及通過路由規(guī)則將不同優(yōu)先級任務運行在不同的 worker 集群上,實現(xiàn)預分配和共享資源池的靈活配置。

第三,Rebalance

在 2.3 版本以前,Kafka Connect 的 task rebalance 采用 stop-the-world 模式,牽一發(fā)動全身。在 2.3 版本之后,已經(jīng)做了非常大優(yōu)化,改為了具有粘性的 rebalance。所以如果使用 Kafka Connect,強烈推薦一定要升級到 2.3 以上的版本,也就是目前的最新版本。

五、未來演進路線

基于 MQ 模式的架構(gòu),針對大批量數(shù)據(jù)的同步,實際上還是容易出現(xiàn)性能瓶頸。主要瓶頸是在 MQ 的集群,我們并不能在客戶環(huán)境里無限優(yōu)化 Kafka 集群的性能,因為客戶提供的硬件資源有限。所以一旦客戶給定了硬件資源,Kafka 吞吐的上限就變?yōu)橐粋€固定值。所以針對批量數(shù)據(jù)的同步,可能未來會考慮用內(nèi)存隊列替代 MQ。

同時,會采用更加靈活的 Runtime,主要是為了解決剛才提到的預分配資源池和共享資源池的統(tǒng)一管理問題。

另外,關于數(shù)據(jù)質(zhì)量管理,實際上金融類客戶對數(shù)據(jù)質(zhì)量的一致性要求非常高。所以對于一些對數(shù)據(jù)質(zhì)量要求非常高的客戶,我們考慮提供一些后校驗功能,尤其是針對批量同步。

-----------------------------------

本文作者:陳肅

原文鏈接:https://yq.aliyun.com/articles/719759?utm_content=g_1000079637    

本文為云棲社區(qū)原創(chuàng)內(nèi)容

    本站是提供個人知識管理的網(wǎng)絡存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導購買等信息,謹防詐騙。如發(fā)現(xiàn)有害或侵權內(nèi)容,請點擊一鍵舉報。
    轉(zhuǎn)藏 分享 獻花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多