Akka查询设备组《fourteen》译

加入以下依赖到项目中:

Akka查询设备组《fourteen》译

介绍:

到目前为止,我们看到的会话模式很简单,因为它们要求Actor保持很少或没有状态。

特别:

  • 设备actor返回一个读数,不需要更改状态。
  • 记录温度,更新单个字段。
  • 设备组Actors通过在map中添加或删除条目来维护组成员资格。

在这一部分中,我们将使用一个更复杂的例子。由于房主会对他们家中的温度感兴趣,我们的目标是能够查询一组中的所有设备Actors。我们首先来研究一下这样的查询API应该如何展现。

可能处理的情况:

我们面临的第一个问题是,一个群体的成员资格是动态的。每个传感器设备由一个可以随时停止的Actor表示。在查询开始时,我们可以向所有现有设备actor询问当前温度。但是,在查询的生命周期中:

  • 设备Actor可能会停止,无法通过温度读数进行响应。
  • 新的设备actor可能会启动并且不会包含在查询中,因为我们不知道它。

这些问题可以通过许多不同的方式解决,但重要的是要解决所期望的行为。以下适用于我们的用例:

  • 当查询到达时,组actor会获取现有设备actor的快照,并且只询问这些actor的温度。
  • 查询到达后启动的Actor将被忽略。
  • 如果快照中的actor在查询期间停止而没有应答,我们将向查询消息的发送者报告它停止的事实。

除了设备Actor动态来来往往,一些Actor可能需要很长时间才能回答。例如,它们可能会陷入意外的无限循环中,或者因为错误而失败并放弃我们的请求。我们不希望查询无限期地继续,因此我们将在以下任一情况下认为它是完整的:

  • 快照中的所有参与者都已响应或已确认已停止。
  • 我们达到了预定的截止日期。

鉴于这些决定,以及快照中的设备可能刚刚启动但尚未接收到记录温度的事实,我们可以针对温度查询为每个设备actor定义四种状态:

  • 它有一个可用的温度:Temperature。
  • 它已经响应,但还没有温度:TemperatureNotAvailable。
  • 在回答之前它已停止:DeviceNotAvailable。
  • 它没有在截止日期前回复:DeviceTimedOut。

在消息类型中总结这些,我们可以将以下内容添加到DeviceGroup:

Full source at GitHub

Akka查询设备组《fourteen》译

实现查询

实现查询的一种方法涉及将代码添加到组设备actor。然而,在实践中,这可能非常麻烦且容易出错。请记住,当我们开始查询时,我们需要拍摄存在的设备的快照并启动计时器,以便我们可以强制执行截止日期。与此同时,另一个查询可以到达。对于第二个查询,我们需要跟踪完全相同的信息,但与先前的查询隔离。这将要求我们在查询和设备角色之间维护单独的映射。

相反,我们将实施一种更简单,更优越的方法。我们将创建一个代表单个查询的actor,并代表group actor执行完成查询所需的任务。到目前为止,我们已经创建了属于经典域对象的actor,但现在,我们将创建一个代表进程或任务而不是实体的actor。我们通过保持我们的组设备行为者简单并且能够更好地单独测试查询功能而受益。

定义查询actor

首先,我们需要设计查询actor的生命周期。这包括确定其初始状态,将采取的第一个操作以及清理 - 如有必要。查询actor需要以下信息:

  • 要查询的活动设备actor的快照和ID。
  • 启动查询的请求的ID(以便我们可以将其包含在回复中)。
  • 发送查询的actor的引用。我们会直接将回复发送给这位Actor。
  • 截止日期,指示查询应等待回复的时间。使这个参数将简化测试。

调度查询超时

