日韩黑丝制服一区视频播放|日韩欧美人妻丝袜视频在线观看|九九影院一级蜜桃|亚洲中文在线导航|青草草视频在线观看|婷婷五月色伊人网站|日本一区二区在线|国产AV一二三四区毛片|正在播放久草视频|亚洲色图精品一区

分享

Dapr微服務(wù)應(yīng)用開發(fā)系列5:發(fā)布訂閱構(gòu)建塊

 風(fēng)聲之家 2021-04-05

dotNET跨平臺(tái) 今天

以下文章來(lái)源于dotNET開發(fā)經(jīng)驗(yàn)談 ,作者朱永光

、團(tuán)隊(duì)管理者有用的一些精華文章。

題記:這篇介紹發(fā)布訂閱構(gòu)建塊,這是對(duì)事件驅(qū)動(dòng)架構(gòu)設(shè)計(jì)的一種實(shí)現(xiàn)落地。

注:對(duì)于“Building Blocks”這個(gè)詞組的翻譯,我之前使用了“構(gòu)件塊”,現(xiàn)在和官方文檔(Dapr中文社區(qū)的貢獻(xiàn))保持一致,采用“構(gòu)建塊”。

原理

發(fā)布訂閱的概念來(lái)自于事件驅(qū)動(dòng)架構(gòu)(EDA)的設(shè)計(jì)思想,這是一種讓程序(應(yīng)用、服務(wù))之間解耦的主要方式,通過(guò)發(fā)布訂閱的思想也可以實(shí)現(xiàn)服務(wù)之間的異步調(diào)用。而大部分分布式應(yīng)用都會(huì)依賴這樣的發(fā)布訂閱解耦模式。

整個(gè)發(fā)布訂閱的思想其實(shí)是比較簡(jiǎn)單的:

圖片

如上圖所示,把需要解耦的程序分別設(shè)定為事件發(fā)布者或者事件訂閱者(理論上,對(duì)于某個(gè)事件,一個(gè)程序僅能作為一種角色;對(duì)于不同事件,一個(gè)程序可以既作為發(fā)布者又可以作為訂閱者)。同時(shí)利用消息代理(Message Broker)中間件把兩者對(duì)接起來(lái),消息代理即作為事件消息的傳輸通道。

在Dapr中對(duì)這種發(fā)布訂閱模式進(jìn)行了高度抽象的實(shí)現(xiàn),并提供了自由替換消息代理中間件的特性,如下圖所示:

Dapr的發(fā)布訂閱構(gòu)建塊也可以被看作一種事件總線(Event Bus)的實(shí)現(xiàn),只是你不需要使用特殊的協(xié)議,在發(fā)布端和訂閱端僅使用HTTP/gRPC即可。

在事件總線中,把發(fā)布訂閱兩者關(guān)聯(lián)在一起的是事件類型,那么在Dapr中也引入了一個(gè)類似的概念——主題(Topic)。如果對(duì)消息隊(duì)列中間件熟悉的人對(duì)于這個(gè)概念不會(huì)陌生。由此發(fā)布端和訂閱端的處理過(guò)程和針對(duì)Dapr的接口也就是圍繞主題來(lái)展開的。

能力

消息發(fā)送

既然Dapr的PubSub是一種事件總線,那么要發(fā)送消息,必然需要對(duì)代表主題(事件類型)的消息進(jìn)行封裝。Dapr并沒(méi)有去創(chuàng)造一種獨(dú)有的格式,而是采用了目前業(yè)界比較流行的開放協(xié)議——云事件(CloudEvents)規(guī)范。這種格式把事件消息封裝為如下JSON數(shù)據(jù):

{
"specversion" : "1.0",
"type" : "xml.message",
"source" : "https:///message",
"subject" : "Test XML Message",
"id" : "id-1234-5678-9101",
"time" : "2020-09-23T06:23:21Z",
"datacontenttype" : "text/xml",
"data" : "<note><to>User1</to><from>user2</from><message>hi</message></note>"
}

當(dāng)然對(duì)消息的封裝不需要應(yīng)用程序本身去關(guān)心,你只需要給Dapr傳遞data的字符串即可,而這個(gè)字符串本身是以什么格式(不管xml還是json)去承載內(nèi)容都是由應(yīng)用程序確定。具體如何發(fā)送消息,下面規(guī)范部分會(huì)介紹。

消息傳遞

Dapr會(huì)自動(dòng)根據(jù)主題把消息發(fā)送給所有訂閱者,傳遞過(guò)程保證“至少一次”送達(dá)。送達(dá)的判斷標(biāo)準(zhǔn)是基于訂閱者的響應(yīng)是否成功(即HTTP狀態(tài)碼為20X)。

當(dāng)然,訂閱者也可以在響應(yīng)體中設(shè)置 status 屬性來(lái)給出更為精細(xì)的處理指令,比如 RETRY 告知Dapr之前處理失敗了,現(xiàn)在是重試成功了;或者 DROP 告知Dapr應(yīng)用程序?qū)@個(gè)消息處理出現(xiàn)問(wèn)題,已經(jīng)記錄了告警日志,但是不打算繼續(xù)處理它了。

