日韩黑丝制服一区视频播放|日韩欧美人妻丝袜视频在线观看|九九影院一级蜜桃|亚洲中文在线导航|青草草视频在线观看|婷婷五月色伊人网站|日本一区二区在线|国产AV一二三四区毛片|正在播放久草视频|亚洲色图精品一区

分享

asp .net core發(fā)布訂閱kafka

 怡紅公子0526 2021-04-03

Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),有如下特性:

  • 通過(guò)O的磁盤(pán)數(shù)據(jù)結(jié)構(gòu)提供消息的持久化,這種結(jié)構(gòu)對(duì)于即使數(shù)以TB的消息存儲(chǔ)也能夠保持長(zhǎng)時(shí)間的穩(wěn)定性能。

  • 高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒數(shù)百萬(wàn) [2] 的消息。

  • 支持通過(guò)Kafka服務(wù)器和消費(fèi)機(jī)集群來(lái)分區(qū)消息。

  • 支持Hadoop并行數(shù)據(jù)加載。
    Kafka通過(guò)官網(wǎng)發(fā)布了最新版本2.3.0

相關(guān)術(shù)語(yǔ)介紹

  • Broker
    Kafka集群包含一個(gè)或多個(gè)服務(wù)器,這種服務(wù)器被稱為broker

  • Topic
    每條發(fā)布到Kafka集群的消息都有一個(gè)類別,這個(gè)類別被稱為Topic。(物理上不同Topic的消息分開(kāi)存儲(chǔ),邏輯上一個(gè)Topic的消息雖然保存于一個(gè)或多個(gè)broker上但用戶只需指定消息的Topic即可生產(chǎn)或消費(fèi)數(shù)據(jù)而不必關(guān)心數(shù)據(jù)存于何處)

  • Partition
    Partition是物理上的概念,每個(gè)Topic包含一個(gè)或多個(gè)Partition.

  • Producer
    負(fù)責(zé)發(fā)布消息到Kafka broker

  • Consumer
    消息消費(fèi)者,向Kafka broker讀取消息的客戶端。

  • Consumer Group
    每個(gè)Consumer屬于一個(gè)特定的Consumer Group(可為每個(gè)Consumer指定group name,若不指定group name則屬于默認(rèn)的group)。

在這里我們用了一個(gè)第三方庫(kù)叫Confluent.kafka,在nuget上搜索一下就出來(lái)了,感謝原作者。

新建一個(gè) .net core類庫(kù)項(xiàng)目

安裝第三方依賴庫(kù),如下圖所示:
41.png

新建一個(gè)SUPKafkaTopicConsumer類

這是用來(lái)創(chuàng)建并初始化消費(fèi)者,接下來(lái)看看這個(gè)類里面包含了什么。

  • 首先聲明一個(gè)委托,用來(lái)接收訂閱消息

public delegate void OnReceivedHandle(object data);

初始化消費(fèi)者,構(gòu)造函數(shù)中傳入kafka地址,以及要訂閱的組groupId,另外注入了log4net記錄日志信息。
init()方法用來(lái)初始化,新建一個(gè)消費(fèi)者,具體代碼如下。

 public class SUPKafkaTopicConsumer<TKey, TValue>
    {
        private IConsumer<TKey, TValue> consumer;
        private SUPLogger logger_;
        private string BootStrapServer;
        private string GroupId;
      
        public SUPKafkaTopicConsumer(string bootStrapServer, string groupId, SUPLogger logger = null)
        {
            BootStrapServer = bootStrapServer;
            GroupId = groupId;
            logger_ = logger;
        }

        public bool Init()
        {
            try
            {
                var conf = new ConsumerConfig
                {
                    GroupId = GroupId,
                    BootstrapServers = BootStrapServer,
                    AutoOffsetReset = AutoOffsetReset.Earliest,
                    EnableAutoCommit = false // 設(shè)置非自動(dòng)偏移,業(yè)務(wù)邏輯完成后手動(dòng)處理偏移,防止數(shù)據(jù)丟失
                };
                consumer = new ConsumerBuilder<TKey, TValue>(conf)
                    .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
                    .Build();

                return true;
            }
            catch (Exception ex)
            {
                throw;
            }
        }
  • 定義回調(diào)事件,用以處理用戶自定義方法。

public event OnReceivedHandle onReceivedHandle;
  • 定義一個(gè)訂閱的方法,傳入topic,以及是否需要提交偏移量。
    其實(shí)看init()方法中我把EnableAutoCommit=false,取消了自動(dòng)提交,讓?xiě)?yīng)用程序決定何時(shí)提交 偏移量,為什么這么做呢?
    自動(dòng)提交雖然方便,但是也有一些弊端,自動(dòng)提交的弊端是通過(guò)間隔時(shí)間。 一般是默認(rèn)5s提交時(shí)間間隔,在最近一次提交之后的 3s發(fā)生了再均衡,再均衡之后,消費(fèi)者從最后一次提交的偏移量位置開(kāi)始讀取消息。這個(gè)時(shí)候偏移量已經(jīng)落后 了 3s,所以在這 3s 內(nèi)到達(dá)的消息會(huì)被重復(fù)處理??梢酝ㄟ^(guò)修改提交時(shí)間間隔來(lái)更頻繁地提交偏移量,減小可能出現(xiàn)重復(fù)消息的時(shí)間窗,不過(guò)這種情況是無(wú)也完全避免的 。
    大部分開(kāi)發(fā)者通過(guò)控制偏移量提交時(shí)間來(lái)消除丟失消息的可能性,井在發(fā)生再均衡時(shí)減少 重復(fù)消息的數(shù)量。消費(fèi)者 API提供了另一種提交偏移量的方式 , 開(kāi)發(fā)者可以在必要的時(shí)候 提交當(dāng)前偏移盤(pán),而不是基于時(shí)間間隔。