由于我们需要一种方法来表明我们愿意等待多长时间的响应,现在是时候引入一个我们尚未使用的新Akka功能,即内置的调度程序工具。使用调度程序很简单:

  • 我们从ActorSystem获取调度程序,而调度程序可以从actor的上下文访问:getContext().getSystem().scheduler()。这需要一个ExecutionContext,它是将执行计时器任务本身的线程池。在我们的例子中,我们通过传入getContext()dispatcher()来使用与actor相同的调度程序。
  • scheduler.scheduleOnce(time,actorRef,message,executor,sender)方法将在指定时间内将消息消息安排到将来,并将其发送给actor actorRef。

我们需要创建一个表示查询超时的消息。为此,我们创建了一个简单的消息CollectionTimeout,没有任何参数。scheduleOnce的返回值是一个可取消的,如果查询在时间上成功完成,可用于取消计时器。在查询开始时,我们需要向每个设备参与者询问当前温度。为了能够快速检测在获得ReadTemperature消息之前停止的设备,我们还将观察每个actor。这样,我们获得了在查询生命周期内停止的那些消息的终止消息,因此我们不需要等到超时才能将这些消息标记为不可用。

把它们放在一起,我们的DeviceGroupQuery actor的轮廓如下所示:

Akka查询设备组《fourteen》译

跟踪Actor状态

除了挂起的计时器之外,查询actor还有一个有状态方面,跟踪已经回复,已经停止或没有回复的actor集。跟踪此状态的一种方法是在actor中创建可变字段。一种不同的方法利用了改变演员如何响应消息的能力。Receive只是一个可以从另一个函数返回的函数(或者一个对象,如果你愿意)。默认情况下,接收块定义actor的行为,但可以在actor的生命周期内多次更改它。我们调用context.become(newBehavior),其中newBehavior是任何类型为Receive的东西。我们将利用此功能来跟踪演员的状态。

对于我们的用例:

我们不是直接定义receive,而是委托waitingForReplies函数来创建Receive。

waitingForReplies函数将跟踪两个不断变化的值:已收到回复的Map,我们还在等待的Actor Set。

我们有三个Event可以采用:

  • 我们可以从其中一个设备收到RespondTemperature消息。
  • 我们可以收到已在此期间停止的设备actor的终止消息。
  • 我们可以到达截止日期并收到CollectionTimeout。

在前两种情况下,我们需要跟踪回复,我们现在委托给一个方法receivedResponse,我们将在后面讨论。在超时的情况下,我们需要简单地接受尚未回复的所有actor(该组的成员仍然等待),并将DeviceTimedOut作为最终回复中的状态。然后我们用收集的结果回复查询的提交者并停止查询actor。

为此,请将以下内容添加到DeviceGroupQuery源文件中:

Akka查询设备组《fourteen》译

目前尚不清楚我们将如何“改变”repliesSoFar和仍然等待数据结构。需要注意的一件重要事情是waitForReplies函数不直接处理消息。它返回一个将处理消息的Receive函数。这意味着如果我们再次使用不同的参数调用waitingForReplies,那么它将返回一个全新的Receive,它将使用这些新参数。

我们已经看到了如何通过从接收返回来安装初始接收。例如,为了安装新的回复,我们需要一些机制。这个机制是方法context.become(newReceive),它将actor的消息处理函数改为提供的newReceive函数。你可以想象,在开始之前,你的actor会自动调用context.become(receive),即安装从receive返回的Receive函数。这是另一个重要的观察:它不是处理消息的接收,它返回一个实际处理消息的Receive函数。

我们现在必须弄清楚在receivedResponse中要做什么。首先,我们需要在地图repliesSoFar中记录新结果,并从stillWaiting中删除actor。下一步是检查我们还在等待剩下的演员。如果没有,我们将查询结果发送给原始请求者并停止查询actor。否则,我们需要更新repliesSoFar和stillWaiting结构并等待更多消息。