消息傳遞還有一個(gè)重要的特性需要理解,就是消息的生存期(Time-to-Live,TTL)。TTL規(guī)定了消息在Dapr(實(shí)際上是在消息代理中間件)里面的存活時(shí)間,如果TTL過(guò)期,那么消息就不會(huì)再被傳遞(即變成死信)。所有目前支持的發(fā)布訂閱組件都支持TTL的特性,Dapr會(huì)幫助你處理這方面的邏輯。

消息消費(fèi)

為了消費(fèi)消息,需要對(duì)主題進(jìn)行注冊(cè),可以通過(guò)聲明式和編程式來(lái)進(jìn)行注冊(cè)。聲明式通過(guò)外部的yaml文件定義一個(gè)K8S的CRD,來(lái)描述服務(wù)需要訂閱什么主題,接收事件的HTTP API路由地址。編程式通過(guò)暴露特定的HTTP API路由地址或者特定的gRPC方法來(lái)讓Dapr運(yùn)行時(shí)進(jìn)行訪問(wèn),從而注冊(cè)需要訂閱什么主題和接收事件的地址。

發(fā)布訂閱構(gòu)建塊采用的是所謂競(jìng)爭(zhēng)者消費(fèi)模式,即同一個(gè)應(yīng)用(AppId相同)的多個(gè)實(shí)例,只會(huì)有一個(gè)實(shí)例獲得消息,這些同個(gè)應(yīng)用的多個(gè)實(shí)例稱之為一個(gè)消費(fèi)組。如果你希望消息被多個(gè)應(yīng)用得到,那么就需要使用多個(gè)消費(fèi)組,也即多個(gè)AppId。

主題范圍限制

從上面所知,在發(fā)送消息和消費(fèi)消息的時(shí)候,都需要針對(duì)某個(gè)主題。為了對(duì)消息的傳遞進(jìn)行更加精細(xì)的控制,在發(fā)布訂閱構(gòu)建塊中可以對(duì)主題范圍進(jìn)行限制,即某些主題只能由某些應(yīng)用來(lái)發(fā)布,某些主題只能由某些應(yīng)用來(lái)訂閱。

要進(jìn)行范圍限制,需要對(duì)發(fā)布訂閱組件的配置yaml進(jìn)行配置,設(shè)置 spec.metadata 下面的 publishingScopes, subscriptionScopes 和 allowedTopics 配置。(詳細(xì)說(shuō)明見(jiàn)未來(lái)的關(guān)于組件的文章)。

規(guī)范

Dapr給PubSub這一構(gòu)建塊提供了兩方面的規(guī)范:消息生產(chǎn)端和消息消費(fèi)端。

消息生產(chǎn)端

通過(guò)POST如下地址,來(lái)發(fā)送消息到特定主題:

POST http://localhost:<daprPort>/v1.0/publish/<pubsubname>/<topic>[?<metadata>]

其中pubsubname代表了PubSub組件的名稱;topic代表了目標(biāo)主題名稱。

在 Content-Type 請(qǐng)求頭中設(shè)置請(qǐng)求體的格式,比如 application/json

請(qǐng)求體根據(jù)請(qǐng)求頭里面的設(shè)置格式,傳入JSON或者XML,Dapr會(huì)自動(dòng)把請(qǐng)求體封裝為CloudEvent格式。

如果是直接調(diào)用gRPC的接口的話,是調(diào)用 PublishEvent 接口并傳遞 PublishEventRequest實(shí)體。

消息消費(fèi)端

如果你的消費(fèi)端是通過(guò)聲明式來(lái)向Dapr注冊(cè)需要訂閱什么主題的消息,那么在如下格式的yaml文件中進(jìn)行定義:

apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
name: myevent-subscription
spec:
topic: deathStarStatus
route: /dsstatus
pubsubname: pubsub
scopes:
- app1
- app2

其中 spec.topic 代表了要訂閱的主題名稱,spec.route 代表了接收訂閱消息的HTTP路由地址,spec.route 代表了針對(duì)的PubSub組件是那個(gè)。尤為關(guān)鍵是 scopes 里面設(shè)置了這樣的訂閱聲明是針對(duì)那個(gè)(或那幾個(gè))應(yīng)用起作用(填入appid)。

如果你的消費(fèi)端是通過(guò)編程式來(lái)向Dapr注冊(cè)需要訂閱什么主題的消息,那么暴露一個(gè)如下特殊HTTP路由地址的接口:

GET http://localhost:<appPort>/dapr/subscribe

并返回如下格式的響應(yīng)體,讓Dapr知道你的應(yīng)用需要訂閱什么主題,接收消息的接口路由地址是什么:

[
{
"pubsubname": "pubsub",
"topic": "newOrder",
"route": "/orders"
}
]

