通过领域事件和Azure服务总线实现最终的一致性

我打算通过本文说明什么最终一致性是 , 如何比较事务一致性并且 , 当考虑以这种方法设计的解决方案。接下来我们来看看事件域看他们的作用是 , 在最终的一致性 , 并且还关系到我们如何识别这些语言的域名。最后 , 基于一个简单的场景中 , 我们将对提出的体系结构的实现细节 , 使用Azure 服务总线消息收发系统。

最终交易 VS

如名称所暗示 , 最终浓度约为一致 , 最终。在此上下文中意味着最终在稍后的时间。为了理解该概念 , 当比较事务一致性 , 如在以下附图中 :

通过领域事件和Azure服务总线实现最终的一致性

第 1 图的工作 , 也许是大多数开发人员非常熟悉。客户端 ( UI 、 API 等) 执行的指令的系统的内部运行的所有必要操作 , 以保持一致性域 (a 和 b) 在这里交易。当客户端获得响应的 :

  • 两个操作 A 和操作 B 成功
  • 两个操作 A 和操作 B 失效了

因此 , 当在工作流事务一致性的客户得到一个 “成功” 响应 , 保证了所有必要的操作来维持该一致性域已经成功执行。

必须注意 , 一致性域的 “边界” 并不是一个解决方案 , 而是业务组件边界定义。报告列举了 “空间” 内的域的集合的业务规则需要强制施行。

通过领域事件和Azure服务总线实现最终的一致性

在第二工作流程的图形可能更熟悉分布式系统的开发人员。再次 , 该客户机执行一个命令的系统 , 但是这个时间只有一部分的所有必要操作的一致性维护运行 (此处内部) 的交易。现在 , 当客户端获得响应 , 则表示以下之一是真实 :

  • 操作 A 和操作 B 已经成功运行计划在稍后时间 , 即最终
  • 操作 A 和操作 B 失败 (will not run)

换句话说 , 当在最终一致性的工作流客户端得到一个 “成功” 响应 , 它只保证一部分必要的操作以便维持该一致性域已被成功执行和调度运行。

可以说 , 通过比较事务一致性似乎更直接和更容易实现。事实上 , 在大多数嵌入式架构。那么 , 为什么当选择与最终一致性 ?

再看第一个图 , 以下场景 :

  1. 作业 B 需要很长的时间来执行。
    例如 , 设定大的值的计算、报告生成等。您不想让您网站的用户等待几秒钟 , 直到它们获得确认的帐 , 所以你需要这些长时间运行的操作 , 以在背景中运行 , 从而优化了前端性能。
  2. B 操作在本质上是异步的 , 即取决于异步机制。
    例如 , 发送电子邮件。这是一个经典的例子。你将很快向用户通知电子邮件将被发送 “不久” 为什么不在这一单独的组件 , 其还可以被重用和独立缩放。
  3. 执行操作 B 的不同骨料但不是在相同的操作限界上下文
    我稍后将讨论这种情形 , 并同时实现细节。
  4. 执行操作 B 的不同限界上下文比操作的。
    基于 Martin Fowler 的示例 , 考虑更新客户的名称销售的上下文将需要升级的客户的支持。这种事不会发生在相同的时间 , 但最终有界上下文将被同步 , 从而实现时域一致性。

在现实应用场景 # 1 将几乎肯定会迫使你朝最终一致性在一些点由于性能要求。甚至场景 # 2 中 , 虽然这无可厚非是设计决策的部分 , 因为该响应时间的异步 API 发起机制可以为快速路由所述交易。

原因 # 3 和 # 4 的 DDD 和特定主要做出明智的设计决策。你很可能会将该交易路由到最终一致性选择在这些情景下更好的设计。这促进了较小的、低耦合的组件执行特定操作。这将使该系统更容易扩展 , 而将默认地导致改进的性能的应用程序的增长 , 因为你很可能遇到场景 # 1 。

事件域

第二个图又不清楚如何操作 “B” 被触发。事件域在这里诞生。考虑更新的示意图 :

通过领域事件和Azure服务总线实现最终的一致性
 