在之前的代码中,我们将Terminated视为隐式响应DeviceNotAvailable,因此receivedResponse不需要做任何特殊的事情。但是,我们仍然需要完成一项小任务。我们可能会从设备actor收到适当的响应,但随后会在查询的生命周期内停止响应。我们不希望第二个事件覆盖已经收到的回复。换句话说,我们不希望在记录响应后收到Terminated。通过调用context.unwatch(ref)可以很容易地实现。此方法还确保我们不会收到已存在于actor的邮箱中的已终止事件。多次调用也是安全的,只有第一次调用才会有效,其余的都会被忽略。

有了这些知识,我们可以创建receivedResponse方法:

Akka查询设备组《fourteen》译

在这一点上,我们很自然地通过使用context.become()技巧获得了什么,而不是使repliesSoFar和stillWaiting结构成为actor的可变字段(即vars)?在这个简单的例子中,并没有那么多。当你突然有更多种类的状态时,这种状态保持的价值变得更加明显。由于每个状态可能具有相关的临时数据,因此将这些数据保持为字段会污染参与者的全局状态,即不清楚在什么状态下使用哪些字段。使用参数化接收“工厂”方法,我们可以保持仅与状态相关的数据私有。使用可变字段而不是context.become()重写查询仍然是一个很好的练习。但是,建议您熟悉我们在此处使用的解决方案,因为它有助于以更清晰,更易于维护的方式构建更复杂的actor代码。

我们的查询Actor现在完成了:

Akka查询设备组《fourteen》译

Akka查询设备组《fourteen》译

Akka查询设备组《fourteen》译

测试查询actor

现在让我们验证查询actor实现的正确性。我们需要单独测试各种场景,以确保一切按预期工作。为了能够做到这一点,我们需要以某种方式模拟设备演员以执行各种正常或失败场景。值得庆幸的是,我们将协作者列表(实际上是一个Map)作为查询参与者的参数,因此我们可以传入TestKit引用。在我们的第一次测试中,当有两个设备并且都报告温度时,我们会尝试这种情况:

Akka查询设备组《fourteen》译

这是一个幸福的案例,但我们知道有时设备无法提供温度测量。此方案与上一个略有不同:

Akka查询设备组《fourteen》译

我们也知道,有时设备Actor会在回答之前停止:

Akka查询设备组《fourteen》译

如果你还记得,还有另一个与设备演员停止相关的案例。我们可能会从设备actor获得正常回复,但之后会收到同一个actor的Terminated。在这种情况下,我们希望保留第一个回复,而不是将设备标记为DeviceNotAvailable。我们也应该测试一下:

Akka查询设备组《fourteen》译

最后一种情况是并非所有设备都能及时响应。为了使我们的测试保持相对较快,我们将构造具有较小超时的DeviceGroupQuery actor

Akka查询设备组《fourteen》译

我们的查询现在按预期工作,现在是时候在DeviceGroup actor中包含这个新功能了。

向组中添加查询功能

在组actor中包含查询功能现在相当简单。我们在查询actor本身中完成了所有繁重工作,组actor只需要使用正确的初始参数创建它,而不需要其他任何东西。

Akka查询设备组《fourteen》译

Akka查询设备组《fourteen》译

Akka查询设备组《fourteen》译

重申我们在本章开头所说的内容可能是值得的。通过在单独的actor中保持仅与查询本身相关的临时状态,我们保持组actor实现非常简单。它将一切都委托给儿童演员,因此不必保持与其核心业务无关的状态。此外,多个查询现在可以彼此并行运行,事实上,根据需要可以多个查询。在我们的情况下,查询单个设备actor是一种快速操作,但如果不是这种情况,例如,因为需要通过网络联系远程传感器,这种设计将显着提高吞吐量。

我们通过测试一切可以一起工作来结束本章。此测试是以前测试的变体,现在正在执行组查询功能:

Akka查询设备组《fourteen》译

Full source at GitHub

在物联网系统的背景下,本指南介绍了以下概念。如有必要,您可以按照链接进行审核:

下节再续!

原文:https://doc.akka.io/docs/akka/2.5/guide/tutorial_5.html

有什么讨论的内容,可以加我公众号:

Akka查询设备组《fourteen》译