Kafka是一種高吞吐量的分布式發布訂閱消息系統,有如下特性:
相關術語介紹
在這里我們用了一個第三方庫叫Confluent.Kafka,在NuGet上搜索一下就出來了,感謝原作者。 新建一個 .NET Core類庫項目,安裝第三方依賴庫,如下圖所示: 新建一個SUPKafkaTopicConsumer類,這是用來創建并初始化消費者的,接下來看看這個類里面包含了什么。
/// <summary>
/// Callback invoked for every record the consumer receives.
/// </summary>
/// <param name="data">The consume result returned by the underlying consumer, passed as <see cref="object"/>.</param>
public delegate void OnReceivedHandle(object data);
初始化消費者,構造函數中傳入kafka地址,以及要訂閱的組groupId,另外注入了log4net記錄日志信息。
/// <summary>
/// Wraps a Confluent.Kafka consumer: builds it from a bootstrap server and group id,
/// then polls subscribed topics and forwards each record to <see cref="onReceivedHandle"/>.
/// </summary>
public class SUPKafkaTopicConsumer<TKey, TValue>
{
    private IConsumer<TKey, TValue> consumer;
    private readonly SUPLogger logger_;
    private readonly string BootStrapServer;
    private readonly string GroupId;

    /// <summary>
    /// Stores the connection settings; the underlying consumer is not created until <see cref="Init"/> is called.
    /// </summary>
    /// <param name="bootStrapServer">Kafka bootstrap server address.</param>
    /// <param name="groupId">Consumer group id.</param>
    /// <param name="logger">Optional logger; may be null.</param>
    public SUPKafkaTopicConsumer(string bootStrapServer, string groupId, SUPLogger logger = null)
    {
        BootStrapServer = bootStrapServer;
        GroupId = groupId;
        logger_ = logger;
    }

    /// <summary>
    /// Builds the underlying consumer.
    /// </summary>
    /// <returns>Always <c>true</c>; any failure while building the consumer propagates as an exception.</returns>
    public bool Init()
    {
        // The original wrapped this in a try/catch that only rethrew; letting the
        // exception propagate directly is equivalent and keeps the stack trace intact.
        var conf = new ConsumerConfig
        {
            GroupId = GroupId,
            BootstrapServers = BootStrapServer,
            AutoOffsetReset = AutoOffsetReset.Earliest,
            // Disable auto-commit: offsets are committed manually once the business
            // logic has completed, so that data is not lost.
            EnableAutoCommit = false
        };

        consumer = new ConsumerBuilder<TKey, TValue>(conf)
            .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
            .Build();
        return true;
    }

    /// <summary>
    /// Raised for each consumed record. Records consumed while no handler is attached are neither dispatched nor committed.
    /// </summary>
    public event OnReceivedHandle onReceivedHandle;

    /// <summary>
    /// Subscribes to a single topic and polls it in an endless loop on the calling thread.
    /// </summary>
    /// <param name="topic">Topic to subscribe to.</param>
    /// <param name="isCommit">When <c>true</c>, the offset is committed after the handler returns.</param>
    public void Subscribe(string topic, bool isCommit)
    {
        Subscribe(new[] { topic }, isCommit);
    }

    /// <summary>
    /// Subscribes to several topics and polls them in an endless loop on the calling thread.
    /// Does nothing when <see cref="Init"/> has not been called. The loop only ends when
    /// consuming, the handler, or the commit throws; that exception propagates to the caller
    /// with its original stack trace.
    /// </summary>
    /// <param name="topics">Topics to subscribe to.</param>
    /// <param name="isCommit">When <c>true</c>, the offset is committed after the handler returns.</param>
    public void Subscribe(System.Collections.Generic.IEnumerable<string> topics, bool isCommit)
    {
        if (consumer == null)
        {
            return;
        }

        consumer.Subscribe(topics);
        while (true)
        {
            var consume = consumer.Consume();

            // Snapshot the delegate so an unsubscribe between the null check and the call cannot cause a null dereference.
            var handler = onReceivedHandle;
            if (handler == null)
            {
                continue;
            }

            handler(consume);
            if (isCommit)
            {
                consumer.Commit(consume);
            }
        }
    }