这提供了更多细节 , 我们可以看到一个事件是 , 域出版后不久 , “操作” 的运行 , 同时技术的边界内 , 用一交易。域的事件随后被保存在队列中消耗的过程 , 以后谁还需要它来运行操作 “B” 。

因此 , 一个域事件中的作用是便于最终一致性。在这方面 , 他们作为触发器的域信息和容器。

事件域没有正式定义为 DDD 模式 (Eric Evans本书首次发布。概念引入后来并定义为 :

事件域是一个正式的域模型的表示的道理。域活动 , 而忽略无关作出明确的事件 , 来自不同领域的专家要跟踪或通知的、或相关联的状态改变对象的其他模型。

事件域也可以用于重建的特定状态 ;事件来源采用。在这个特定的上下文命令将创建的每个客户事件域。

但这并不适合于 “传统” 方法 , 只有 ‘有意义’ 事件是捕获域的域模型。那么 , 什么是 “有意义” ?一个想法是 : “如果我忽略了的事件 X 发生的业务域的规则仍然是一致的吗 ?” 如果答案是不 , 那么你可能需要它来作为事件域模型。

作为提示 , 在试图识别域事件 ,Vernon Vaughn还建议关注以下关键词组为业务专家谈话时 :

  • “当。 … …”
  • “如果出现这种情况”
  • “告诉我。 … …” 和 “通知我。 … …”
  • “发生”

我还应提及 , 您可以使用事件域同步更新域内部的对象相同。我个人觉得这是一个稍微过度工程化实现事务一致性 , 但它是一种有效的方法。

另一方法是使用同步事件来更新另一个域内) 的相同事务处理。从技术上讲这是错误的 , 因为它违背了聚集体每交易单DDD 规则 , 但是它可以应用在当没有可用的消息接发机制。这样做仍然可以获得一些益处的解耦设计的聚集体 , 但是不更新之间的性能和扩展性优势。

在以下实现中焦点将放在 “最终一致性” 情景 , 完全利用域事件所带来的好处。

提出的设计

让我们考虑下面的方法的简化版本 (Vernon Vaughn 的办法在他的畅销书执行 DDD) :通过领域事件和Azure服务总线实现最终的一致性
首先 , 一个域内聚集事件边界。本实体负责的领域或服务。

然后 , 将新近创建的域是通过发布事件事件发布者。当这发生时 , 现有注册用户 , 在这种情况下 ,事件存储订户将接收的事件 , 并将其保存。事情至此内发生的相同交易。

接下来的事件代理(后台处理) 将顺序读取事件并且将它们转发到专用消息队列

最后各种事件使用者(后台处理) 将读取的专用队列、串行化并运行必要的操作来实现时域一致性。

有几个重要的事情要提一下这种设计方案。首先 , 你可能发现该事件不直接进入消息队列但被首先存储在模型存储队列作为用于事件转发器。这种情况的原因是基于这样的假设 , 该模型存储在消息队列并不共享同一交易 , 当真正使用 SQL Azure 服务总线和数据库消息传递系统。没有这个中间步骤 , 我们最终会在位置成功地提交事务 , 并且然后存储在模型中不能保存在事件域中的消息队列 , 这将留给我们一个不一致事件 , 因为模型中丢失。

同样的事情不会发生的事件和消息队列的货代吗 ?技术上是的 , 但是现在我们更好地处理这样的情形。因为域是存储事件如今我们可以尝试着给它的消息队列 , 并且仅如果成功我们会将该事件标记为 “已转发” 中存储的模型。

如果该事件被转发到消息队列中存储更新的模型 , 但是失败了怎么办 ?在此场景中 , 我们可以依靠两个东西 :

  1. 消息De - duplication。消息传送系统比如兔或 MQ Azure 服务总线提供此功能。自动去除重复消息基于唯一消息标识符的定制。
  2. 幂次事件域。这意味着 , 如果在相同域中只出现一次的事件 , 该事件后续将不改变的状态设定由第一域中发生。事件域具有幂等理想的场景 , 但它需要额外的工作 , 有时还可导致增加的复杂性和性能下降。因此 , 仔细选择消息传送系统 , 该系统支持去重总是一个好主意。

