MassTransit是一款优秀的分布式应用框架,可作为分布式应用的消息总线,也可以用作单体应用的事件总线。
A free, open-source distributed application framework for .NET.
一个免费、开源的.NET 分布式应用框架。-- MassTransit 官网
空口无凭,创建一个项目快速体验一下。
worker
模板创建一个基础项目:dotnet new worker -n MassTransit.Demo
MassTransit
using System; namespace MassTransit.Demo { public record OrderCreatedEvent { public Guid OrderId { get; set; } } }
4.修改Worker
类,发送订单创建事件:
namespace MassTransit.Demo; public class Worker : BackgroundService { readonly IBus _bus;//注册总线 public Worker(IBus bus) { _bus = bus; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { while (!stoppingToken.IsCancellationRequested) { //模拟并发送订单创建事件 await _bus.Publish(new OrderCreatedEvent(Guid.NewGuid()), stoppingToken); await Task.Delay(1000, stoppingToken); } } }
5.仅需实现IConsumer<OrderCreatedEvent>
泛型接口,即可实现消息的订阅:
public class OrderCreatedEventConsumer: IConsumer<OrderCreatedEvent> { private readonly ILogger<OrderCreatedEventConsumer> _logger; public OrderCreatedEventConsumer(ILogger<OrderCreatedEventConsumer> logger) { _logger = logger; } public Task Consume(ConsumeContext<OrderCreatedEvent> context) { _logger.LogInformation($"Received Order:{context.Message.OrderId}"); return Task.CompletedTask; } }
6.注册服务:
using MassTransit; using MassTransit.Demo; IHost host = Host.CreateDefaultBuilder(args) .ConfigureServices(services => { services.AddHostedService<Worker>(); services.AddMassTransit(configurator => { //注册消费者 configurator.AddConsumer<OrderCreatedEventConsumer>(); //使用基于内存的消息路由传输 configurator.UsingInMemory((context, cfg) => { cfg.ConfigureEndpoints(context); }); }); }) .Build(); await host.RunAsync();
7.运行项目,一个简单的进程内事件发布订阅的应用就完成了。
如果需要使用RabbitMQ 消息代理进行消息传输,则仅需安装MassTransit.RabbitMQ
NuGet包,然后指定使用RabbitMQ 传输消息即可。
using MassTransit; using MassTransit.Demo; IHost host = Host.CreateDefaultBuilder(args) .ConfigureServices(services => { services.AddHostedService<Worker>(); services.AddMassTransit(configurator => { configurator.AddConsumer<OrderCreatedEventConsumer>(); // configurator.UsingInMemory((context, cfg) => // { // cfg.ConfigureEndpoints(context); // }); configurator.UsingRabbitMq((context, cfg) => { cfg.Host( host: "localhost", port: 5672, virtualHost: "/", configure: hostConfig => { hostConfig.Username("guest"); hostConfig.Password("guest"); }); cfg.ConfigureEndpoints(context); }); }); }) .Build(); await host.RunAsync();
运行项目,MassTransit会自动在指定的RabbitMQ上创建一个类型为fanout
的MassTransit.Demo.OrderCreatedEvent
Exchange和一个与OrderCreatedEvent
同名的队列进行消息传输,如下图所示。
MassTranist 为了实现消息代理的透明化和应用间消息的高效传输,抽象了以下概念,其中消息流转流程如下图所示:
从上图可知,本质上还是发布订阅模式的实现,接下来就核心概念进行详解。
Message:消息,可以使用class、interface、struct和record来创建,消息作为一个契约,需确保创建后不能篡改,因此应只保留只读属性且不应包含方法和行为。MassTransit使用的是包含命名空间的完全限定名即typeof(T).FullName
来表示特定的消息类型。因此若在另外的项目中消费同名的消息类型,需确保消息的命名空间相同。另外需注意消息不应继承,以避免发送基类消息类型造成的不可预期的结果。为避免此类情况,官方建议使用接口来定义消息。在MassTransit中,消息主要分为两种类型:
经过MassTransit发送的消息,会使用信封包装,包含一些附加信息,数据结构举例如下:
{ "messageId": "6c600000-873b-00ff-9a8f-08da8da85542", "requestId": null, "correlationId": null, "conversationId": "6c600000-873b-00ff-9526-08da8da85544", "initiatorId": null, "sourceAddress": "rabbitmq://localhost/THINKPAD_MassTransitDemo_bus_ptoyyyr88cyx9s1gbdpe5kniy1?temporary=true", "destinationAddress": "rabbitmq://localhost/MassTransit.Demo:OrderCreatedEvent", "responseAddress": null, "faultAddress": null, "messageType": [ "urn:message:MassTransit.Demo:OrderCreatedEvent" ], "message": { "orderId": "fd8a3598-4c3a-4ec9-bbf9-d5f508e1a0d8" }, "expirationTime": null, "sentTime": "2022-09-03T12:32:15.0796943Z", "headers": {}, "host": { "machineName": "THINKPAD", "processName": "MassTransit.Demo", "processId": 24684, "assembly": "MassTransit.Demo", "assemblyVersion": "1.0.0.0", "frameworkVersion": "6.0.5", "massTransitVersion": "8.0.6.0", "operatingSystemVersion": "Microsoft Windows NT 10.0.19044.0" } }
从以上消息实例中可以看出一个包装后的消息包含以下核心属性:
Producer,生产者,即用于生产消息。在MassTransit主要借助以下对象进行命令的发送和事件的发布。
从以上类图可以看出,消息的发送主要核心依赖于两个接口:
ISendEndpoint
:提供了Send
方法,用于发送命令。IPublishEndpoint
:提供了Publish
方法,用于发布事件。但基于上图的继承体系,可以看出通过IBus
、ISendEndpointProvider
和ConsumeContext
进行命令的发送;通过IBus
和IPublishEndpointProvider
进行事件的发布。具体举例如下:
1.通过IBus
发送:
private readonly IBus _bus; public async Task Post(CreateOrderRequest request) { //通过以下方式配置对应消息类型的目标地址 EndpointConvention.Map<CreateOrderRequest>(new Uri("queue:create-order")); await _bus.Send(request); }
2.通过ISendEndpointProvider
发送:
private readonly ISendEndpointProvider _sendEndpointProvider; public async Task Post(CreateOrderRequest request) { var serviceAddress = new Uri("queue:create-order"); var endpoint = await _sendEndpointProvider.GetSendEndpoint(serviceAddress); await endpoint.Send(request); }
3.通过ConsumeContext
发送:
public class CreateOrderRequestConsumer:IConsumer<CreateOrderRequest> { public async Task Consume(ConsumeContext<CreateOrderRequest> context) { //do something else var destinationAddress = new Uri("queue:lock-stock"); var command = new LockStockRequest(context.Message.OrderId); await context.Send<LockStockRequest>(destinationAddress, command); // 也可以通过获取`SendEndpoint`发送命令 // var endpoint = await context.GetSendEndpoint(destinationAddress); // await endpoint.Send<LockStockRequest>(command); } }
1.通过IBus
发布:
private readonly IBus _bus; public async Task Post(CreateOrderRequest request) { //do something await _bus.Publish(request); }
2.通过IPublishEndpoint
发布:
private readonly IPublishEndpoint _publishEndpoint; public async Task Post(CreateOrderRequest request) { //do something var order = CreateOrder(request); await _publishEndpoint.Publish<OrderCreatedEvent>(new OrderCreateEvent(order.Id)); }
3.通过ConsumeContext
发布:
public class CreateOrderRequestConsumer: IConsumer<CreateOrderRequest> { public async Task Consume(ConsumeContext<CreateOrderRequest> context) { var order = CreateOrder(conext.Message); await context.Publish<OrderCreatedEvent>(new OrderCreateEvent(order.Id)); } }
Consumer,消费者,即用于消费消息。MassTransit 包括多种消费者类型,主要分为无状态和有状态两种消费者类型。
无状态消费者,即消费者无状态,消息消费完毕,消费者就释放。主要的消费者类型有:IConsumer<TMessage>
、JobConsumer
、IActivity
和RoutingSlip
等。其中IConsumer<TMessage>
已经在上面的快速体验
部分举例说明。而JobConsumer<TMessage>
主要是对IConsumer<TMessage>
的补充,其主要应用场景在于执行耗时任务。
而对于IActivity
和RoutingSlip
则是MassTransit Courier
的核心对象,主要用于实现Saga模式的分布式事务。MassTransit Courier 实现了Routing Slip模式,通过按需有序组合一系列的Activity,得到一个用来限定消息处理顺序的Routing Slip。而每个Activity的具体抽象就是IActivity
和IExecuteActivity
。二者的差别在于IActivity
定义了Execute
和Compensate
两个方法,而IExecuteActivitiy
仅定义了Execute
方法。其中Execute
代表正向操作,Compensate
代表反向补偿操作。用一个简单的下单流程:创建订单->扣减库存->支付订单举例而言,其示意图如下所示。而对于具体实现,可参阅文章:AspNetCore&MassTransit Courier实现分布式事务
有状态消费者,即消费者有状态,其状态会持久化,代表的消费者类型为MassTransitStateMachine
。MassTransitStateMachine
是MassTransit Automatonymous
库定义的,Automatonymous
是一个.NET 状态机库,用于定义状态机,包括状态、事件和行为。MassTransitStateMachine
就是状态机的具体抽象,可以用其编排一系列事件来实现状态的流转,也可以用来实现Saga模式的分布式事务。并支持与EF Core和Dapper集成将状态持久化到关系型数据库,也支持将状态持久化到MongoDB、Redis等数据库。MassTransitStateMachine
对于Saga模式分布式事务的实现方式与RoutingSlip
不同,还是以简单的下单流程:创建订单->扣减库存->支付订单举例而言,其示意图如下所示。基于MassTransitStateMachine
实现分布式事务详参后续文章。
从上图可知,通过MassTransitStateMachine
可以将事件的执行顺序逻辑编排在一个集中的状态机中,通过发送命令和订阅事件来推动状态流转,而这也正是Saga编排模式的实现。
了解完MassTransit的核心概念,接下来再来看下MassTransit的核心特性以及应用场景:
总体而言,MassTransit是一款优秀的分布式应用框架,可作为分布式应用的消息总线,也可以用作单体应用的事件总线。感兴趣的朋友不妨一观。
到此这篇关于MassTransit 中的.NET 分布式应用框架的文章就介绍到这了,更多相关.NET 分布式应用框架内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!