    /// <summary>
    /// Removes the current topic subscription, if the consumer has been initialized.
    /// </summary>
    public void UnSubscribe()
    {
        if (consumer != null)
        {
            consumer.Unsubscribe();
        }
    }
}
新建生產者類
/// <summary>
/// Contract for a producer that publishes key/value messages to Kafka.
/// </summary>
/// <typeparam name="Tkey">Message key type.</typeparam>
/// <typeparam name="TValue">Message value type.</typeparam>
public interface ISUPKafkaProducer<Tkey, TValue>
{
    /// <summary>
    /// Sends a key/value message to the named topic.
    /// </summary>
    /// <param name="key">Message key.</param>
    /// <param name="value">Message value.</param>
    /// <param name="topic">Destination topic name.</param>
    /// <param name="sendCallBack">Optional callback that receives the delivery report.</param>
    /// <returns>The result of the send attempt.</returns>
    ISendResult Send(
        Tkey key,
        TValue value,
        string topic,
        Action<DeliveryReport<Tkey, TValue>> sendCallBack = null);

    /// <summary>
    /// Sends a key/value message to a specific topic partition.
    /// </summary>
    /// <param name="key">Message key.</param>
    /// <param name="value">Message value.</param>
    /// <param name="topicPartition">Destination topic and partition.</param>
    /// <param name="sendCallBack">Optional callback that receives the delivery report.</param>
    /// <returns>The result of the send attempt.</returns>
    ISendResult Send(
        Tkey key,
        TValue value,
        TopicPartition topicPartition,
        Action<DeliveryReport<Tkey, TValue>> sendCallBack = null);

    /// <summary>
    /// Asynchronous-send variant that targets the named topic.
    /// </summary>
    /// <param name="key">Message key.</param>
    /// <param name="value">Message value.</param>
    /// <param name="topic">Destination topic name.</param>
    /// <returns>The result of the send attempt.</returns>
    ISendResult AsyncSend(Tkey key, TValue value, string topic);

    /// <summary>
    /// Asynchronous-send variant that targets a specific topic partition.
    /// </summary>
    /// <param name="key">Message key.</param>
    /// <param name="value">Message value.</param>
    /// <param name="topicPartition">Destination topic and partition.</param>
    /// <returns>The result of the send attempt.</returns>
    ISendResult AsyncSend(Tkey key, TValue value, TopicPartition topicPartition);
}
/// <summary>
/// Wraps a Confluent.Kafka producer and implements <see cref="ISUPKafkaProducer{Tkey, TValue}"/>.
/// </summary>
internal class SUPKafkaTopicProducer<Tkey, TValue> : ISUPKafkaProducer<Tkey, TValue>
{
    private IProducer<Tkey, TValue> producer;
    private readonly SUPLogger logger_;
    private readonly string m_bootStrapServer;

    /// <summary>
    /// Stores the connection settings; the underlying producer is not created until <see cref="Init"/> is called.
    /// </summary>
    /// <param name="bootStrapServer">Kafka bootstrap server address.</param>
    /// <param name="logger">Optional logger; may be null, in which case handler output is dropped.</param>
    public SUPKafkaTopicProducer(string bootStrapServer, SUPLogger logger = null)
    {
        m_bootStrapServer = bootStrapServer;
        logger_ = logger;
    }