实现

现在 , 我们概念上澄清事情将如何工作的代码。在这里可以找到完整的解决方案 :https://github.com/florindpreda/eventualconsistency.domainevents.azureservicebus

场景选择说明了最终一致性的使用是相同的 , 当我说工作单元图案。事实上 , 我还会重用相同的工作单元中执行的事务的一部分。

快速重新迭代 , 我们有两个类别 : 产品和产品评论。— — 删除了相关产品的产品评论需要被删除的删除 (这里为 “软逻辑” 或 “删除” , 而不是物理地移除数据库中的记录所使用的 , “标记” 更新的 isDeleted 代替) 。

首先我们说domainevent类别 :

1
2
3
4
5
6
7
8
9
public abstract class DomainEvent
{
    public DateTime OcurrendOn { get; protected set; }
 
    public DomainEvent()
    {
        this.OcurrendOn = DateTime.UtcNow;
    }
}

这是所有将来的事件。这是个好主意 , 所有事件具有时间戳的发生 , 主要用于伐木和调试方案。

接下来我们具体productdeleted事件 :

1
2
3
4
5
6
7
8
9
public class ProductDeleted : DomainEvent
{
    public Guid ProductId { get; protected set; }      
 
    public ProductDeleted(Guid productId)
    {
        this.ProductId = productId;        
    }
}

域事件应当仅承载所需的最少量信息的消费者的最佳性能。例如 , 如果事件需要五个消费者 ID 以便运行 , 就足以找到其他 4 , 但它需要复杂和耗时的查询 , 那么最好还是包括所有五个主体中的事件 ID 的事件时容易获得。在这种情况下 , 为了删除所有商品评论所有我们需要的是ProductID

如所讨论的 , 该事件将被创建并发布来自产品集合 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Product : Entity
{      
    public string Name { get; private set; }
 
    protected Product() {}
 
    public Product(string name)
    {      
        this.Name = name;
    }
 
    public override void Delete()
    {
        base.Delete();
        DomainEvents.Publisher.Publish<ProductDeleted>(new ProductDeleted(this.Id));         
    }
}

让我们看看如何像出版商 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class DomainEventPublisher
{
    private readonly IDictionary<Type, IList<IDomainEventSubscriber>> _subscribers = new Dictionary<Type, IList<IDomainEventSubscriber>>();
 
    public void Publish<T>(T domainEvent) where T : DomainEvent
    {          
        var eventSubscribers = _subscribers.SelectMany(s => s.Value)
                                            .Where(sb => sb.SubscribedToEventType() == domainEvent.GetType()
                                                        || sb.SubscribedToEventType() == typeof(DomainEvent)
                                                    );
 
        foreach(var eventSubscriber in eventSubscribers)
        {
            eventSubscriber.Handle(domainEvent);
        }
    }      
 
    public void Subscribe<TEvent>(Action<DomainEvent> handle) where TEvent : DomainEvent
    {
        var subscriber = new DomainEventSubscriber(handle, typeof(TEvent));
        Subscribe(subscriber);
    }
 
    public void Subscribe(IDomainEventSubscriber domainEventSubscriber)
    {
        var eventType = domainEventSubscriber.SubscribedToEventType();         
        if (_subscribers.ContainsKey(eventType))
        {
            _subscribers[eventType].Add(domainEventSubscriber);
        }
        else
        {
            _subscribers[eventType] = new List<IDomainEventSubscriber>();
            _subscribers[eventType].Add(domainEventSubscriber);
        }
    }      
}

