日韩黑丝制服一区视频播放|日韩欧美人妻丝袜视频在线观看|九九影院一级蜜桃|亚洲中文在线导航|青草草视频在线观看|婷婷五月色伊人网站|日本一区二区在线|国产AV一二三四区毛片|正在播放久草视频|亚洲色图精品一区

分享

alpakka-kafka(2)-consumer

 怡紅公子0526 2021-04-29

   alpakka-kafka-consumer的功能描述很簡(jiǎn)單:向kafka訂閱某些topic然后把讀到的消息傳給akka-streams做業(yè)務(wù)處理。在kafka-consumer的實(shí)現(xiàn)細(xì)節(jié)上,為了達(dá)到高可用、高吞吐的目的,topic又可用劃分出多個(gè)分區(qū)partition。分區(qū)是分布在kafka集群節(jié)點(diǎn)broker上的。由于一個(gè)topic可能有多個(gè)partition,對(duì)應(yīng)topic就會(huì)有多個(gè)consumer,形成一個(gè)consumer組,共用統(tǒng)一的groupid。一個(gè)partition只能對(duì)應(yīng)一個(gè)consumer、而一個(gè)consumer負(fù)責(zé)從多個(gè)partition甚至多個(gè)topic讀取消息。kafka會(huì)根據(jù)實(shí)際情況將某個(gè)partition分配給某個(gè)consumer,即partition-assignment。所以一般來說我們會(huì)把topic訂閱與consumer-group掛鉤。這個(gè)可以在典型的ConsumerSettings證實(shí):

  val system = ActorSystem("kafka-sys")
  val config = system.settings.config.getConfig("akka.kafka.consumer")
  val bootstrapServers = "localhost:9092"
  val consumerSettings =
    ConsumerSettings(config, new StringDeserializer, new ByteArrayDeserializer)
      .withBootstrapServers(bootstrapServers)
      .withGroupId("group1")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

我們先用一個(gè)簡(jiǎn)單的consumer plainSource試試把前一篇示范中producer寫入kafka的消息讀出來: 

import akka.actor.ActorSystem
import akka.kafka._
import akka.kafka.scaladsl._
import akka.stream.{RestartSettings, SystemMaterializer}
import akka.stream.scaladsl.{Keep, RestartSource, Sink}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

import scala.concurrent._
import scala.concurrent.duration._
object plain_source extends App {
  val system = ActorSystem("kafka-sys")
  val config = system.settings.config.getConfig("akka.kafka.consumer")
  implicit val mat = SystemMaterializer(system).materializer
  implicit val ec: ExecutionContext = mat.executionContext
  val bootstrapServers = "localhost:9092"
  val consumerSettings =
    ConsumerSettings(config, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers(bootstrapServers)
      .withGroupId("group1")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val subscription = Subscriptions.topics("greatings")
  Consumer.plainSource(consumerSettings, subscription)
    .runWith(Sink.foreach(msg => println(msg.value())))

  scala.io.StdIn.readLine()
  system.terminate()

}

以上我們沒有對(duì)讀出的消息做任何的業(yè)務(wù)處理,直接顯示出來。注意每次都會(huì)從頭完整讀出,因?yàn)樵O(shè)置了 .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"),也就是kafka-consumer的auto.offset.reset = "earliest" 。那么如果需要用讀出的數(shù)據(jù)進(jìn)行業(yè)務(wù)處理的話,每次開始運(yùn)行應(yīng)用時(shí)都會(huì)重復(fù)從頭執(zhí)行這些業(yè)務(wù)。所以需要某種機(jī)制來標(biāo)注已經(jīng)讀取的消息,也就是需要記住當(dāng)前讀取位置offset。

Consumer.plainSource輸入ConsumerRecord類型:

    public ConsumerRecord(String topic,
                          int partition,
                          long offset,
                          K key,
                          V value) {
        this(topic, partition, offset, NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE,
                NULL_CHECKSUM, NULL_SIZE, NULL_SIZE, key, value);
    }