    /// <summary>
    /// Builds the underlying producer and wires its error and log handlers to the logger.
    /// </summary>
    /// <returns>Always <c>true</c>; any failure while building the producer propagates as an exception.</returns>
    public bool Init()
    {
        var config = new ProducerConfig { BootstrapServers = m_bootStrapServer };

        producer = new ProducerBuilder<Tkey, TValue>(config)
            .SetErrorHandler((_, error) =>
            {
                // The original format string referenced {3} with only three arguments, which
                // throws FormatException inside the handler; the logger may also be null.
                logger_?.Fatal($"Kafka Error Handler {m_bootStrapServer},ErrorCode:{error.Code},Reason:{error.Reason}");
            })
            .SetLogHandler((_, msg) =>
            {
                logger_?.Info($"Kafka Log Handler {m_bootStrapServer},Name:{msg.Name},Message:{msg.Message}");
            })
            .Build();
        return true;
    }
實現繼承至ISUPKafkaProducer<Tkey, TValue>的方法
    /// <summary>
    /// Hands a message for the named topic to the producer.
    /// </summary>
    /// <param name="key">Message key.</param>
    /// <param name="value">Message value.</param>
    /// <param name="topic">Destination topic name.</param>
    /// <param name="sendCallBack">Optional callback passed through to the producer as the delivery handler.</param>
    /// <returns>A success result, or a failure result when <see cref="Init"/> has not been called.</returns>
    public ISendResult Send(Tkey key, TValue value, string topic, Action<DeliveryReport<Tkey, TValue>> sendCallBack = null)
    {
        if (producer == null)
        {
            return NotInitialized();
        }

        producer.Produce(topic, CreateMessage(key, value), sendCallBack);
        return new SendResult(true);
    }

    /// <summary>
    /// Hands a message for a specific topic partition to the producer.
    /// </summary>
    /// <param name="key">Message key.</param>
    /// <param name="value">Message value.</param>
    /// <param name="topicPartition">Destination topic and partition.</param>
    /// <param name="sendCallBack">Optional callback passed through to the producer as the delivery handler.</param>
    /// <returns>A success result, or a failure result when <see cref="Init"/> has not been called.</returns>
    public ISendResult Send(Tkey key, TValue value, TopicPartition topicPartition, Action<DeliveryReport<Tkey, TValue>> sendCallBack = null)
    {
        if (producer == null)
        {
            return NotInitialized();
        }

        producer.Produce(topicPartition, CreateMessage(key, value), sendCallBack);
        return new SendResult(true);
    }

    /// <summary>
    /// Starts an asynchronous produce to the named topic, then flushes for up to ten seconds.
    /// The delivery outcome is reported from a continuation.
    /// </summary>
    /// <param name="key">Message key.</param>
    /// <param name="value">Message value.</param>
    /// <param name="topic">Destination topic name.</param>
    /// <returns>A success result, or a failure result when <see cref="Init"/> has not been called.</returns>
    public ISendResult AsyncSend(Tkey key, TValue value, string topic)
    {
        if (producer == null)
        {
            return NotInitialized();
        }

        producer.ProduceAsync(topic, CreateMessage(key, value))
            .ContinueWith(task => ReportDelivery(task, topic));
        producer.Flush(TimeSpan.FromSeconds(10));
        return new SendResult(true);
    }

    /// <summary>
    /// Starts an asynchronous produce to a specific topic partition, then flushes for up to ten seconds.
    /// The delivery outcome is reported from a continuation.
    /// </summary>
    /// <param name="key">Message key.</param>
    /// <param name="value">Message value.</param>
    /// <param name="topicPartition">Destination topic and partition.</param>
    /// <returns>A success result, or a failure result when <see cref="Init"/> has not been called.</returns>
    public ISendResult AsyncSend(Tkey key, TValue value, TopicPartition topicPartition)
    {
        if (producer == null)
        {
            return NotInitialized();
        }

        producer.ProduceAsync(topicPartition, CreateMessage(key, value))
            .ContinueWith(task => ReportDelivery(task, topicPartition.Topic));
        producer.Flush(TimeSpan.FromSeconds(10));
        return new SendResult(true);
    }

    // Builds the Kafka message for a key/value pair.
    private static Message<Tkey, TValue> CreateMessage(Tkey key, TValue value)
    {
        return new Message<Tkey, TValue> { Value = value, Key = key };
    }