首先需要注意的是 , 出版商的订户列表分组的事件的类型是 “听” 到。当一个事件被公布 , 所有注册用户 , 并执行各自的处理。订户可以登记具体事件 , 比如productdeleted或者 , 所有的事件。在eventstoringsubscriber我们希望是后者 , 所有事件被存储用于将来转发 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class EventStoringSubscriber : IEventStoringSubscriber
{      
    private readonly IStoredEventRepository _storedEventRepository;
    private readonly IEventSerializer _eventSerializer;
 
    public EventStoringSubscriber(IStoredEventRepository storedEventRepository, IEventSerializer eventSerializer)
    {          
        _storedEventRepository = storedEventRepository;
        _eventSerializer = eventSerializer;
    }
 
    public void Handle(DomainEvent domainEvent)
    {
        var serializedBody = _eventSerializer.Serialize(domainEvent);
        var storedEvent = new StoredEvent(domainEvent.GetType().ToString(), domainEvent.OcurrendOn, serializedBody);
        _storedEventRepository.Add(storedEvent);
    }
 
    public Type SubscribedToEventType()
    {
        return typeof(DomainEvent);
    }
}

eventstoringsubscriberJSON 序列化的事件 ( 在这里 ) , 创造了storedevent然后将其保持。再次 , 重要的是要提到 ,storedeventrepository操作在相同事务下运行范围的存储库删除时涉及的产品。在storedevent域模型是一个类提供公共接口 , 用于存储所有事件以均匀的方式 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class StoredEvent : Entity
{
    public string TypeName { get; private set; }
    public DateTime OccurredOn { get; private set; }
    public string SerializedBody { get; private set; }
    public bool IsForwarded { get; private set; }
 
    protected StoredEvent() {}
 
    public StoredEvent(string typeName, DateTime occurredOn, string serializedBody)
    {
        TypeName = typeName;
        OccurredOn = occurredOn;
        SerializedBody = serializedBody;
    }
 
    public void MarkAsForwarded()
    {
        IsForwarded = true;
    }
}

The last piece of the information of the publishing part is to see how and when are the subscribers registered. First it’s important to note that the DomainEventPublisher is a ‘singleton per request’, meaning we’ll use the same instance during the duration of each request. It’s the same approach used for the Unit of Work. This is the implementation using Unity IoC:

1
2
3
4
5
6
7
8
9
10
if (HttpContext.Current != null)
{
    container.RegisterType<IDatabaseContext, DatabaseContext>(new PerHttpRequestLifetimeManager());
    container.RegisterType<DomainEventPublisher, DomainEventPublisher>(new PerHttpRequestLifetimeManager());
}
else
{              
    container.RegisterType<IDatabaseContext, DatabaseContext>(new ContainerControlledLifetimeManager());
    container.RegisterType<DomainEventPublisher, DomainEventPublisher>(new ContainerControlledLifetimeManager());
}

As we’ll get a new publisher for each request, we will also have to register the subscribers at the beginning of each request:

1
2
3
4
protected void Application_BeginRequest()
{
    UnityConfig.RegisterEventsSubscribers();
}

The code above is part of Global.asax and it calls the following method which is part of the IoC configuration:

1
2
3
4
5
6
7
public static void RegisterEventsSubscribers()
{
    _container.Resolve<DomainEvents>();
 
    var eventStoringSubscriber = _container.Resolve<IEventStoringSubscriber>();          
    DomainEvents.Publisher.Subscribe(eventStoringSubscriber);
}

The DomainEvents is just a wrapper exposing the publisher via a static property. It’s implemented this way so it can be easily used inside entities without explicitly coupling them with an IDomainEventsPublisher interface:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class DomainEvents
{
    private static DomainEventPublisher _publisher;
 
    public static DomainEventPublisher Publisher
    {
        get
        {
            if (_publisher == null)
            {
                throw new Exception("Publisher is not initialized");
            }
            return _publisher;
        }
    }
 
    public DomainEvents(DomainEventPublisher publisher)
    {
        _publisher = publisher;
    }
}

This covers the publishing part. At this point the ProductDeleted event is stored in the storedeventsSQL 表。