這個(gè)ConsumerRecord類型里包括了offset,用戶可以自行commit這個(gè)位置參數(shù),也就是說用戶可以選擇把這個(gè)offset存儲(chǔ)在kafka或者其它的數(shù)據(jù)庫里。說到commit-offset,offset管理機(jī)制在kafka-consumer業(yè)務(wù)應(yīng)用中應(yīng)該屬于關(guān)鍵技術(shù)。kafka-consumer方面的業(yè)務(wù)流程可以簡(jiǎn)述為:從kafka讀出業(yè)務(wù)指令,執(zhí)行指令并更新業(yè)務(wù)狀態(tài),然后再從kafka里讀出下一批指令。為了實(shí)現(xiàn)業(yè)務(wù)狀態(tài)的準(zhǔn)確性,無論錯(cuò)過一些指令或者重復(fù)執(zhí)行一些指令都是不能容忍的。所以,必須準(zhǔn)確的標(biāo)記每次從kafka讀取數(shù)據(jù)后的指針位置,commit-offset。但是,如果讀出數(shù)據(jù)后即刻commit-offset,那么在執(zhí)行業(yè)務(wù)指令時(shí)如果系統(tǒng)發(fā)生異常,那么下次再從標(biāo)注的位置開始讀取數(shù)據(jù)時(shí)就會(huì)越過一批業(yè)務(wù)指令。這種情況稱為at-most-once,即可能會(huì)執(zhí)行一次,但絕不會(huì)重復(fù)。另一方面:如果在成功改變業(yè)務(wù)狀態(tài)后再commit-offset,那么,一旦執(zhí)行業(yè)務(wù)指令時(shí)發(fā)生異常而無法進(jìn)行commit-offset,下次讀取的位置將使用前一次的標(biāo)注位置,就會(huì)出現(xiàn)重復(fù)改變業(yè)務(wù)狀態(tài)的情況,這種情況稱為at-least-once,即一定會(huì)執(zhí)行業(yè)務(wù)指令,但可能出現(xiàn)重復(fù)更新情況。如果涉及資金、庫存等業(yè)務(wù),兩者皆不可接受,只能采用exactly-once保證一次這種模式了。不過也有很多業(yè)務(wù)要求沒那么嚴(yán)格,比如某個(gè)網(wǎng)站統(tǒng)計(jì)點(diǎn)擊量,只需個(gè)約莫數(shù),無論at-least-once,at-most-once都可以接受。

kafka-consumer-offset是一個(gè)Long類型的值,可以存放在kafka內(nèi)部或者外部的數(shù)據(jù)庫里。如果選擇在kafka內(nèi)部存儲(chǔ)offset, kafka配置里可以設(shè)定按時(shí)間間隔自動(dòng)進(jìn)行位置標(biāo)注,自動(dòng)把當(dāng)前offset存入kafka里。當(dāng)我們?cè)谏厦胬拥腃onsumerSettings里設(shè)置自動(dòng)commit后,多次重新運(yùn)行就不會(huì)出現(xiàn)重復(fù)數(shù)據(jù)的情況了:

val consumerSettings =
    ConsumerSettings(config, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers(bootstrapServers)
      .withGroupId("group1")
      .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")        //自動(dòng)commit
      .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")   //自動(dòng)commit間隔
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

alpakka-kafka提供了Committer類型,是akka-streams的Sink或Flow組件,負(fù)責(zé)把offset寫入kafka。如果用Committer的Sink或Flow就可以按用戶的需要控制commit-offset的發(fā)生時(shí)間。如下面這段示范代碼: 

 

  val committerSettings = CommitterSettings(system)

  val control: DrainingControl[Done] =
    Consumer
      .committableSource(consumerSettings, Subscriptions.topics("greatings"))
      .mapAsync(10) { msg =>
        BusinessLogic.runBusiness(msg.record.key, msg.record.value)
          .map(_ => msg.committableOffset)
      }
      .toMat(Committer.sink(committerSettings))(DrainingControl.apply)
      .run()

control.drainAndShutdown();

  scala.io.StdIn.readLine()
  system.terminate()

}
object BusinessLogic {
  def runBusiness(key: String, value: String): Future[Done] = Future.successful(Done)
}

上面這個(gè)例子里BusinessLogic.runBusiess()模擬一段業(yè)務(wù)處理代碼,也就是說完成了業(yè)務(wù)處理之后就用Committer.sink進(jìn)行了commit-offset。這是一種at-least-once模式,因?yàn)閞unBusiness可能會(huì)發(fā)生異常失敗,所以有可能出現(xiàn)重復(fù)運(yùn)算的情況。Consumer.committableSource輸出CommittableMessage: 

def committableSource[K, V](settings: ConsumerSettings[K, V],
                              subscription: Subscription): Source[CommittableMessage[K, V], Control] =
    Source.fromGraph(new CommittableSource[K, V](settings, subscription))



  final case class CommittableMessage[K, V](
      record: ConsumerRecord[K, V],
      committableOffset: CommittableOffset
  )

  @DoNotInherit sealed trait CommittableOffset extends Committable {
    def partitionOffset: PartitionOffset
  }