    // Result returned when a send is attempted before Init().
    // NOTE(review): assumes the first SendResult argument is the success flag; the original
    // passed true together with the error message, which reported this failure as a success.
    private static ISendResult NotInitialized()
    {
        return new SendResult(false, "沒有初始化生產者");
    }

    // Reports the outcome of an asynchronous produce; failures go to the logger instead of
    // throwing from the continuation, where the exception would go unobserved.
    private void ReportDelivery(System.Threading.Tasks.Task<DeliveryResult<Tkey, TValue>> task, string topic)
    {
        if (task.IsFaulted || task.IsCanceled)
        {
            logger_?.Fatal($"Kafka delivery failed {m_bootStrapServer},Topic:{topic},Reason:{task.Exception?.GetBaseException().Message}");
            return;
        }

        Console.WriteLine("Producer: " + producer.Name + "\r\nTopic: " + topic + "\r\nPartition: " + task.Result.Partition + "\r\nOffset: " + task.Result.Offset);
    }
}
新建一個SUPKafkaMessageCenter類這個類是對外開放的,我們利用這個類來管理生產者和消費者,看下代碼非常簡單。
/// <summary>
/// Public entry point that creates initialized producers and consumers sharing one logger.
/// </summary>
public static class SUPKafkaMessageCenter<Tkey, TValue>
{
    private static readonly SUPLogger logger;

    static SUPKafkaMessageCenter()
    {
        SUPLoggerManager.Configure();
        logger = new SUPLogger("KafkaCenter");
    }

    /// <summary>
    /// Creates and initializes a producer.
    /// </summary>
    /// <param name="bootstrapServer">Kafka bootstrap server address.</param>
    /// <returns>The producer, or <c>null</c> when <paramref name="bootstrapServer"/> is null or empty or initialization reports failure.</returns>
    public static ISUPKafkaProducer<Tkey, TValue> CreateTopicProducer(string bootstrapServer)
    {
        if (string.IsNullOrEmpty(bootstrapServer))
        {
            return null;
        }

        var producer = new SUPKafkaTopicProducer<Tkey, TValue>(bootstrapServer, logger);
        return producer.Init() ? producer : null;
    }

    /// <summary>
    /// Creates and initializes a consumer.
    /// </summary>
    /// <param name="bootstrapServer">Kafka bootstrap server address.</param>
    /// <param name="groupId">Consumer group id.</param>
    /// <returns>The consumer, or <c>null</c> when <paramref name="bootstrapServer"/> is null or empty or initialization reports failure.</returns>
    public static SUPKafkaTopicConsumer<Tkey, TValue> CreateTopicConsumer(string bootstrapServer, string groupId = "default-consumer-group")
    {
        if (string.IsNullOrEmpty(bootstrapServer))
        {
            return null;
        }

        var consumer = new SUPKafkaTopicConsumer<Tkey, TValue>(bootstrapServer, groupId, logger);
        return consumer.Init() ? consumer : null;
    }
}
測試新建一個測試的控制臺程序,調用代碼如下
// Create a consumer for the local broker.
var topicConsumer = SUPKafkaMessageCenter<string, string>.CreateTopicConsumer("localhost:9092");

// Register the callback that receives each message.
topicConsumer.onReceivedHandle += CallBack;

// Subscribe to the topics.
var topicNames = new List<string> { "kafka-default-topic", "test" };
topicConsumer.Subscribe(topicNames, false);
ISUPKafkaProducer<string, string> kafkaCenter = SUPKafkaMessageCenter<string, string>.CreateTopicProducer("localhost:9092"); kafkaCenter.Send(i.ToString(), "", "kafka-default-topic",deliveryReport =>{...}); 除了上面寫的這些方法,其實對于kafka還有很多功能,比如topic的增刪改查,我把它認為是管理類的,這里就不貼代碼了。 |
|