public void Subscribe(string topic, bool isCommit)
        {
            try
            {
                if (consumer != null)
                {
                    consumer.Subscribe(topic);
                    while (true)
                    {
                        var consume = consumer.Consume();
                        if (onReceivedHandle != null)
                        {
                            onReceivedHandle(consume);

                            if (isCommit)
                            {
                                consumer.Commit(consume);
                            }
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                //consumer.Close();
                throw ex;
            }
        }
  • 取消訂閱

 public void UnSubscribe()
        {
            if (consumer != null)
            {
                consumer.Unsubscribe();
            }
        }

新建生產(chǎn)者類

  • 首先定義了ISUPKafkaProducer<Tkey, TValue>接口,包含四個(gè)方法

 public interface ISUPKafkaProducer<Tkey,TValue>
    {
        ISendResult Send(Tkey key, TValue value, string topic,Action<DeliveryReport<Tkey, TValue>> sendCallBack = null);
        ISendResult Send(Tkey key, TValue value, TopicPartition topicPartition, Action<DeliveryReport<Tkey, TValue>> sendCallBack = null);

        ISendResult AsyncSend(Tkey key, TValue value,string topic);
        ISendResult AsyncSend(Tkey key, TValue value, TopicPartition topicPartition);
    }
  • 接口的實(shí)現(xiàn),初始化過(guò)程類似消費(fèi)者

internal class SUPKafkaTopicProducer<Tkey, TValue> : ISUPKafkaProducer<Tkey, TValue>
    {
        private IProducer<Tkey, TValue> producer;
        private SUPLogger logger_;
        private string m_bootStrapServer;

        public SUPKafkaTopicProducer(string bootStrapServer,SUPLogger logger = null)
        {
            m_bootStrapServer = bootStrapServer;
            logger_ = logger;
        }
        public bool Init()
        {
            try
            {
                var config = new ProducerConfig
                {
                    BootstrapServers = m_bootStrapServer
                };
                producer = new ProducerBuilder<Tkey, TValue>(config)
                    .SetErrorHandler((producer, error) =>
                    {
                        logger_.Fatal(string.Format("Kafka Error Handler {0},ErrorCode:{2},Reason:{3}",
                            m_bootStrapServer, error.Code, error.Reason));
                    })
                    .SetLogHandler((producer, msg) =>
                    {
                        logger_.Info(string.Format("Kafka Log Handler {0}-{1},Name:{2},Message:{3}",
                            m_bootStrapServer, msg.Name, msg.Message));
                    })
                    .Build();

                return true;
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }

實(shí)現(xiàn)繼承至ISUPKafkaProducer<Tkey, TValue>的方法

 public ISendResult Send(Tkey key, TValue value,string topic, Action<DeliveryReport<Tkey, TValue>> sendCallBack = null)
        {
            try
            {
                if (producer != null)
                {
                    var message = new Message<Tkey, TValue>
                    {
                        Value = value,
                        Key = key
                    };
                    producer.Produce(topic, message, sendCallBack);
                    return new SendResult(true);
                }
                else
                {
                    return new SendResult(true, "沒(méi)有初始化生產(chǎn)者");
                }
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }

        public ISendResult Send(Tkey key, TValue value, TopicPartition topicPartition, Action<DeliveryReport<Tkey, TValue>> sendCallBack = null)
        {
            try
            {
                if (producer != null)
                {
                    var message = new Message<Tkey, TValue>
                    {
                        Value = value,
                        Key = key
                    };
                    producer.Produce(topicPartition, message, sendCallBack);
                    return new SendResult(true);
                }
                else
                {
                    return new SendResult(true, "沒(méi)有初始化生產(chǎn)者");
                }
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }

        public ISendResult AsyncSend(Tkey key, TValue value,string topic)
        {
            try
            {
                if (producer != null)
                {
                    var message = new Message<Tkey, TValue>
                    {
                        Value = value,
                        Key = key
                    };
                    var deliveryReport = producer.ProduceAsync(topic, message);
                    deliveryReport.ContinueWith(task =>
                   {
                       Console.WriteLine("Producer: " + producer.Name + "\r\nTopic: " + topic + "\r\nPartition: " + task.Result.Partition + "\r\nOffset: " + task.Result.Offset);
                   });
                    producer.Flush(TimeSpan.FromSeconds(10));
                    return new SendResult(true);
                }
                else
                {
                    return new SendResult(true, "沒(méi)有初始化生產(chǎn)者");
                }
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }

        public ISendResult AsyncSend(Tkey key, TValue value, TopicPartition topicPartition)
        {
            try
            {
                if (producer != null)
                {
                    var message = new Message<Tkey, TValue>
                    {
                        Value = value,
                        Key = key
                    };

                    var deliveryReport = producer.ProduceAsync(topicPartition, message);
                    deliveryReport.ContinueWith(task =>
                    {
                        Console.WriteLine("Producer: " + producer.Name + "\r\nTopic: " + topicPartition.Topic + "\r\nPartition: " + task.Result.Partition + "\r\nOffset: " + task.Result.Offset);
                    });

                    producer.Flush(TimeSpan.FromSeconds(10));
                    return new SendResult(true);
                }
                else
                {
                    return new SendResult(true, "沒(méi)有初始化生產(chǎn)者");
                }
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }

新建一個(gè)SUPKafkaMessageCenter類

這個(gè)類是對(duì)外開(kāi)放的,我們利用這個(gè)類來(lái)管理生產(chǎn)者和消費(fèi)者,看下代碼非常簡(jiǎn)單。

public static class SUPKafkaMessageCenter<Tkey, TValue>
    {
        private static SUPLogger logger = null;
        static SUPKafkaMessageCenter()
        {
            SUPLoggerManager.Configure();
            logger = new SUPLogger("KafkaCenter");
        }
        /// <summary>
        /// 創(chuàng)建生產(chǎn)者
        /// </summary>
        /// <param name="bootstrapServer"></param>
        /// <param name="topicName"></param>
        /// <returns></returns>
        public static ISUPKafkaProducer<Tkey, TValue> CreateTopicProducer(string bootstrapServer)
        {
            if (string.IsNullOrEmpty(bootstrapServer))
            {
                return null;
            }
            var producer = new SUPKafkaTopicProducer<Tkey, TValue>(bootstrapServer, logger);
            if (!producer.Init())
            {
                return null;
            }
            return producer;
        }

        /// <summary>
        /// 創(chuàng)建消費(fèi)者
        /// </summary>
        /// <param name="bootstrapServer"></param>
        /// <param name="groupId"></param>
        /// <returns></returns>
        public static SUPKafkaTopicConsumer<Tkey, TValue> CreateTopicConsumer(string bootstrapServer, string groupId= "default-consumer-group")
        {
            if (string.IsNullOrEmpty(bootstrapServer))
            {
                return null;
            }
            var consumer = new SUPKafkaTopicConsumer<Tkey, TValue>(bootstrapServer, groupId,logger);
            if (!consumer.Init())
            {
                return null;
            }
            return consumer;
        }

測(cè)試

新建一個(gè)測(cè)試的控制臺(tái)程序,調(diào)用代碼如下

  • 消費(fèi)者

var consumer = SUPKafkaMessageCenter<string, string>.CreateTopicConsumer("localhost:9092");
            //綁定接收信息,回調(diào)函數(shù)
            consumer.onReceivedHandle += CallBack;

            var topics = new List<string>();
            topics.Add("kafka-default-topic");
            topics.Add("test");
            //訂閱主題
            consumer.Subscribe(topics, false);
  • 生產(chǎn)者

ISUPKafkaProducer<string, string> kafkaCenter = SUPKafkaMessageCenter<string, string>.CreateTopicProducer("localhost:9092");
kafkaCenter.Send(i.ToString(), "", "kafka-default-topic",deliveryReport =>{...});

除了上面寫(xiě)的這些方法,其實(shí)對(duì)于kafka還有很多功能,比如topic的增刪改查,我把它認(rèn)為是管理類的,這里就不貼代碼了。

    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購(gòu)買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶 評(píng)論公約

    類似文章 更多