接下来 , 在 “事件背景货运工人需要推动它在 Azure 服务总线队列。在eventforwarderservice阶级 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class EventForwarderService : IEventForwarderService
{      
    private readonly IDictionary<string, string> _eventTypeQueueMapping = new Dictionary<string, string>()
    {
        { typeof(ProductDeleted).ToString(), "ProductDeletedQueue" },
        { typeof(PlaceholderEvent).ToString(), "PlaceholderQueue" }
    };
 
    private readonly IUnitOfWork _unitOfWork;
    private readonly IStoredEventRepository _storedEventRepository;
    private readonly IMessagingService _messagingService;
 
    public EventForwarderService(IUnitOfWork unitOfWork, IStoredEventRepository storedEventRepository, IMessagingService messagingService)
    {
        _unitOfWork = unitOfWork;
        _storedEventRepository = storedEventRepository;
        _messagingService = messagingService;
    }
 
    public void ForwardEvents()
    {
        using(_unitOfWork)
        {
            var newEvents = _storedEventRepository.GetNewEvents().ToList();
             
            foreach(StoredEvent storedEvent in newEvents)
            {
                var queueName = this.GetAssociatedQueueName(storedEvent.TypeName);
                _messagingService.Send(storedEvent, queueName);
 
                storedEvent.MarkAsForwarded();
                _storedEventRepository.Update(storedEvent);
 
                _unitOfWork.Commit();
            }              
        }
    }
 
    private string GetAssociatedQueueName(string eventType)
    {
        var queueName = string.Empty;
 
        try
        {
            queueName = _eventTypeQueueMapping[eventType];
        }
        catch(KeyNotFoundException ex)
        {
            throw new ArgumentOutOfRangeException(string.Format("No mapping defined for event: {0}", eventType), ex);
        }
 
        return queueName;
    }
}

 

因为上面的类封装了所有的 “转发” 逻辑可以被容易地再利用 , 不管什么类型的服务的事件背景是 “转发器” 。我们可以毫不费力的 Azure Web 任务之间进行切换 , 工人角色或 Windows 服务。在imessagingservice接口实现azureservicebusqueuemessagingservice类别 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class AzureServiceBusQueueMessagingService : IMessagingService
{      
    private readonly NamespaceManager _namespaceManager;
    private readonly string _connectionString;
 
    private readonly IEventSerializer _eventSerializer;
 
    public AzureServiceBusQueueMessagingService(IEventSerializer eventSerializer)
    {
        _connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
        _namespaceManager = NamespaceManager.CreateFromConnectionString(_connectionString);        
        _eventSerializer = eventSerializer;
    }
 
    private void InitQueue(string queueName)
    {
        if(string.IsNullOrWhiteSpace(queueName))
        {
            throw new ArgumentNullException("Queue name is empty.");
        }
 
        //configure queue settings
        var queueDescription = new QueueDescription(queueName);
        queueDescription.RequiresDuplicateDetection = true;
        queueDescription.DuplicateDetectionHistoryTimeWindow = TimeSpan.FromDays(7);
        queueDescription.LockDuration = TimeSpan.FromMinutes(5);
        queueDescription.EnableDeadLetteringOnMessageExpiration = true;
 
        //create queue if not exists
        if (!_namespaceManager.QueueExists(queueName))
        {
            _namespaceManager.CreateQueue(queueDescription);
        }
    }
 
    public void Send(StoredEvent storedEvent, string queueName)
    {
        this.InitQueue(queueName);
 
        var client = QueueClient.CreateFromConnectionString(_connectionString, queueName);         
        var brokeredMessage = this.CreateBrokeredMessage(storedEvent);
 
        client.Send(brokeredMessage);
 
        client.Close();
    }
 
    private BrokeredMessage CreateBrokeredMessage(StoredEvent storedEvent)
    {
        var brokeredMessage = new BrokeredMessage(storedEvent.SerializedBody);
        brokeredMessage.MessageId = storedEvent.Id.ToString();
 
        return brokeredMessage;
    }
 
    ...
}

 

一个值得注意的initqueue方法是 , 我们设定requiresduplicatedetection标志为真 , 并且还在duplicatedetectionhistorytimewindow。前者允许重复检测 , 后者是多久 Azure 服务总线将存储MessageID所存储信息。我在这里设置为 7 天 , 但这个时间窗口 , 应谨慎考虑 , 因为它需要额外的队列空间。

