在之前的一篇博文中,CAP框架可以方便我们实现非实时、异步场景下的最终一致性,而有些用例总是无法避免的需要在实时、同步场景下进行,可以借助Saga事务来解决这一困扰。在一些博文和仓库中也搜寻到了.Net下实现Saga模式的解决方案MassTransit,这就省得自己再造轮子了。
分布式系统中,分布式事务是一个不能避免的问题,如何保证不同节点间的数据一致性。举个常见的例子,下订单、减库存、扣余额,三者在单个节点时,可以借助本地事务,实现要么成功要么失败。而当三者处于不同节点时,又参杂了如网络环境、节点自身环境、服务环境等各种因素,使得三个节点想要实现要么成功、要么失败就增加了许多困难。
CAP理论和BASE理论很好的诠释了这一问题,也有了许多的解决分布式事务的方案,如2PC、3PC、TCC、本地消息表、Saga等一系列解决方案,面对不同场景、不同要求等可选择不同的解决方案。
在之前提到过一个基于本地消息表的CAP框架,借助最终一致性很方便的解决了异步非实时请求下的分布式事务,而对于大部分场景虽然可以直接或者妥协方式使用着异步非实时,如同步实时场景的下订单且减库存变更到异步非实时场景的下订单后发事件减库存,但是总有那么一些场景,不得不去考虑同步实时请求下的分布式事务。
Saga模式又叫做长时间运行事务(Long-running-transaction), 由普林斯顿大学的 Hector Garcia-Molina和Kenneth Salem 1987年发表的论文《Sagas》。核心思想是将长事务拆分为多个本地短事务,通过保证所有短事务的成功或失败来决定整体的成功或失败,由Saga事务协调器协调管理,所有节点执行成功,则成功,如有节点失败,则反向执行前置节点的补偿操作。
当正常执行时,依照T1、T2、T3三个短事务正常执行下去,直到最后一个Tn事务执行完毕,宣告整个事务的成功。
而当执行到某个Tj出现故障时,则反向补偿之前的Tj-1..T1,每个对应的补偿操作Cj-1...C1,其中Tj事务由于在执行阶段就已失败,所以Tj对应的补偿动作Cj不需要执行,即也确定了最后一个Tn事务可以不设置补偿动作Cn。
对于服务与服务间的协作,我们通常有两种模式:Orchestration(编排式) 和 Choreography(协同式),在Saga模式中也有着这两种的实现。
编排式与协同式的差异仅在于服务之间的协作方式,每个参与服务的接口定义却没有任何区别。
编排式的 Saga 需要开发人员定义一个编排器类,用于编排一个Saga中多个参与服务执行的流程。如果整个业务流程正常结束,业务就成功完成,一旦这个过程的任何环节出现失败,Saga编排器类就会以相反的顺序调用补偿操作,重新进行业务回滚。
对于每个参与的服务而言,需要做的事情是
以提交订单为例,假设场景是分布式系统下,进程间以消息传递进行通信:
1、事务发起方的主业务逻辑请求预先定义好的Saga编排器类(内部编排了执行顺序)。
2、Saga编排器类向MQ发送减库存事件,库存服务订阅事件、执行处理并返回MQ处理结果。
3、Saga编排器类向MQ发送减余额事件,支付服务订阅事件、执行处理并返回MQ处理结果。
4、Saga编排器类向MQ发送创建订单命令,订单服务订阅事件并按照命令创建订单。
5、主业务逻辑接收并处理Saga编排器类处理结果。
6、整个过程由Saga 编排器类对接收到的回复进行判决,来决定是继续执行还是悬崖勒马。
没有集中式的编排类,而是各参与方间相互订阅,一个服务订阅另一个服务的事件。
先由事务发起方执行逻辑并发布一个事件,该事件被一个或多个服务进行订阅,这些服务执行本地数据库操作并发布(或不发布)新的事件,该部分需要保证本地数据库的操作成功且写入MQ的消息也成功,可考虑使用本地消息表或是基于MQ事务。当最后一个服务执行本地事务并且不发布任何事件或者发布的事件没有被任何Saga参与者订阅意味着事务结束,则整个业务流程的分布式事务完成。如果某一服务出现故障,那么则反向发布事件,执行补偿操作,以此回滚。
以提交订单为例,假设场景是分布式系统下,进程间以消息传递进行通信:
1、事务发起方执行主业务逻辑发送提交订单命令。
2、库存服务订阅事件、扣减库存并发布已扣减事件。
3、订单服务订阅库存已扣减事件,创建订单并发布订单已创建事件。
4、支付服务订阅订单已创建事件,执行支付并发布订单已支付事件。
5、主业务逻辑订阅订单已支付事件并处理。
当某服务内执行时如存在异常,则反向发布事件,如订单创建失败,则发布OrderCreatedFailed事件,库存服务订阅该事件并执行补偿操作。
相比而言,编排式中参与服务无需向协同式中订阅上游服务的事件,减少了服务间对事件协议的依赖,而只需要关心集权的编排器类发送的消息。
当开启一个事务前,需要做一些准备,准备一个事务Id,记录整个事务执行情况,各Tj事务执行情况,当前请求上下文参数,入参参数记录等,以方便执行补偿操作时需要用到。如当Tj事务执行失败时,需要对Cj-1到C1执行补偿操作,此时各补偿操作需要一些正向执行T1,Tj-1的请求参数或执行结果,因此都需要记录下来。
在Courier中,通过Routing Slip来完成这些记录,创建一个Guid,记录请求上下文参数信息,可以绑定几个内置事件,在各阶段到来时会发送事件,如有需要可以订阅。
var builder = new RoutingSlipBuilder(NewId.NextGuid()); builder.AddSubscription(context.ReceiveContext.InputAddress, RoutingSlipEvents.Completed | RoutingSlipEvents.Faulted | RoutingSlipEvents.CompensationFailed); builder.AddVariable("RequestId", context.RequestId); builder.AddVariable("ResponseAddress", context.ResponseAddress); builder.AddVariable("FaultAddress", context.FaultAddress); builder.AddVariable("Request", context.Message); //组合一系列Activity var routingSlip = builder.Build(); await context.Execute(routingSlip).ConfigureAwait(false);
弄了个Demo,建立了三个服务,此处我使用编排式来完成,但无论是选用编排式还是协同式,都借助RabbitMQ实现消息传递。
每个服务都安装了MassTransit相关的包
MassTransit.AspNetCore MassTransit.RabbitMQ
将Saga编排器类放置在OrderService中了,对于编排器类的放置,个人认为是应该看用例的主服务是谁而放置,想过放在BFF去协调三个服务,但是总是感觉不是BFF的职责范围。
在各服务中对MassTransit配置,如下在OrderService中对MassTransit需要使用到的RabbitMQ配置,对需要进行多个服务协作的用例配置Routing Slip,对消息队列侦听订阅需要的事件并配置相应的Activity处理。
services.AddMassTransit(x => { var currentAssembly = Assembly.GetExecutingAssembly(); x.AddActivities(currentAssembly); x.AddConsumers(currentAssembly); x.AddRequestClient<createordercommand>(); x.UsingRabbitMq((context, cfg) => { // 配置RabbitMQ cfg.Host(Configuration["RabbitmqConfig:HostIP"], ushort.Parse(Configuration["RabbitmqConfig:HostPort"]), Configuration["RabbitmqConfig:VirtualHost"], h => { h.Username(Configuration["RabbitmqConfig:Username"]); h.Password(Configuration["RabbitmqConfig:Password"]); }); //配置Routing Slip cfg.ReceiveEndpoint("CreateOrderCommand", ep => { ep.ConfigureConsumer<createorderrequestproxy>(context); ep.ConfigureConsumer<createorderresponseproxy>(context); }); // 配置订阅队列及Handler处理 cfg.ReceiveEndpoint("CreateOrder_execute", ep => { ep.ExecuteActivityHost<createorderactivity, createordermodel="">(context); }); }); }); services.AddMassTransitHostedService();
构建Routing Slip,此处依据用例的需求,对需要协作的服务编排,组合一系列的Activity。
Task BuildRoutingSlip(RoutingSlipBuilder builder, ConsumeContext<createordercommand> request) { builder.AddActivity("ReduceStock", new Uri("..."), new {}); builder.AddActivity("DeductBalance", new Uri("..."), new {}); builder.AddActivity("CreateOrder", new Uri("..."), new { }); return Task.CompletedTask; }
当请求进入后,通过RequestClient发送CreateOrderCommand,同步等待执行结果,再由编排器类负责协调预设好的Activity,发送事件到消息队列,经各Activity订阅处理最终返回结果。
[Route("[controller]")] public class OrderController : ControllerBase { private readonly IRequestClient<createordercommand> _createOrderClient; public OrderController(IRequestClient<createordercommand> createOrderClient) { _createOrderClient = createOrderClient; } [HttpGet("CreateOrder")] public async Task<commoncommandresponse<createorderresult>> CreateOrder() { var result = await _createOrderClient.GetResponse <commoncommandresponse<createorderresult>>(new CreateOrderCommand() { // ... }); return result.Message; } }
各服务中对于Activity设置侦听队列以及请求信息,调用Execute执行逻辑,当出现异常时返回到MQ通知编排器类,在对之前执行的Activity执行Compensate。如在CreateOrderActivity中执行异常,由编排器类执行补偿,ReduceStockActivity调用Compensate,执行增加库存逻辑
public class ReduceStockActivity : IActivity<ReduceStockModel, ReduceStockLog> { public async Task<ExecutionResult> Execute(ExecuteContext<ReduceStockModel> context) { var argument = context.Arguments; // 扣减库存 await Task.Delay(100); return context.Completed(new ReduceStockLog() { ProductId = argument.ProductId, Amount = 1 }); } public async Task<CompensationResult> Compensate(CompensateContext<ReduceStockLog> context) { // 增加库存 await Task.Delay(100); return context.Compensated(); } }
用例请求执行后,先由Controller发送请求,再由库存服务扣减库存,支付服务扣减余额,最后由订单服务创建订单,当创建失败时,执行补偿操作,库存服务增加库存,支付服务增加余额。
用例请求执行后,先由Controller发送请求,再由库存服务扣减库存,支付服务扣减余额,最后由订单服务创建订单,当创建失败时,执行补偿操作,库存服务增加库存,支付服务增加余额。
在整个事务失败后,先会返回异常,再由编排器执行补偿操作,实现最终的数据一致性。MassTransit也提供了重试机制以实现向前恢复,避免因数据库连接超时、网络波动等问题造成的失败。
到此这篇关于AspNetCore&MassTransit Courier实现分布式事务的文章就介绍到这了,更多相关AspNetCore分布式事务内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!