Committer.sink接受輸入Committable類型并將之寫入kafka,上游的CommittableOffset 繼承了 Committable。另外,這個(gè)DrainingControl類型結(jié)合了Control類型和akka-streams終結(jié)信號(hào)可以有效控制整個(gè)consumer-streams安全終結(jié)。

alpakka-kafka還有一個(gè)atMostOnceSource。這個(gè)Source組件每讀一條數(shù)據(jù)就會(huì)立即自動(dòng)commit-offset:

  def atMostOnceSource[K, V](settings: ConsumerSettings[K, V],
                             subscription: Subscription): Source[ConsumerRecord[K, V], Control] =
    committableSource[K, V](settings, subscription).mapAsync(1) { m =>
      m.committableOffset.commitInternal().map(_ => m.record)(ExecutionContexts.sameThreadExecutionContext)
    }

可以看出來,atMostOnceSource在輸出ConsumerRecord之前就進(jìn)行了commit-offset。atMostOnceSource的一個(gè)具體使用示范如下:

  import scala.collection.immutable
  val control: DrainingControl[immutable.Seq[Done]] =
    Consumer
      .atMostOnceSource(consumerSettings, Subscriptions.topics("greatings"))
      .mapAsync(1)(record => BussinessLogic.runBusiness(record.key, record.value()))
      .toMat(Sink.seq)(DrainingControl.apply)
      .run()

  control.drainAndShutdown();
  scala.io.StdIn.readLine()
  system.terminate()

所以,使用atMostOnceSource后是不需要任何Committer來進(jìn)行commit-offset的了。值得注意的是atMostOnceSource是對(duì)每一條數(shù)據(jù)進(jìn)行位置標(biāo)注的,所以運(yùn)行效率必然會(huì)受到影響,如果要求不是那么嚴(yán)格的話還是啟動(dòng)自動(dòng)commit比較合適。

對(duì)于任何類型的交易業(yè)務(wù)系統(tǒng)來說,無論at-least-once或at-most-once都是不可接受的,只有exactly-once才妥當(dāng)。實(shí)現(xiàn)exactly-once的其中一個(gè)方法是把offset與業(yè)務(wù)數(shù)據(jù)存放在同一個(gè)外部數(shù)據(jù)庫中。如果在外部數(shù)據(jù)庫通過事務(wù)處理機(jī)制(transaction-processing)把業(yè)務(wù)狀態(tài)更新與commit-offset放在一個(gè)事務(wù)單元中同進(jìn)同退就能實(shí)現(xiàn)exactly-once模式了。下面這段是官方文檔給出的一個(gè)示范:

  val db = new mongoldb
  val control = db.loadOffset().map { fromOffset =>
    Consumer
      .plainSource(
        consumerSettings,
        Subscriptions.assignmentWithOffset(
          new TopicPartition(topic, /* partition = */ 0) -> fromOffset
        )
      )
      .mapAsync(1)(db.businessLogicAndStoreOffset)
      .toMat(Sink.seq)(DrainingControl.apply)
      .run()
  }

class mongoldb {
  def businessLogicAndStoreOffset(record: ConsumerRecord[String, String]): Future[Done] = // ...
  def loadOffset(): Future[Long] = // ...
}

在上面這段代碼里:db.loadOffset()從mongodb里取出上一次讀取位置,返回Future[Long],然后用Subscriptions.assignmentWithOffset把這個(gè)offset放在一個(gè)tuple (TopicPartition,Long)里。TopicPartition定義如下: 

    public TopicPartition(String topic, int partition) {
        this.partition = partition;
        this.topic = topic;
    }

這樣Consumer.plainSource就可以從offset開始讀取數(shù)據(jù)了。plainSource輸出ConsumerRecord類型:

    public ConsumerRecord(String topic,
                          int partition,
                          long offset,
                          K key,
                          V value) {
        this(topic, partition, offset, NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE,
                NULL_CHECKSUM, NULL_SIZE, NULL_SIZE, key, value);
    }

這里面除業(yè)務(wù)指令value外還提供了當(dāng)前offset。這些已經(jīng)足夠在businessLogicAndStoreOffset()里運(yùn)算一個(gè)單獨(dú)的business+offset事務(wù)了(transaction)。 

 

 

 

 

 

 

 

 

    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶 評(píng)論公約

    類似文章 更多