當(dāng)然你的應(yīng)用需要暴露注冊(cè)的接收路由接口,并支持POST謂詞,接口收到請(qǐng)求后返回2xx狀態(tài)碼告訴Dapr消息處理成功了,或者404告訴Dapr出現(xiàn)錯(cuò)誤且消息已經(jīng)丟棄,或者其他狀態(tài)碼讓Dapr進(jìn)行重試。

兩種訂閱注冊(cè)方式各有優(yōu)缺,聲明式方便一個(gè)主題注冊(cè)多個(gè)應(yīng)用,編程式方便一個(gè)應(yīng)用注冊(cè)多個(gè)主題。

注意:如果是使用gRPC來(lái)注冊(cè)和暴露消費(fèi)接口,那么規(guī)范有所不同,做法見(jiàn)下面。

DOTNET SDK

Dapr的.NET SDK同樣針對(duì)消息生產(chǎn)端和消費(fèi)端提供相關(guān)的函數(shù)庫(kù)。

在DaprClient這個(gè)客戶端類中提供了 PublishEventAsync 這個(gè)方法來(lái)用于發(fā)送事件消息到特定PubSub的特定主題上 (底層是請(qǐng)求了gRPC的接口)。比如:

using var client = new DaprClientBuilder().Build();

var eventData = new { Id = "17", Amount = 10m, };
await client.PublishEventAsync(pubsubName, "deposit", eventData, cancellationToken);

在消費(fèi)端,目前針對(duì)ASP.NET Core提供了一個(gè)特殊的屬性標(biāo)記 TopicAttribute 來(lái)簡(jiǎn)化編程式訂閱注冊(cè)的過(guò)程。比如:

[Topic("pubsub", "deposit")]
[HttpPost("deposit")]
public async Task<ActionResult<Account>> Deposit(Transaction transaction, [FromServices] DaprClient daprClient)

如果你是使用gRPC來(lái)實(shí)現(xiàn)消費(fèi)端,那么目前并沒(méi)有一個(gè)簡(jiǎn)化方式來(lái)注冊(cè)(未來(lái)我會(huì)補(bǔ)上這個(gè)坑),只能遵循如下規(guī)范:

首先用ListTopicSubscriptions注冊(cè):

public override Task<ListTopicSubscriptionsResponse> ListTopicSubscriptions(Empty request, ServerCallContext context)
{
var result = new ListTopicSubscriptionsResponse();
result.Subscriptions.Add(new TopicSubscription
{
PubsubName = "pubsub",
Topic = "deposit"
});
result.Subscriptions.Add(new TopicSubscription
{
PubsubName = "pubsub",
Topic = "withdraw"
});
return Task.FromResult(result);
}

接著用OnTopicEvent接收:

public override async Task<TopicEventResponse> OnTopicEvent(TopicEventRequest request, ServerCallContext context)
{
if (request.PubsubName == "pubsub")
{
var input = JsonSerializer.Deserialize<Models.Transaction>(request.Data.ToStringUtf8(), this.jsonOptions);
var transaction = new GrpcServiceSample.Generated.Transaction() { Id = input.Id, Amount = (int)input.Amount, };
if (request.Topic == "deposit")
{
await Deposit(transaction, context);
}
else
{
await Withdraw(transaction, context);
}
}

return await Task.FromResult(default(TopicEventResponse));
}

具體見(jiàn)SDK的examples:https://github.com/heavenwing/dapr-dotnet-sdk/blob/master/examples/AspNetCore/GrpcServiceSample/Services/BankingService.cs

用法與例子

使用SDK來(lái)進(jìn)行事件消息的發(fā)布和訂閱,可以直接參考SDK的examples中的消費(fèi)端例子 ControllerSample 和生產(chǎn)端例子 PublishSubscribe。

如果是非SDK的用法,可以參考我的quickstarts,消費(fèi)端 PubSubConsumer和生產(chǎn)端 PubSubProducer。

我的quickstarts的消費(fèi)端同時(shí)使用聲明式和編程式兩種注冊(cè)方式。大致代碼如下:

[Route("dapr/subscribe")]
[ApiController]
public class DaprSubscribeController : ControllerBase
{
public ActionResult<DaprSubscribeOutput[]> Get()
{
return Ok(new DaprSubscribeOutput[]
{
new DaprSubscribeOutput
{
PubSubName="pubsub",
Topic="quickstarts/wakeup",
Route="/api/wakeup"
}
});
}
}

public async Task<IActionResult> PostAsync(TinyCloudEvent<MessageInput> model)
{
_logger.LogInformation(model.Data.Name);
return Ok();
}

using var httpClient = new HttpClient();
httpClient.BaseAddress = new Uri(pubsubUrl);

Console.WriteLine($"To {topicName1} ...");
var request1 = new HttpRequestMessage(HttpMethod.Post, topicName1);
request1.Content = new StringContent(JsonSerializer.Serialize(new { name = "Jack" }), Encoding.UTF8, "application/json");
await httpClient.SendAsync(request1);
apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
name: quickstarts-subscription
spec:
topic: quickstarts/sleep
route: /api/sleep
pubsubname: pubsub
scopes:
- quickstarts-pbc

    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購(gòu)買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶 評(píng)論公約

    類似文章 更多