拼图的最后一块是消费的 “事件” 。就像在 “事件背景是货运代理人这一过程 , 但不同的是 , 我们可以有多个消费者 (每一个事件类型) 。在productdeletedeventconsumer仅接收productdeleted事件 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class ProductDeletedEventConsumer : IProductDeletedEventConsumer
{
    private readonly string QUEUE_NAME = "ProductDeletedQueue";
    private readonly IMessagingService _messagingService;
    private readonly IUnitOfWork _unitOfWork;
    private readonly IProductReviewRepository _productReviewRepository;
 
    public ProductDeletedEventConsumer(IMessagingService messagingService, IUnitOfWork unitOfWork, IProductReviewRepository productReviewRepository)
    {
        _messagingService = messagingService;
        _unitOfWork = unitOfWork;
        _productReviewRepository = productReviewRepository;
    }
 
    public void ProcessNextEvent()
    {
        _messagingService.ProcessNextEvent<ProductDeleted>(pd => Process(pd), QUEUE_NAME);
    }
 
    private void Process(ProductDeleted productDeleted)
    {
        using(_unitOfWork)
        {
            var productReviews = _productReviewRepository.GetByProductId(productDeleted.ProductId);
            foreach (var productReview in productReviews)
            {
                productReview.Delete();
                _productReviewRepository.Update(productReview);
            }
 
            _unitOfWork.Commit();
        }
    }
}

每一个新时间productdeleted发生时 ,productdeletedeventconsumer该过程将通过删除所有相关评论。在processnextevent方法azureservicebusqueuemessagingservice如下所示 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
...continued
 
public void ProcessNextEvent<TEvent>(Action<TEvent> handle, string queueName) where TEvent : DomainEvent
{
    this.InitQueue(queueName);
 
    var client = QueueClient.CreateFromConnectionString(_connectionString, queueName);
 
    var brokeredMessage = client.Receive(TimeSpan.FromSeconds(5));
 
    if (brokeredMessage != null)
    {
        Process<TEvent>(handle, brokeredMessage);
    }
}
 
private void Process<TEvent>(Action<TEvent> handle, BrokeredMessage brokeredMessage) where TEvent : DomainEvent
{
    var jsonEvent = brokeredMessage.GetBody<string>();
    var productDeletedEvent = _eventSerializer.Deserialize<TEvent>(jsonEvent);
 
    handle(productDeletedEvent);
 
    try
    {              
        brokeredMessage.Complete();
    }
    catch (Exception ex)
    {
        //do something else, e.g log
        brokeredMessage.DeadLetter();//move to dead letter queue to inspect later  
    }
}

 

运行它

  1. 到了Azure 门户并创建一个新的 Azure 服务总线命名空间。您可以创建一个免费帐户吧 , 如果你还没有的话 ) 。
  2. 溶液从 GitHub 来下载 :https://github.com/florindpreda/eventualconsistency.domainevents.azureservicebus
  3. 更新所述服务总线连接的字符串在 app. configevcosample.eventforwarderworkerevcosample.productdeletedconsumerworker
  4. 运行 Web 项目evcosample.api( 这将创建一个数据库 LocalDB 和种子) 。
  5. 同时运行的后台进程evcosample.eventforwarderworkerevcosample.productdeletedconsumerworker
  6. 发送 HTTP DELETE 请求productcontroller.deleteWeb API 方法传递 (ProductID) 被删除。
  7. 检查产品productreviewsstoredeventsSQL 表是否productdeleted事件已经被成功处理。

 

结论

总之 , 我们已经看到 :

  • “最终一致性” 方法是一种设计 , 可以提高性能和可扩展性的应用的执行的某些操作推迟到稍后的时间。
  • 在事件域 DDD 的战术元素和 “最终一致性” 的主持人也作为触发事件信息和容器的消费者域。
  • 一般的模式实现 “最终一致性” 是使用消息传递系统像 Azure 服务总线存储序列化事件域
  • 当存储模型和消息传递系统不共享同一事务范围 , 事件应当被初始地保存在模型存储并且然后转发到消息收发系统。这确保不会丢失域事件何时出现故障。
  • 为了避免相同事件域处理一次以上 , 使去重复的消息传递系统的支持 , 并尝试设计等领域。