alpakka-kafka-consumer的功能描述很簡(jiǎn)單:向kafka訂閱某些topic然后把讀到的消息傳給akka-streams做業(yè)務(wù)處理。在kafka-consumer的實(shí)現(xiàn)細(xì)節(jié)上,為了達(dá)到高可用、高吞吐的目的,topic又可用劃分出多個(gè)分區(qū)partition。分區(qū)是分布在kafka集群節(jié)點(diǎn)broker上的。由于一個(gè)topic可能有多個(gè)partition,對(duì)應(yīng)topic就會(huì)有多個(gè)consumer,形成一個(gè)consumer組,共用統(tǒng)一的groupid。一個(gè)partition只能對(duì)應(yīng)一個(gè)consumer、而一個(gè)consumer負(fù)責(zé)從多個(gè)partition甚至多個(gè)topic讀取消息。kafka會(huì)根據(jù)實(shí)際情況將某個(gè)partition分配給某個(gè)consumer,即partition-assignment。所以一般來說我們會(huì)把topic訂閱與consumer-group掛鉤。這個(gè)可以在典型的ConsumerSettings證實(shí): val system = ActorSystem("kafka-sys") val config = system.settings.config.getConfig("akka.kafka.consumer") val bootstrapServers = "localhost:9092" val consumerSettings = ConsumerSettings(config, new StringDeserializer, new ByteArrayDeserializer) .withBootstrapServers(bootstrapServers) .withGroupId("group1") .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") 我們先用一個(gè)簡(jiǎn)單的consumer plainSource試試把前一篇示范中producer寫入kafka的消息讀出來: import akka.actor.ActorSystem import akka.kafka._ import akka.kafka.scaladsl._ import akka.stream.{RestartSettings, SystemMaterializer} import akka.stream.scaladsl.{Keep, RestartSource, Sink} import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord} import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer} import scala.concurrent._ import scala.concurrent.duration._ object plain_source extends App { val system = ActorSystem("kafka-sys") val config = system.settings.config.getConfig("akka.kafka.consumer") implicit val mat = SystemMaterializer(system).materializer implicit val ec: ExecutionContext = mat.executionContext val bootstrapServers = "localhost:9092" val consumerSettings = ConsumerSettings(config, new StringDeserializer, new StringDeserializer) .withBootstrapServers(bootstrapServers) .withGroupId("group1") .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") val subscription = Subscriptions.topics("greatings") Consumer.plainSource(consumerSettings, subscription) .runWith(Sink.foreach(msg => println(msg.value()))) scala.io.StdIn.readLine() system.terminate() } 以上我們沒有對(duì)讀出的消息做任何的業(yè)務(wù)處理,直接顯示出來。注意每次都會(huì)從頭完整讀出,因?yàn)樵O(shè)置了 .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"),也就是kafka-consumer的auto.offset.reset = "earliest" 。那么如果需要用讀出的數(shù)據(jù)進(jìn)行業(yè)務(wù)處理的話,每次開始運(yùn)行應(yīng)用時(shí)都會(huì)重復(fù)從頭執(zhí)行這些業(yè)務(wù)。所以需要某種機(jī)制來標(biāo)注已經(jīng)讀取的消息,也就是需要記住當(dāng)前讀取位置offset。 Consumer.plainSource輸入ConsumerRecord類型: public ConsumerRecord(String topic, int partition, long offset, K key, V value) { this(topic, partition, offset, NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE, NULL_CHECKSUM, NULL_SIZE, NULL_SIZE, key, value); } 這個(gè)ConsumerRecord類型里包括了offset,用戶可以自行commit這個(gè)位置參數(shù),也就是說用戶可以選擇把這個(gè)offset存儲(chǔ)在kafka或者其它的數(shù)據(jù)庫里。說到commit-offset,offset管理機(jī)制在kafka-consumer業(yè)務(wù)應(yīng)用中應(yīng)該屬于關(guān)鍵技術(shù)。kafka-consumer方面的業(yè)務(wù)流程可以簡(jiǎn)述為:從kafka讀出業(yè)務(wù)指令,執(zhí)行指令并更新業(yè)務(wù)狀態(tài),然后再從kafka里讀出下一批指令。為了實(shí)現(xiàn)業(yè)務(wù)狀態(tài)的準(zhǔn)確性,無論錯(cuò)過一些指令或者重復(fù)執(zhí)行一些指令都是不能容忍的。所以,必須準(zhǔn)確的標(biāo)記每次從kafka讀取數(shù)據(jù)后的指針位置,commit-offset。但是,如果讀出數(shù)據(jù)后即刻commit-offset,那么在執(zhí)行業(yè)務(wù)指令時(shí)如果系統(tǒng)發(fā)生異常,那么下次再從標(biāo)注的位置開始讀取數(shù)據(jù)時(shí)就會(huì)越過一批業(yè)務(wù)指令。這種情況稱為at-most-once,即可能會(huì)執(zhí)行一次,但絕不會(huì)重復(fù)。另一方面:如果在成功改變業(yè)務(wù)狀態(tài)后再commit-offset,那么,一旦執(zhí)行業(yè)務(wù)指令時(shí)發(fā)生異常而無法進(jìn)行commit-offset,下次讀取的位置將使用前一次的標(biāo)注位置,就會(huì)出現(xiàn)重復(fù)改變業(yè)務(wù)狀態(tài)的情況,這種情況稱為at-least-once,即一定會(huì)執(zhí)行業(yè)務(wù)指令,但可能出現(xiàn)重復(fù)更新情況。如果涉及資金、庫存等業(yè)務(wù),兩者皆不可接受,只能采用exactly-once保證一次這種模式了。不過也有很多業(yè)務(wù)要求沒那么嚴(yán)格,比如某個(gè)網(wǎng)站統(tǒng)計(jì)點(diǎn)擊量,只需個(gè)約莫數(shù),無論at-least-once,at-most-once都可以接受。 kafka-consumer-offset是一個(gè)Long類型的值,可以存放在kafka內(nèi)部或者外部的數(shù)據(jù)庫里。如果選擇在kafka內(nèi)部存儲(chǔ)offset, kafka配置里可以設(shè)定按時(shí)間間隔自動(dòng)進(jìn)行位置標(biāo)注,自動(dòng)把當(dāng)前offset存入kafka里。當(dāng)我們?cè)谏厦胬拥腃onsumerSettings里設(shè)置自動(dòng)commit后,多次重新運(yùn)行就不會(huì)出現(xiàn)重復(fù)數(shù)據(jù)的情況了: val consumerSettings = ConsumerSettings(config, new StringDeserializer, new StringDeserializer) .withBootstrapServers(bootstrapServers) .withGroupId("group1") .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") //自動(dòng)commit .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000") //自動(dòng)commit間隔 .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") alpakka-kafka提供了Committer類型,是akka-streams的Sink或Flow組件,負(fù)責(zé)把offset寫入kafka。如果用Committer的Sink或Flow就可以按用戶的需要控制commit-offset的發(fā)生時(shí)間。如下面這段示范代碼:
val committerSettings = CommitterSettings(system) val control: DrainingControl[Done] = Consumer .committableSource(consumerSettings, Subscriptions.topics("greatings")) .mapAsync(10) { msg => BusinessLogic.runBusiness(msg.record.key, msg.record.value) .map(_ => msg.committableOffset) } .toMat(Committer.sink(committerSettings))(DrainingControl.apply) .run() control.drainAndShutdown(); scala.io.StdIn.readLine() system.terminate() } object BusinessLogic { def runBusiness(key: String, value: String): Future[Done] = Future.successful(Done) } 上面這個(gè)例子里BusinessLogic.runBusiess()模擬一段業(yè)務(wù)處理代碼,也就是說完成了業(yè)務(wù)處理之后就用Committer.sink進(jìn)行了commit-offset。這是一種at-least-once模式,因?yàn)閞unBusiness可能會(huì)發(fā)生異常失敗,所以有可能出現(xiàn)重復(fù)運(yùn)算的情況。Consumer.committableSource輸出CommittableMessage: def committableSource[K, V](settings: ConsumerSettings[K, V], subscription: Subscription): Source[CommittableMessage[K, V], Control] = Source.fromGraph(new CommittableSource[K, V](settings, subscription)) final case class CommittableMessage[K, V]( record: ConsumerRecord[K, V], committableOffset: CommittableOffset ) @DoNotInherit sealed trait CommittableOffset extends Committable { def partitionOffset: PartitionOffset } Committer.sink接受輸入Committable類型并將之寫入kafka,上游的CommittableOffset 繼承了 Committable。另外,這個(gè)DrainingControl類型結(jié)合了Control類型和akka-streams終結(jié)信號(hào)可以有效控制整個(gè)consumer-streams安全終結(jié)。 alpakka-kafka還有一個(gè)atMostOnceSource。這個(gè)Source組件每讀一條數(shù)據(jù)就會(huì)立即自動(dòng)commit-offset: def atMostOnceSource[K, V](settings: ConsumerSettings[K, V], subscription: Subscription): Source[ConsumerRecord[K, V], Control] = committableSource[K, V](settings, subscription).mapAsync(1) { m => m.committableOffset.commitInternal().map(_ => m.record)(ExecutionContexts.sameThreadExecutionContext) } 可以看出來,atMostOnceSource在輸出ConsumerRecord之前就進(jìn)行了commit-offset。atMostOnceSource的一個(gè)具體使用示范如下: import scala.collection.immutable val control: DrainingControl[immutable.Seq[Done]] = Consumer .atMostOnceSource(consumerSettings, Subscriptions.topics("greatings")) .mapAsync(1)(record => BussinessLogic.runBusiness(record.key, record.value())) .toMat(Sink.seq)(DrainingControl.apply) .run() control.drainAndShutdown(); scala.io.StdIn.readLine() system.terminate() 所以,使用atMostOnceSource后是不需要任何Committer來進(jìn)行commit-offset的了。值得注意的是atMostOnceSource是對(duì)每一條數(shù)據(jù)進(jìn)行位置標(biāo)注的,所以運(yùn)行效率必然會(huì)受到影響,如果要求不是那么嚴(yán)格的話還是啟動(dòng)自動(dòng)commit比較合適。 對(duì)于任何類型的交易業(yè)務(wù)系統(tǒng)來說,無論at-least-once或at-most-once都是不可接受的,只有exactly-once才妥當(dāng)。實(shí)現(xiàn)exactly-once的其中一個(gè)方法是把offset與業(yè)務(wù)數(shù)據(jù)存放在同一個(gè)外部數(shù)據(jù)庫中。如果在外部數(shù)據(jù)庫通過事務(wù)處理機(jī)制(transaction-processing)把業(yè)務(wù)狀態(tài)更新與commit-offset放在一個(gè)事務(wù)單元中同進(jìn)同退就能實(shí)現(xiàn)exactly-once模式了。下面這段是官方文檔給出的一個(gè)示范: val db = new mongoldb val control = db.loadOffset().map { fromOffset => Consumer .plainSource( consumerSettings, Subscriptions.assignmentWithOffset( new TopicPartition(topic, /* partition = */ 0) -> fromOffset ) ) .mapAsync(1)(db.businessLogicAndStoreOffset) .toMat(Sink.seq)(DrainingControl.apply) .run() } class mongoldb { def businessLogicAndStoreOffset(record: ConsumerRecord[String, String]): Future[Done] = // ... def loadOffset(): Future[Long] = // ... } 在上面這段代碼里:db.loadOffset()從mongodb里取出上一次讀取位置,返回Future[Long],然后用Subscriptions.assignmentWithOffset把這個(gè)offset放在一個(gè)tuple (TopicPartition,Long)里。TopicPartition定義如下: public TopicPartition(String topic, int partition) { this.partition = partition; this.topic = topic; } 這樣Consumer.plainSource就可以從offset開始讀取數(shù)據(jù)了。plainSource輸出ConsumerRecord類型: public ConsumerRecord(String topic, int partition, long offset, K key, V value) { this(topic, partition, offset, NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE, NULL_CHECKSUM, NULL_SIZE, NULL_SIZE, key, value); } 這里面除業(yè)務(wù)指令value外還提供了當(dāng)前offset。這些已經(jīng)足夠在businessLogicAndStoreOffset()里運(yùn)算一個(gè)單獨(dú)的business+offset事務(wù)了(transaction)。
|
|