Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

在本章中,我们将介绍QNX Neutrino最具特色的功能,即Message Passing。Message Passing是操作系统微内核架构的核心,为操作系统提供了模块化。

A small microkernel and message passing

QNX Neutrino的主要优势之一是它具有可扩展性。通过“可扩展性”,我的意思是它可以定制在具有严格内存限制的小型嵌入式盒子上工作,也可以延伸到具有几乎无限内存的多处理器SMP盒的大型网络。

QNX Neutrino通过使每个提供服务的组件模块化来实现其可扩展性。这样,您只需要在最终系统中包含所需的组件就可实现这些功能。通过在设计中使用线程,您还可以帮助它扩展到SMP系统(我们将在本章中看到线程的更多用途)。

这在QNX系统初始设计中就一直沿用的理念,并且一直延续至今。这样的设计关键是一个小型微内核架构,其传统上将模块作为可选组件整合到单片内核中。

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

您,系统架构师,决定需要使用哪些模块。您是否需要项目中的文件系统?如果是,则添加一个。如果你不需要,那么不需要包含。你需要一个串口驱动吗?无论答案是肯定还是否定,这都不会影响(也不会影响)您之前关于文件系统的决定。

在运行时,您可以决定正在运行的系统中包含哪些系统组件。您可以在实时系统中动态删除组件,并在其他时间重新安装它们或其他系统。这些“drivers”有什么特别之处吗?不,他们只是常规的用户级程序,恰好用硬件执行特定的工作。实际上,我们将在资源管理器章节中看到如何编写它们。

实现这一目标的关键是Message Passing。在QNX Neutrino下,功能模块并不是直接绑定到OS内核中,而是与内核建立某种特殊的联系,这种联系即通过它们之间的消息传递进行通信。内核基本上只负责线程级服务(例如,调度)。事实上,消息传递并不仅仅用于此功能模块的安装和卸载技巧 - 它几乎是所有其他服务的基本构建块(例如,内存分配是通过向进程管理器发送的消息来执行的)。当然,也有一些服务是由直接内核调用提供的。

考虑打开一个文件并向其写入一个数据块。这是通过从应用程序发送到QNX Neutrino的可安装组件(称为文件系统)的许多消息来实现的。该消息告诉文件系统打开一个文件,然后另一条消息告诉它写一些数据(并包含该数据)。不过不用担心 - QNX Neutrino操作系统可以非常快速地执行消息传递。

Message passing and client/server

想象一下应用程序从文件系统读取数据。在QNX lingo中,应用程序是一个从服务器请求数据的客户端。

此客户端/服务器模型引入了与Message Passing相关的几个进程状态(我们在“进程和线程”一章中讨论了这些状态)。最初,服务器正在等待消息从某个地方到达。此时,服务器被称为接收阻止(也称为RECEIVE状态)。这是一些示例pidin输出:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

在上面的示例中,pseudo-tty服务器(称为devc-pty)是进程ID 4,其有一个线程(线程ID 1),以优先级10 Round-Robin运行,并且是接收阻塞的,等待来自通道ID为1的消息。

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

收到消息后,服务器进入READY状态,并且能够运行。如果它恰好是最高优先级的READY进程,它将获得CPU并可以执行某些处理。由于它是一个服务器,它会查看它刚刚收到的消息,并决定如何处理它。在某些时候,服务器将完成消息告诉它要做的任何工作,然后将“回复”给客户端。我们切换到客户端。最初客户端正在运行,消耗CPU,直到它决定发送消息。客户端从READY更改为send-blocked或reply-blocked,具体取决于它向其发送消息的服务器的状态。

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

通常,你会更频繁的看到回复阻塞状态,与发送阻塞状态相比。这是因为回复阻止状态意味着:

服务器已收到消息,现在正在处理它。在某些时候,服务器将完成处理,并将其回复给客户端。客户端此时处于被阻塞状态,等待服务器的回复。

与发送阻塞状态对比:

服务器尚未收到消息,很可能是因为它正忙于处理另一条消息。当服务器转向“接收”客户端的消息时,您将从发送阻止状态进入回复阻塞状态。

实际上,如果您看到一个发送阻止的进程,则意味着以下两种情况之一:

  1. 在服务器忙于为客户端提供服务的情况下,您碰巧take a snapshot of the system,并为该服务器收到了新的请求。这是正常情况; 您可以通过再次运行pidin来验证它以获取新快照。这次你可能会看到该进程不再被阻止发送。

  2. 服务器遇到了错误,无论出于何种原因,都不再收听请求。发生这种情况时,您会看到许多进程在一台服务器上发送阻止。要验证这一点,请再次运行pidin,观察客户端进程的阻塞状态没有变化。

这是一个示例,显示了一个回复阻止客户端及其被阻止的服务器:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

这表明程序esh(嵌入式shell)已经向进程号1(内核和进程管理器,procnto-smp-instr)发送了一条消息,现在正在等待回复。

现在您了解了客户端/服务器体系结构中消息传递的基础知识。

所以现在你可能会想,“我是否必须编写特殊的QNX Neutrino Message Passing API来打开文件或读写一些数据?!?”

你不必编写任何Message Passing函数,除非你想“陷入困境”(我将在稍后讨论)。实际上,让我向您展示一些传递消息的客户端代码:

#include <fcntl.h>
#include <unistd.h>

 

int main (void)
{
int fd;
fd = open ("filename", O_WRONLY);
write (fd, "This is message passing\n", 24);
close (fd);
return (EXIT_SUCCESS);
}

看到?标准C代码,没有什么棘手的。Message Passing由QNX Neutrino C库完成。您只需发出标准POSIX 1003.1或ANSI C函数调用,C库就会为您执行消息传递工作。

在上面的例子中,我们看到三个函数被调用,三个不同的消息被发送:

open() sent an “open” message
write() sent a “write” message
close() sent a “close” message

当我们查看资源管理器时(在资源管理器章节中),我们将更详细地讨论消息本身,但是现在您需要知道的是发送了不同类型的消息。

让我们退一步,将其与传统操作系统中的示例进行对比。

客户端代码将保持不变,并且供应商提供的C库将隐藏差异。在这样的系统上,open()函数调用将调用内核函数,该函数然后将直接调用文件系统,该文件系统将执行一些代码,并返回文件描述符。write()和close()调用会做同样的事情。

所以?以这种方式做事有什么好处吗? 继续阅读!

Network-distributed message passing

假设我们想要更改上面的示例,以便与网络上的其他节点进行通信。你可能认为我们必须调用特殊的函数调用来“联网”。这是网络版的代码:

#include <fcntl.h>
#include <unistd.h>

 

int main (void)
{
int fd;
fd = open ("/net/wintermute/home/rk/filename", O_WRONLY);
write (fd, "This is message passing\n", 24);
close (fd);
return (EXIT_SUCCESS);
}

如果您认为两个版本的代码几乎相同,那么您是对的。它确实如此。

在传统的操作系统中,C库open()调用进入内核,查看文件名,然后内核调用网络文件系统(NFS)代码,该代码确定/net/wintermute/home/rk/filename实际上在哪里。然后,NFS调用网络驱动程序并在节点wintermute上向内核发送消息,然后重复我们在原始示例中描述的过程。请注意,在这种情况下,实际上涉及两个文件系统; 一个是NFS客户端文件系统,一个是远程文件系统。不幸的是,取决于远程文件系统和NFS的实现,由于不兼容性,某些操作可能无法按预期工作(例如,文件锁定)。

在QNX Neutrino下,C库open()创建了它将发送到本地文件系统的相同消息,并将其发送到节点wintermute上的文件系统。在本地和远程情况下,使用完全相同的文件系统。

这是QNX Neutrino的另一个基本特征:网络分布式操作本质上是*的(或者说是通用的),因为通过Message Passing完成客户端与服务器的解耦工作。

在传统内核上,存在“双重标准”,其中本地服务以单向实现,而远程(网络)服务以完全不同的方式实现。

What it means for you

Message Passing is elegant and network-distributed。所以呢?它给你带来了什么?

嗯,这意味着你的程序继承了这些特性 - 它们也可以成为network-distributed,其工作远远少于其他系统。但我觉得最有用的好处是它们让你以一种漂亮优雅的模块化方式测试软件。

您可能曾参与大型项目,许多人必须提供不同的软件。当然,其中一些人迟早比其他人做得好。

这些项目经常在两个阶段出现问题:最初是在项目定义时,很难确定一个人的开发工作在哪里结束而另一个人开始,然后是在测试/集成时,何时无法进行完整的系统集成测试,因为所有的作品都不可用。

通过Message Passing方式,项目的各个组件可以非常容易地分离开来,从而实现非常简单的设计和简单的测试。如果您想根据现有范例考虑这一点,它与面向对象编程(OOP)中使用的概念非常相似。

归结起来的是,测试可以逐件进行。您可以设置一个简单的程序,将消息发送到您的服务器进程,并且由于该服务器进程的输入和输出是(或应该!)有详细记录,您可以确定该进程是否正常运行。哎呀,这些测试用例甚至可以自动化并放置在定期运行的回归套件中!

 

The philosophy of QNX Neutrino

Message Passing是QNX Neutrino哲学的核心。了解消息传递的用途和含义将是有效利用操作系统的关键。在我们进入细节之前,让我们先看一下理论。

Multiple threads

尽管客户端/服务器模型易于理解且最常用,但主题还有两个其他变体。第一个是使用多个线程(本节的主题),第二个是称为服务器/子服务器的模型,它有时对一般设计有用,但在网络分布式设计中确实很有用。两者的结合可以非常强大,特别是在SMP盒网络上!

正如我们在“进程和线程”一章中所讨论的那样,QNX Neutrino能够在同一进程中运行多个线程。当我们将它与Message Passing结合起来时,我们如何才能利用它?

答案很简单。我们可以启动一个线程池(使用我们在Processes and Threads章节中讨论过的thread_pool _ *()函数),每个函数都可以处理来自客户端的消息:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

这样,当客户端向我们发送消息时,只要工作完成,我们就不关心哪个线程获取它。这具有许多优点。与仅使用一个线程为多个客户端服务相比,为多个客户端提供多线程服务的能力是一个强大的概念。主要优点是内核可以在各种客户端之间多任务处理服务器,而不需要服务器本身执行多任务处理。

在单处理器机器上,运行一堆线程意味着它们都在相互竞争CPU时间。

但是,在SMP盒子上,我们可以让多个线程竞争多个CPU,同时在这些多个CPU之间共享相同的数据区域。这意味着我们仅受该特定计算机上可用CPU数量的限制。

Server/subserver

现在让我们看一下服务器/子服务器模型,然后我们将它与多线程模型结合起来。

在此模型中,服务器仍然向客户端提供服务,但由于这些请求可能需要很长时间才能完成,因此我们需要能够启动请求并仍能够处理来自其他客户端的新请求

如果我们尝试使用传统的单线程客户端/服务器模型执行此操作,一旦收到并启动了一个请求,除非我们定期停止我们正在做的事情,否则我们将无法再接收任何其他请求,快速查看是否还有其他待处理请求,将这些请求放在工作队列中,然后继续,将注意力分散到工作队列中的各种作业上。效率不高。你实际上通过多个作业之间的“时间切片”来复制内核的工作!

想象一下如果你这样做会是什么样子。你在办公桌前,有人带着一个装满工作的文件夹向你走来。你开始研究它了。当你忙着工作的时候,你会注意到有人站在你的隔间的门口,同时有更多优先工作需要处理(当然)!现在你的办公桌上有两堆工作。你在一堆上花了几分钟,切换到另一堆,等等,一直看着你的门口,看看是否还有其他人带着更多的工作。

服务器/子服务器模型在这里会更有意义。在这个模型中,我们有一个服务器,可以创建其他几个进程(子服务器)。这些子服务器每个都向服务器发送一条消息,但服务器在收到客户端请求之前不会回复它们。然后,它通过回复它应该执行的作业将客户端的请求传递给其中一个子服务器。下图说明了这一点。注意箭头的方向-它们表示发送的方向!

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

如果你正在做这样的工作,你首先要雇用一些额外的员工。这些员工都会来找你(正如子服务器向服务器发送消息 - 因此关于上图中箭头的注释),寻找工作要做。最初,您可能没有,所以您不会回复他们的查询。当有人带着一个装满工作的文件夹进入你的办公室时,你会告诉你的一位员工(子服务器),“这里有一些工作要做。”然后那个员工(子服务器),就去做了工作。随着其他工作的进入,您将其委托给其他员工(子服务器),。

这个模型的诀窍在于它是reply-driven - 当您回复子服务器时,工作就开始了。标准客户端/服务器模型是send-driven的,因为在向服务器发送消息时工作开始

那么为什么客户会进入您的办公室,而不是您雇佣的员工办公室?你为什么“仲裁”这项工作?答案很简单:你是负责执行特定任务的协调员。由您来确保工作完成。与您一起工作的客户知道您,但他们不知道您(可能是临时)员工的姓名或位置。

您可能怀疑,您当然可以将多线程服务器与服务器/子服务器模型混合使用。主要技巧是确定“问题”的哪些部分最适合通过网络分布(通常那些不会过多地消耗网络带宽的部分)以及哪些部分最适合分布在 SMP架构(通常是那些想要使用公共数据区域的部分)。

那么我们为什么要使用一个呢?使用服务器/子服务器方法,我们可以在网络上的多台计算机上分配工作。这实际上意味着我们仅受网络上可用机器数量的限制(当然还有网络带宽)。将其与通过网络分布的一堆SMP盒上的多个线程相结合,产生“计算集群”,其中*“仲裁器”委托(通过服务器/子服务器模型)工作到网络上的SMP盒。

Some examples

现在我们将考虑每种方法的几个例子。

Send-driven (client/server)

文件系统,串行端口,控制台和声卡都使用客户端/服务器模型。C语言应用程序承担客户端的角色并向这些服务器发送请求。服务器执行指定的任何工作,并回复答案。

其中在一些传统的“客户端/服务器”的服务器中可能实际是reply-driven(服务器/子服务器)服务器!这是因为,对于最终客户端而言,其服务器是为标准服务器,即使服务器本身使用服务器/子服务器方法来完成工作。我的意思是,客户端仍然向它认为是“服务提供过程”发送消息。实际发生的是“服务提供过程”简单地将客户端的工作委托给不同的进程(子服务器)。

Reply-driven (server/subserver)

一种比较流行的reply-driven程序是分布在网络上的分形图形程序。主程序将屏幕划分为若干区域,例如64个区域。在启动时,主程序将获得可参与此活动的节点列表。主程序启动子程序(子服务器),每个节点上有一个子程序,然后等待子程序发送给主程序。

然后,主程序重复选择“未填充”区域(屏幕上的64个),并通过回复将分形计算工作委托给另一个节点上的子程序。当子程序完成计算后,它会将结果发送回主程序服务器,主程序服务器会在屏幕上显示结果。

因为子程序发送给主程序,现在由主程序再次回复更多的工作。主程序继续这样做,直到屏幕上的所有64个区域都已填满。

An important subtlety

因为主程序将工作委托给子程序,所以主程序不能在任何一个程序上被阻止打断! 在传统的send-driven方法模型中,您希望主服务器创建一个子程序然后发送给它。不幸的是,主程序不会应答,直到工作计划已完成,这意味着主程序不能同时发送给另一个字程序,在一定程度上有,否定了具有多个工作节点的优势。

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

此问题的解决方案是让子工作程序启动,并通过向主程序发送消息询问是否有任何工作要做。我们再一次使用图中箭头的方向来指示发送的方向。现在工人程序正在等待master回复。当某些东西告诉master做一些工作时,它会回复一个或多个工作人员,这会导致他们离开并完成工作。这让工人可以开展业务; 主程序仍然可以响应新的请求(它不会被阻止等待其中一个工作人员的回复)。

Multithreaded server

从客户端的角度来看,多线程服务器与单线程服务器无法区分。事实上,服务器的设计者可以通过启动另一个线程来“打开”多线程。无论如何,服务器仍然可以在SMP配置中使用多个CPU,即使它只为一个“客户端”提供服务。这意味着什么?让我们重新审视分形图形示例。当一个子服务器从服务器获得“计算”的请求时,绝对没有什么能阻止子服务器在多个CPU上启动多个线程来为一个请求提供服务。事实上,为了使应用程序在具有一些SMP盒和一些单CPU盒的网络中更好地扩展,服务器和子服务器最初可以交换消息,从而子服务器告诉服务器它有多少CPU - 这让它知道多少要求它可以同时服务。然后,服务器将排队更多的SMP盒请求,允许SMP盒比单CPU盒做更多的工作。

Using message passing

现在我们已经看到了消息传递中涉及的基本概念,并了解到即使像C库这样的常见日常事物也使用它,让我们来看看其中的一些细节。

Architecture & structure

我们一直在谈论“客户”和“服务器”。我还使用了三个关键短语:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

我特意使用这些短语,因为它们密切反映了QNX Neutrino Message Passing操作中使用的实际函数名称。

以下是QNX Neutrino下可用消息传递的完整功能列表(按字母顺序排列):

ChannelCreate(), ChannelDestroy()
ConnectAttach(), ConnectDetach()
MsgDeliverEvent()
MsgError()
MsgRead(), MsgReadv()
MsgReceive(), MsgReceivePulse(), MsgReceivev()
MsgReply(), MsgReplyv()
MsgSend(), MsgSendnc(), MsgSendsv(), MsgSendsvnc(), MsgSendv(),

MsgSendvnc(), MsgSendvs(),
MsgSendvsnc()
MsgWrite(), MsgWritev()

不要让这个名单压倒你!您可以使用列表中的一小部分调用来编写非常有用的客户端/服务器应用程序-当您习惯这些想法时,您会发现某些其他函数在某些情况下非常有用。

The client

客户端想要向服务器发送请求,处于阻止状态直到服务器完成请求,然后当请求完成并且客户端被解除阻塞时,才能获得“回答”。

这意味着两件事:客户端需要能够建立与服务器的连接,然后通过消息传输数据 - 从客户端到服务器的消息(“发送”消息)和从服务器返回到服务器的消息 客户端(“回复”消息,服务器的回复)。

Establishing a connection

那么,让我们依次看看这些功能。我们需要做的第一件事是建立连接。我们使用ConnectAttach()函数执行此操作,如下所示:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

ConnectAttach()有三个标识符:nd,即节点描述符,pid,即进程ID,以及chid,即通道ID。这三个ID(通常称为“ND / PID / CHID”)唯一标识客户端要连接的服务器。我们将忽略索引和标志(只需将它们设置为0)。

因此,我们假设我们要连接到节点上的进程ID 77,通道ID 1。这是执行此操作的代码示例:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

如您所见,通过指定nd为零,我们告诉内核我们希望在节点上建立连接。

Note:我怎么知道我想和进程ID 77和通道ID 1对话?我们很快就会看到(参见下面的“Finding the server's ND/PID/CHID”)。

此时,我有一个连接ID,一个小整数,用于唯一标识从客户端到特定服务器上特定通道的连接。

我可以根据需要多次发送到服务器时使用此连接ID。 当我完成它之后,我可以通过以下方式销毁它:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

Sending messages

使用MsgSend *()函数系列的某些变体实现在客户端上传递的消息。我们将看看最简单的成员MsgSend():

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

Let's send a simple message to process ID 77, channel ID 1:

#include <sys/neutrino.h>
char *smsg = "This is the outgoing buffer";
char rmsg [200];
int coid;


// establish a connection
coid = ConnectAttach (0, 77, 1, 0, 0);
if (coid == -1) {
fprintf (stderr, "Couldn't ConnectAttach to 0/77/1!\n");
perror (NULL);
exit (EXIT_FAILURE);
}


// send the message
if (MsgSend (coid, smsg, strlen (smsg) + 1, rmsg, sizeof (rmsg)) == -1)

{
fprintf (stderr, "Error during MsgSend\n");
perror (NULL);
exit (EXIT_FAILURE);
}

if (strlen (rmsg) > 0) {
printf ("Process ID 77 returns \"%s\"\n", rmsg);
}

假设进程ID 77是一个活动服务器,期望它的通道ID 1上的特定格式的消息。在服务器收到消息之后,它将处理它并在某些时候回复结果。此时,MsgSend()将返回0表示一切顺利。如果服务器在回复中向我们发送任何数据,我们将使用最后一行代码打印它(我们假设我们正在获取NUL终止的ASCII数据)。

The server

现在我们已经看到了客户端,让我们来看看服务器。客户端使用ConnectAttach()创建与服务器的连接,然后使用MsgSend()进行所有消息传递。

Creating the channel

这意味着服务器必须创建一个通道 - 这是客户端在发出ConnectAttach()函数调用时连接的东西。创建通道后,服务器通常会永久保留它。通过ChannelCreate()函数创建通道,并通过ChannelDestroy()函数销毁:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

我们稍后会回到flags参数(在下面的“Channel flags”部分)。 现在,我们只使用0.因此,要创建一个通道,服务器会发出:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

所以我们有一个通道。此时,客户端可以(通过ConnectAttach())连接到此通道并开始发送消息:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

Message handling

就消息传递方面而言,服务器处理两个阶段的消息传递;“接收”阶段和“回复”阶段:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

我们将首先看看这些函数的两个简单版本,MsgReceive()和MsgReply(),然后再看一些变体。

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

从图中可以看出,我们需要讨论四件事:

  1. 客户端发出MsgSend()并指定其发送缓冲区(smsg指针和sbytes长度)。这将被转移到服务器的MsgReceive()函数提供的缓冲区中,rmsg的长度为rbytes。客户端现已被阻止。

  2. 服务器的MsgReceive()函数解除阻塞,并返回rcvid,服务器稍后将使用该rcvid进行回复。此时,数据可供服务器使用。

  3. 服务器已完成消息的处理,现在使用从MsgReceive()获取的rcvid将其传递给MsgReply()。请注意,MsgReply()函数将具有定义大小(sbytes)的缓冲区(smsg)作为要传输到客户端的数据的位置。数据现在由内核传输。

  4. 最后,sts参数由内核传输,并显示为客户端MsgSend()的返回值。客户端现在解锁。

您可能已经注意到每个缓冲区传输有两种大小(在客户端发送的情况下,客户端有sbytes,服务器端有rbytes;服务器回复情况下,服务器端有sbytes,客户端有rbytes。)存在两组大小,以便每个组件的程序员可以指定其缓冲区的大小。这样做是为了增加安全性。

在我们的示例中,MsgSend()缓冲区的大小与消息字符串的长度相同。让我们看看服务器,看看那里的大小是如何使用的。

Server framework
Here's the overall structure of a server:

#include <sys/neutrino.h>
...
void server (void)
{
int rcvid; // indicates who we should reply to
int chid; // the channel ID
char message [512]; // big enough for our purposes

 

// create a channel
chid = ChannelCreate (0);
// this is typical of a server: it runs forever
while (1) {
// get the message, and print it
rcvid = MsgReceive (chid, message, sizeof (message), NULL);
printf ("Got a message, rcvid is %X\n", rcvid);

printf ("Message was \"%s\".\n", message);

// now, prepare the reply. We reuse "message"

strcpy (message, "This is the reply");

MsgReply (rcvid, EOK, message, sizeof (message));
}
}

如您所见,MsgReceive()告诉内核它可以处理大小为sizeof (message)或512字节)的消息。我们的示例客户端(上面)只发送了28个字节(字符串的长度)。下图说明了:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

内核传输两种大小指定的最小值。在我们的例子中,内核将传输28个字节。服务器将被解锁并显示客户端的消息。剩余的484个字节(512字节缓冲区)将保持不受影响。

我们再次使用MsgReply()遇到相同的情况。MsgReply()函数表示要传输512个字节,但我们客户端的MsgSend()函数已指定最多可传输200个字节。所以内核再次传递最小值。在这种情况下,客户端可以接受的200个字节限制了传输大小。(这里有一个有趣的方面是,一旦服务器传输数据,如果客户端没有收到所有数据,就像在我们的例子中那样,没有办法让数据恢复 - 它就永远消失了。)

请记住,这种“修剪”操作是正常的和预期的行为。

当我们讨论通过网络传递的消息时,您会发现传输的数据量存在微小的“问题”。 我们将在下面的“Networked message-passing differences”中看到这一点。

The send-hierarchy

在消息传递环境中可能不明显的一件事是需要遵循严格的发送层次结构。这意味着两个线程永远不应该相互发送消息; 相反,它们应该被组织起来,使每个线程占据一个层次; 所有发送从一个级别到更高级别,从不到相同或更低级别。让两个线程相互发送消息的问题是,最终你会遇到死锁问题; 两个线程都在等待彼此回复它们各自的消息。由于线程被阻塞,它们永远不会有机会运行并执行回复,因此最终会有两个(或更多!)挂起的线程。

为线程分配级别的方法是将最外层的客户端放在*别,然后从那里开始工作。例如,如果你有一个依赖于某个数据库服务器的图形用户界面,而数据库服务器又依赖于文件系统,而文件系统又依赖于块文件系统驱动程序,那么你就拥有了不同的自然层次结构流程。发送将从最外面的客户端(图形用户界面)向下流到下层服务器; 回复将以相反的方向流动。

虽然这在大多数情况下肯定有效,但您会遇到需要“中断”发送层次结构的情况。这绝不是通过简单地违反发送层次结构并发送“反对流”的消息来完成的,而是通过使用MsgDeliverEvent()函数完成的,稍后我们将对此进行介绍。

Receive IDs, channels, and other parameters

我们还没有谈到上面例子中的各种参数,所以我们可以只关注消息传递。现在让我们来看看。

More about channels

在上面的服务器示例中,我们看到服务器只创建了一个通道。它当然可以创建更多,但通常,服务器不会这样做。(具有两个通道的服务器最明显的例子是透明分布式处理(TDP,也称为Qnet)本机网络管理器 - 绝对是一个奇怪的软件!)

事实证明,实际上并不需要在现实世界中创建多个渠道。通道的主要目的是为服务器提供一个明确定义的位置来“监听”消息,并为客户端提供一个明确定义的位置来发送消息(通过连接方式)。关于您在服务器中拥有多个频道的唯一情况是服务器是否要提供不同的服务或不同的服务类别,具体取决于消息到达的频道。例如,第二个信道可用作丢弃唤醒脉冲的地方 - 这确保它们被视为与到达第一信道的消息不同的“服务等级”。

在上一段中,我曾说过你可以在服务器中运行一个线程池,准备接受来自客户端的消息,并且哪个线程获得请求并不重要。这是频道抽象的另一个方面。在以前版本的QNX系列操作系统(特别是QNX 4)中,客户端将在由节点ID和进程ID标识的服务器上定位消息。由于QNX 4是单线程的,这意味着不会混淆有关“向谁发送”消息。但是,一旦你引入了线程,就必须决定如何处理线程(实际上,“服务提供者”)。由于线程是短暂的,因此让客户端连接到特定的节点ID,进程ID和线程ID实际上没有意义。另外,如果那个特定的线程很忙呢?我们必须提供一些方法来允许客户端在定义的服务提供线程池中选择“非忙线程”。

嗯,这正是一个通道。它是“服务线程池”的“address”。这里的含义是,一堆线程可以在特定通道上发出MsgReceive()函数调用,并且阻塞,一次只有一个线程获取消息。

Who sent the message?

通常,服务器需要知道是谁向其发送了消息。有许多的原因:

• accounting
• access control
• context association

• class of service
• compatibility
• etc.

让客户端向发送的每条消息提供此信息将是繁琐的(并且存在安全漏洞)。因此,每当MsgReceive()函数解锁时,内核都会填充一个结构,因为它有一条消息。此结构的类型为struct _msg_info,包含以下内容:

struct _msg_info
{
uint32_t
nd;
uint32_t
srcnd;
pid_t
pid;
int32_t
tid;
int32_t
chid;
int32_t
scoid;
int32_t
coid;
int16_t
priority;
int16_t
flags;
size64_t
msglen;
size64_t
srcmsglen;
size64_t
dstmsglen;
};

您将它作为最后一个参数传递给MsgReceive()函数。如果传递NULL,则没有任何反应。(稍后可以通过MsgInfo()调用检索信息,因此它不会永远消失!)

我们来看看这些领域:

nd, srcnd, pid, and tid

节点描述符,进程ID和客户端的线程ID。(注意,nd是发送节点的接收节点的节点描述符;srcnd是接收节点的发送节点的节点描述符。这有一个很好的理由,我们将在下面的 Some notes on NDs中看到。)

Priority

发送线程的优先级。
chidcoid
该消息被发送到信道ID,和所使用的连接ID。

Scoid

服务器连接ID。这是内核用于将消息从服务器路由回客户端的内部 标识符。你不需要知道它,除了有趣的事实,它将是一个唯一代表客户端的小整数。

Flags

包含各种标志位,包括以下内容:
• _NTO_MI_BITS_64和_NTO_MI_BITS_DIFF告诉您发件人使用的是64位体系结构,或者使用的字体大小不同于您使用的字体。
• _NTO_MI_ENDIAN_BIG和_NTO_MI_ENDIAN_DIFF告诉您发送计算机的字节顺序(如果消息来自具有不同字节序的计算机的网络)。
•内部使用_NTO_MI_NET_CRED_DIRTY。
如果您确定您的程序与发件人不兼容,则可以返回错误,例如ENOTSUP。 请参阅本章后面的“回复没有数据或错误”。

Msglen

收到的字节数

Srcmsglen

客户端发送的源消息的长度(以字节为单位)。 这可能大于msglen中的值,就像接收的数据少于发送的数据一样。请注意,仅当在ChannelCreate()的flags参数中为接收到消息的通道设置了_NTO_CHF_SENDER_LEN时,此成员才有效。

Dstmsglen

客户端回复缓冲区的长度,以字节为单位。只有在ChannelCreate()的参数中为接收到消息的通道设置_NTO_CHF_REPLY_LEN标志时,此字段才有效。

The receive ID (a.k.a. the client cookie)

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

这是一个关键的代码片段,因为它说明了从客户端接收消息之间的绑定,然后能够(稍后)回复该特定客户端。receive ID是一个整数,充当“magic cookie”,如果您想稍后与客户端进行交互,则需要保留它。如果丢了会怎么样?它消失了。客户端不会从MsgSend()中解除阻塞,直到您(服务器)死亡,或者客户端在消息传递调用上有超时(即便如此,它也很棘手; 请参阅QNX Neutrino C库参考中的TimerTimeout()函数,以及关于它在“内核超时”下的“Clocks, Timers, and Getting A Kick Every So Often”章节中使用的讨论)。

注意:不要依赖于接收ID的值来具有任何特定含义 - 它可能会在未来版本的操作系统中发生变化。您可以假设它将是唯一的,因为您将永远不会有两个由相同接收ID标识的未完成客户端(在这种情况下,当您执行MsgReply()时内核无法区分它们)。

另请注意,除了一个特殊情况(稍后我们将看到的MsgDeliverEvent()函数)之外,一旦完成了MsgReply(),该特定的接收ID就不再具有意义

Replying to the client

MsgReply()接受接收ID,状态,消息指针和消息大小。我们刚刚讨论了接收ID; 它标识应该将回复消息发送给谁。status变量指示应该传递给客户端的MsgSend()函数的返回状态。最后,消息指针和大小指示应该发送的可选回复消息的位置和大小。

MsgReply()函数可能看起来非常简单(并且它是),但其应用程序需要进行一些检查。

Not replying to the client

在通过MsgReceive()接受来自其他客户端的新消息之前,绝对没有强制要求您回复客户端!这可以在许多不同的场景中使用。

在典型的设备驱动程序中,客户端可能会提出长时间不会被服务的请求。例如,客户端可能会要求模数转换器(ADC)设备驱动程序“出去并收集45秒的样本。”与此同时,ADC驱动程序不应该只关闭45秒,而不去操作别的事情!其他客户端可能希望提供服务请求(例如,可能存在多个模拟通道,或者可能存在应立即可用的状态信息等)。

在架构上,ADC驱动程序将简单地对从MsgReceive()获取的接收ID进行排队,启动45秒的累积过程,然后关闭并处理其他请求。当45秒结束并且样本已累积时,ADC驱动程序可以找到与请求关联的接收ID,然后回复客户端。

在reply-driven的服务器/子服务器模型(其中一些“客户端”是子服务器)的情况下,您还希望阻止对客户端的回复。由于子服务器正在寻找工作,您只需记下他们的接收ID并将其存储起来。当实际工作到达时,那时你才会回复子服务器,从而表明它应该做一些工作。

Replying with no data, or an errno

当您最终回复客户端时,不要求您传输任何数据。这用于两种情况。

如果回复的唯一目的是取消阻止客户端,您可以选择不回复数据。假设客户端只是希望在某个特定事件发生之前被阻止,但它不需要知道哪个事件。在这种情况下,MsgReply()函数不需要任何数据; 接收ID足够:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

这将取消阻止客户端(但不返回任何数据)并返回EOK“成功”指示。

稍作修改,您可能希望将错误状态返回给客户端。在这种情况下,你不能使用MsgReply(),但必须使用MsgError():

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

在上面的示例中,服务器检测到客户端正在尝试写入只读文件系统,而不是返回任何实际数据,只是将EROFS的错误返回给客户端。

或者(我们将很快查看调用),您可能已经传输了数据(通过MsgWrite()),并且没有额外的数据要传输。

为什么这两个calls方式?它们略有不同。虽然MsgError()和MsgReply()都将取消阻止客户端,但MsgError()不会传输任何其他数据,将导致客户端的MsgSend()函数返回-1,并将导致客户端将errno设置为传递的任何内容作为MsgError()的第二个参数。

另一方面,MsgReply()可以传输数据(由第三个和第四个参数指示),并将导致客户端的MsgSend()函数返回作为第二个参数传递给MsgReply()的任何内容。MsgReply()对客户端的errno没有影响。

通常,如果您只返回通过/失败指示(并且没有数据),则使用MsgError(),而如果要返回数据,则使用MsgReply()。传统上,当您返回数据时,MsgReply()的第二个参数将是一个正整数,表示返回的字节数。

Finding the server's ND/PID/CHID

您已经注意到在ConnectAttach()函数中,我们需要节点描述符(ND),进程ID(PID)和通道ID(CHID)才能连接到服务器。到目前为止,我们还没有谈到客户端如何找到这个ND / PID / CHID信息。

如果一个进程创建另一个进程,则很容易 - 进程创建调用返回新创建进程的进程ID。创建进程可以将命令行上自己的PID和CHID传递给新创建的进程,或者新创建的进程可以发出getppid()函数调用以获取其父进程的PID并假设“众所周知的”CHID。

如果我们有两个完全陌生的人怎么办?例如,如果第三方创建了服务器并且您编写的应用程序想要与该服务器通信,则会出现这种情况。真正的问题是,“服务器如何宣传其位置?”

有很多方法可以做到这一点; 我们将看看其中的四个,按照编程“优雅”的顺序递增:

  1. 打开一个众所周知的文件名并将ND / PID / CHID存储在那里。这是UNIX风格的服务器采用的传统方法,它们打开一个文件(例如/etc/httpd.pid),将其进程ID作为ASCII字符串写入,并期望客户端打开文件并获取进程ID。

  2. 使用全局变量来通告ND/PID/CHID信息。这通常用于需要向自己发送消息的多线程服务器,并且就其性质而言是非常有限的情况。

  3. 使用名称 - 位置函数(name_attach()和name_detach(),然后使用客户端的name_open()和name_close()函数。

  4. 接管路径名空间的一部分并成为资源管理器。当我们在资源管理器章节中查看资源管理器时,我们将讨论这个问题。

第一种方法非常简单,但可能会遇到“路径名污染”,其中/ etc目录中包含各种*.pid文件。由于文件是持久的(这意味着它们在创建过程终止并且机器重新启动后仍然存在),因此没有明显的方法来清理这些文件,除了可能有一个“严峻的收割机”任务,看看这些东西是否仍然有效。

还有另一个相关的问题。由于创建文件的进程可能会在不删除文件的情况下死亡,因此在尝试向其发送消息之前无法知道进程是否仍处于活动状态。更糟糕的是,文件中指定的ND/PID/CHID可能非常陈旧,以至于它会被其他程序重用!您发送给该程序的消息最多会被拒绝,最坏的情况可能会造成损害。所以这种方法已经出来了。

第二种方法,我们使用全局变量来通告ND / PID / CHID值,这不是一般解决方案,因为它依赖于客户端能够访问全局变量。由于这需要共享内存,因此它无法在网络中运行!这通常用于微小的测试用例程序或非常特殊的情况,但总是在多线程程序的上下文中使用。实际上,所有发生的事情是程序中的一个线程是客户端,另一个线程是服务器。 服务器线程创建通道,然后将通道ID放入一个全局变量(进程中所有线程的节点ID和进程ID相同,因此不需要通告它们。)然后客户端线程接收 全局通道ID并对其执行ConnectAttach()。

第三种方法,我们使用name_attach()name_detach()函数,其适用于简单的客户/服务器情况。

在最后一种方法中,服务器成为资源管理器绝对是最干净的,也是推荐的通用解决方案。“How”的机制将在资源管理器章节中变得清晰,但是现在,您需要知道的是服务器将特定路径名注册为其“权限域”,并且客户端执行简单的open()那个路径名。

我不能强调这一点:POSIX文件描述符使用连接ID实现;也就是说,文件描述符是一个连接ID!这种方案的优点在于,由于从open()返回的文件描述符是连接ID,因此客户端无需进一步工作即可使用该特定连接。例如,当客户端稍后调用read()并将文件描述符传递给它时,这会将很少的开销转换为MsgSend()函数。

What about priorities?

如果低优先级进程和高优先级进程同时向服务器发送消息,该怎么办?

消息始终按优先级顺序传递。

如果两个进程“同时”发送消息,则优先级较高的进程的整个消息首先被传递给服务器。
如果两个进程具有相同的优先级,那么消息将按时间顺序传递(因为在单处理器机器上没有绝对同时发生的事情 - 即使在SMP盒上也会有一些顺序,因为CPU会仲裁内核访问,在他们中间)。

当我们在本章后面讨论优先级倒置时,我们将回到这个问题引入的其他一些细微之处。

Reading and writing data

到目前为止,您已经看到了基本的消息传递原语。正如我之前提到的,这些都是你需要的。但是,有一些额外的功能可以让生活更轻松。

让我们考虑使用客户端和服务器的示例,我们可能需要其他功能。客户端发出MsgSend()以将一些数据传输到服务器。在客户端发出MsgSend()后它会阻塞;它正在等待服务器回复。

一个有趣的事情发生在服务器端。服务器已调用MsgReceive()以从客户端接收消息。根据您为消息选择的设计,服务器可能会或可能不知道客户端消息的大小。为什么服务器不知道消息有多大?考虑我们一直在使用的文件系统示例。假设客户端:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

如果服务器执行MsgReceive()并指定缓冲区大小(例如1024字节),则按预期工作。由于我们的客户端只发送了一条小信息(28字节),因此我们没有任何问题。

但是,如果客户端发送大于1024字节的内容,例如1兆字节,该怎么办?

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

服务器如何优雅地处理这个问题?我们可以任意地说,不允许客户端写入超过n个字节。然后,在write()的客户端C库代码中,我们可以查看此要求并将写入请求分成几个n字节的请求。这就很尴尬了。

这个例子的另一个问题是,“应该有多大?”

您可以看到这种方法有很多缺点:

  • 所有使用有限大小的消息传输的函数都必须在C库中进行修改,以便该函数对请求进行打包。这本身就是相当多的工作。此外,它可能会对多线程函数产生意外的副作用-如果来自一个线程的消息的第一部分被发送,然后客户端中的另一个线程抢占当前线程并发送其自己的消息。原来的线程离开了哪里?

  • 现在必须准备好所有服务器以处理可能到达的最大可能消息大小。这意味着所有服务器都必须具有较大的数据区域,否则库将不得不将大量请求分解为许多较小的请求,从而影响速度。

幸运的是,这个问题有一个相当简单的解决方法,也给我们带来了一些好处。

两个函数MsgRead()和MsgWrite()在这里特别有用。要记住的重要事实是客户端是BLOCK状态。这意味着客户端不会在服务器尝试检查数据结构时更改数据结构。

在多线程客户端中,另一个线程可能会混淆在服务器上阻塞的客户端线程的数据区域。这被认为是一个错误(糟糕的设计) - 服务器线程假定它具有对客户端数据区的独占访问权,直到服务器线程解除对客户端的阻塞。

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

MsgRead()允许您的服务器从被阻止的客户端的地址空间读取数据,从客户端指定的“发送”缓冲区的开头开始偏移字节,进入msg为nbytes指定的缓冲区。服务器不会阻止,客户端也不会解除阻止。MsgRead()返回实际读取的字节数,如果有错误则返回 -1。

那么让我们考虑一下我们如何在write()示例中使用它。C Library write()函数构造一个消息,该消息带有一个头,它发送到文件系统服务器fs-qnx6。服务器通过MsgReceive()接收消息的一小部分,查看它,并决定将消息的其余部分放在何处。 fs-qnx6 服务器可能会决定将数据放入已分配的缓存缓冲区的最佳位置。

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

因此,客户端决定向文件系统发送4 KB。(注意:C库如何在数据前面插入一个小标题,以便文件系统可以告诉它实际上是什么类型的请求 - 当我们查看多部分消息时,我们会回到这一点,更详细的 我们看看资源管理器。)

文件系统只读取足够的数据(标题)来确定它是什么类型的消息:

// part of the headers, fictionalized for example purposes
struct _io_write {
uint16_t type;
uint16_t combine_len;
uint32_t nbytes;
uint32_t xtype;
};
typedef union {
uint16_t type;

struct _io_read io_read;
struct _io_write io_write;
...
} header_t;


header_t header; // declare the header
rcvid = MsgReceive (chid, &header, sizeof (header), NULL);
switch (header.type) {
...
case _IO_WRITE:
number_of_bytes = header.io_write.nbytes;
...

此时,fs-qnx6知道4 KB位于客户端的地址空间中(因为消息在结构的nbytes成员中告诉它)并且应该将其传输到缓存缓冲区。 fs-qnx6 服务器可能会发出:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

请注意,消息传输已指定sizeof(header.io_write)的偏移量,以便跳过客户端C库添加的写入标头。我们假设cache_buffer [index] .size实际上是4096(或更多)字节。

同样,为了将数据写入客户端的地址空间,我们有:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

MsgWrite()允许您的服务器将数据写入客户端的地址空间,从客户端指定的“接收”缓冲区的开头开始偏移字节。在服务器空间有限但客户希望从服务器获取大量信息的情况下,此功能最有用。

例如,使用数据采集驱动程序,客户端可以指定4兆字节的数据区域并告诉驱动程序获取4兆字节的数据。驱动程序真的不需要像这样的大区域,以防万一有人要求进行大量的数据传输。

驱动程序可能具有128 KB的DMA数据传输区域,然后使用MsgWrite()将其逐个消息传递到客户端的地址空间(当然,每次增加128 KB的偏移量)。然后,当写入最后一条数据时,驱动程序将MsgReply()发送给客户端。

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

请注意,MsgWrite()允许您在各个位置编写数据组件,然后使用MsgReply()唤醒客户端:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

或者在客户端缓冲区开头写入标题后唤醒客户端:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

对于编写未知数量的数据,这是一个相当优雅的技巧,只有在编写完成后才能知道您编写了多少数据。如果您在传输数据后使用此方法编写标题,则必须记住在客户端数据区的开头留出标题空间!

Multipart messages

到目前为止,我们只展示了从客户端地址空间中的一个缓冲区到服务器地址空间中另一个缓冲区的消息传输。(在回复期间,服务器空间中的一个缓冲区进入客户端空间中的另一个缓冲区。)

虽然这种方法对于大多数应用程序来说足够好,但它可能导致效率低下。回想一下,我们的write()C库代码占用了传递给它的缓冲区,并在它的前面插入了一个小的头部信息。使用我们到目前为止学到的东西,你会期望C库实现类似这样的write()(以下不是真正的源代码):

ssize_t write (int fd, const void *buf, size_t nbytes)
{
char *newbuf;
io_write_t *wptr;
ssize_t nwritten;
newbuf = malloc (nbytes + sizeof (io_write_t));

 

// fill in the write_header at the beginning
wptr = (io_write_t *) newbuf;
wptr -> type = _IO_WRITE;
wptr -> nbytes = nbytes;

 

// store the actual data from the client
memcpy (newbuf + sizeof (io_write_t), buf, nbytes);

// send the message to the server
nwritten = MsgSend (fd, newbuf, nbytes + sizeof (io_write_t),

newbuf, sizeof (io_write_t));
free (newbuf);
return (nwritten);
}

看看发生了什么? 一些坏事:

  • write()现在必须能够为malloc()提供足够大的缓冲区,以便客户端数据(可能相当大)和标头。标头的大小不是问题 - 在这种情况下,它是12个字节。

  • 我们必须将数据复制两次:一次通过memcpy(),另一次为MsgSend。

  • 我们必须建立一个指向io_write_t类型的指针并将其指向缓冲区的开头,而不是本机访问它(这是一个小麻烦)。

由于内核无论如何都要复制数据,如果我们可以告诉它数据的一部分(标题)位于某个地址,而另一部分(数据本身)位于某处,那将是很好的。否则,我们无需手动组装缓冲区和复制数据。

幸运的是,QNX Neutrino实现了一种让我们做到这一点的机制!该机制称为IOV,代表“输入/输出向量”。

让我们先看看一些代码,然后我们将讨论会发生什么:

#include <sys/neutrino.h>
ssize_t write (int fd, const void *buf, size_t nbytes)
{
io_write_t whdr;
iov_t iov [2];
// set up the IOV to point to both parts:
SETIOV (iov + 0, &whdr, sizeof (whdr));
SETIOV (iov + 1, buf, nbytes);
// fill in the io_write_t at the beginning
whdr.type = _IO_WRITE;
whdr.nbytes = nbytes;
// send the message to the server
return (MsgSendv (coid, iov, 2, iov, 1));
}

首先,注意没有malloc()和memcpy()。接下来,请注意iov_t类型的使用。这是一个包含地址和长度对的结构,我们已经分配了两个(名为iov)。

iov_t类型定义由<sys / neutrino.h>自动包含,定义为:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

给定这种结构,我们用写头(第一部分)和来自客户端的数据(第二部分)填充地址和长度对。有一个名为SETIOV()的便利宏可以为我们完成任务。它被正式定义为:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

SETIOV()接受iov_t,并将地址和长度数据填充到IOV中。

另请注意,由于我们创建的IOV指向标头,因此我们可以在不使用malloc()的情况下在堆栈上分配标头。这可能是一个祝福和一个诅咒 - 当标题非常小时,这是一个祝福,因为你避免了动态内存分配的麻烦,但是当标题很大时它可能是一个诅咒,因为它可以消耗一大堆的堆栈空间。通常,头部非常小。

无论如何,重要的工作是由MsgSendv()完成的,它采用与我们在前一个例子中使用的MsgSend()函数几乎相同的参数:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

参数含义如下:

Coid

我们发送的连接ID,与MsgSend()一样。

sparts and rparts

iov_t参数指定的发送和接收部分的数量。在我们的示例中,我们将sparts设置为2表示我们正在发送一个由2部分组成的消息,并且rpart为1表示我们正在收到一个部分的回复。

siov and riov

iov_t数组指示我们希望发送的地址和长度对。在上面的例子中,我们设置2部分siov指向标题和客户端数据,1部分riov指向标题。

这是内核查看数据的方式:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

内核只是将数据从客户端空间中的IOV的每个部分无缝复制到服务器的空间(然后返回,用于回复)。实际上,内核正在执行聚集 - 分散操作。

要记住以下几点:

  • 部件数量“限制”为512 KB; 然而,我们的2的例子是典型的。

  • 内核只是将一个IOV中指定的数据从一个地址空间复制到另一个地址空间。

  • 源和目标IOV不必相同。

为什么最后一点如此重要?要回答这个问题,让我们来看看大局。在客户端,假设我们发布了:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

在服务器端,(假设它是文件系统,fs-qnx6),我们有4 KB的缓存块,我们希望有效地将消息直接接收到缓存块中。 理想情况下,我们想写一些这样的代码:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

这段代码完全符合您的期望:它设置了一个4部分的IOV结构,将结构的第一部分设置为指向标题,接下来的三部分指向缓存块37,16和22 。(这些数字代表恰好在特定时间可用的缓存块。)

这是一个图形表示:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

然后调用MsgReceivev()函数,指示我们将从指定的通道(chid参数)接收消息,并且我们将提供4部分IOV结构。 这也显示了IOV结构本身。

(除了它的IOV功能,MsgReceivev()就像MsgReceive()一样运行。)

哎呀!当我们引入MsgReceive()函数时,我们犯了同样的错误。在我们实际收到消息之前,我们如何知道我们收到的消息类型以及与之相关的数据量?

我们可以像以前一样解决这个问题:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

这是初始的MsgReceive()(请注意,我们没有使用IOV表单 - 实际上没有必要使用单部分消息),找出它是什么类型的消息,然后继续阅读数据从客户端的地址空间(从偏移sizeof (header.io_write)处开始)进入由3部分IOV指定的缓存缓冲区。

请注意,我们从使用4部分IOV(在第一个示例中)切换到3部分IOV。那是因为在第一个例子中,4部分IOV的第一部分是标题,我们使用MsgReceive()直接读取,而4部分IOV的最后三部分与3部分IOV相同他们指定了我们希望数据的去向。

您可以想象我们如何执行读取请求的回复:

1.找到与请求的数据对应的缓存条目。

2.用这些条目填充IOV结构。

3.使用MsgWritev()(或MsgReplyv())将数据传输到客户端。

请注意,如果数据不是在高速缓存块(或其他数据结构)的开头就开始,则这不是问题。只需将第一个IOV偏移到指向数据开始的位置,然后修改大小。

What about the other versions?

除MsgSend *()系列之外的所有消息传递函数都具有相同的一般形式:如果函数的末尾有v,则需要IOV和部分数; 否则,它需要一个指针和一个长度。

MsgSend *()系列在消息缓冲区的源和目标方面有四个主要变化,并结合内核调用本身的两种变体。

看如下列表:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

“线性”是指传递void *类型的单个缓冲区及其长度。记住这个的简单方法是“v”代表“向量”,并且与适当的参数 - 第一个或第二个在同一个地方,分别指“发送”或“接收”。

嗯......看起来像MsgSendsv()和MsgSendsvnc()函数是相同的,不是吗?嗯,是的,就他们的参数而言,确实如此。不同之处在于它们是否是取消点。“nc”版本不是取消点,而非“nc”版本是。(有关取消点和可取消性的更多信息,请参阅pthread_cancel()下的QNX Neutrino C库参考。)

Implementation

您可能已经怀疑MsgRead(),MsgReceive(),MsgSend()和MsgWrite()函数的所有变体都是密切相关的。(唯一的例外是MsgReceivePulse()-我们很快就会看到这个。)

你应该使用哪些?嗯,这是一个哲学辩论。我个人的偏好是混合搭配。

如果我只发送或接收单部分消息,为什么还要为设置IOV的复杂性而烦恼呢?无论您是自己设置还是让内核/库执行它,设置它们的微小CPU开销基本相同。单部分消息方法使内核不必进行地址空间操作,并且速度稍快一些。

你应该使用IOV功能吗?绝对! 每当您发现自己处理多部分消息时,请使用它们。 当您只使用几行代码使用多部分message传输时,切勿复制数据。这可以通过最小化数据在系统中复制的次数来保持system screaming;传递指针比将数据复制到新缓冲区要快得多。

Pulses

到目前为止,我们所讨论的所有消息都为BLOCK状态的客户端。一旦调用MsgSend(),它就是客户端的休息时间。客户端休眠,直到服务器回复。

但是,有些情况下消息的发送者无法阻止。我们将看一下Interrupts和“Clocks, Timers, and Getting a Kick Every So Often”的章节中的一些示例,但是现在我们应该理解这个概念。

实现非阻塞发送的机制称为脉冲。脉冲是一条微小的信息:

•可以承载40位有效负载(8位代码和32位数据)
•对发件人没有阻止
•可以像任何其他消息一样接收
•如果接收器未被阻止等待,则排队

Receiving a pulse message

接收脉冲非常简单:MsgReceive()提供一个很小的,定义良好的消息,就像一个线程发送了一条正常的消息一样。唯一的区别是你不能MsgReply()这个消息 - 毕竟,脉冲的整个想法是它是异步的。在本节中,我们将看一下另一个函数MsgReceivePulse(),它对处理脉冲非常有用。

关于脉冲的唯一“有趣”的事情是从MsgReceive()函数返回的接收ID为零。这表明这是一个脉冲,而不是来自客户的常规消息。

您经常会看到服务器中的代码如下所示:

#include <sys/neutrino.h>
rcvid = MsgReceive (chid, …);
if (rcvid == 0) { // it's a pulse
// determine the type of pulse
// handle it
} else { // it's a regular message
// determine the type of message
// handle it
}

What's in a pulse?

好的,所以您收到此消息的接收ID为零。它实际上是什么样的?从<sys/neutrino.h>头文件中,这里是_pulse结构的定义:

struct _pulse {
uint16_t
type;
uint16_t
subtype;
int8_t
code;

uint8_t zero [3];
union sigval
value;
int32_t
scoid;
};

typesubtype成员都为零(进一步表明这是一个脉冲)。codevalue成员被设置为脉冲的发送者确定的任何内容。通常,code将指示脉冲发送的原因;该value将是与脉冲相关的32位数据值。这两个领域是“40位”内容的来源; 其他字段不是用户可调整的。

内核保留了code的负值情况,为程序员留下了127个value,因为他们认为合适。

value成员实际上是一个联合:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

因此(扩展上面的服务器示例),您经常会看到如下代码:

#include <sys/neutrino.h>
rcvid = MsgReceive (chid, …
if (rcvid == 0) { // it's a pulse
// determine the type of pulse
switch (msg.pulse.code) {
case MY_PULSE_TIMER:
// One of your timers went off, do something
// about it...
break;
case MY_PULSE_HWINT:
// A hardware interrupt service routine sent
// you a pulse. There's a value in the "value"
// member that you need to examine:
val = msg.pulse.value.sival_int;
// Do something about it...
break;
case _PULSE_CODE_UNBLOCK:
// A pulse from the kernel, indicating a client

// unblock was received, do something about it...
break;
// etc...
} else { // it's a regular message
// determine the type of message
// handle it
}

当然,这段代码假设你已经设置了你的msg结构来包含struct _pulse脉冲; 成员,并且定义了清单常量MY_PULSE_TIMER和MY_PULSE_HWINT。脉冲代码_PULSE_CODE_UNBLOCK是上述负编号内核脉冲之一。您可以在<sys/neutrino.h>中找到它们的完整列表以及值字段的简要说明。

 

The MsgReceivePulse() function

MsgReceive()和MsgReceivev()函数将接收“常规”消息或脉冲。可能存在您只想接收脉冲的情况。最好的例子是在服务器中,您收到客户端要求执行某项操作的请求,但尚未完成请求(可能您需要进行长时间的硬件操作)。在这样的设计中,您通常会设置硬件(或计时器或其他),以便在发生重大事件时向您发送脉冲。

如果您使用经典的“等待无限循环消息”设计来编写服务器,您可能遇到一个客户端向您发送请求的情况,然后,当您等待脉冲进入(表示请求完成)时,另一个客户端会向您发送另一个请求。通常,这正是您想要的 - 毕竟,您希望能够同时为多个客户端提供服务。但是,可能有充分的理由说明为什么这是不可接受的 - 为客户提供服务可能是如此资源密集,您希望限制客户端的数量。

在这种情况下,您现在需要能够“选择性地”仅接收脉冲,而不是常规消息。这是MsgReceivePulse()发挥作用的地方:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

如您所见,您使用与MsgReceive()相同的参数;通道ID,缓冲区(及其大小)以及info参数。(我们在“Who sent the message?”中讨论了上面的info参数。)注意,在脉冲的情况下不使用info参数;你可能会问为什么它出现在参数列表中。简单的回答:在实现中以这种方式更容易实现。只需传递一个NULL!

MsgReceivePulse()函数只接收脉冲。所以,如果你有一个通过MsgReceivePulse()阻塞了多个线程的通道,(并且没有通过MsgReceive()阻止它的线程),并且客户端试图向你的服务器发送一条消息,那么客户端将保持SEND-blocked,直到一个线程发出MsgReceive()调用。在此期间,脉冲将通过MsgReceivePulse()函数传输。

如果混合使用MsgReceivePulse()和MsgReceive(),唯一可以保证的是MsgReceivePulse()只能获得脉冲。MsgReceive()可以获取脉冲或消息!这是因为,通常情况下,MsgReceivePulse()函数的使用是为您要排除向服务器的常规消息传递的情况保留的。

这确实引起了一些混乱。由于MsgReceive()函数既可以接收消息又可以接收脉冲,但MsgReceivePulse()函数只能接收脉冲,那么如何处理使用这两种函数的服务器?通常,这里的答案是你有一个执行MsgReceive()的线程池。此线程池(一个或多个线程;数量取决于您准备同时服务的客户端数量)负责处理客户端调用(服务请求)。由于您正在尝试控制“提供服务的线程”的数量,并且由于其中一些线程可能需要阻塞,等待脉冲到达(例如,来自某些硬件或来自其他线程),因此您需要通常使用MsgReceivePulse()阻止提供服务的线程。这可以确保客户端请求在您等待脉冲时不会“潜入”(因为MsgReceivePulse()只接收脉冲)。

The MsgDeliverEvent() function

如上文“发送层次结构”中所述,有些情况下您需要打破发送的自然流。

如果您的客户端向服务器发送了消息,结果可能暂时不可用,并且客户端不想处于Block,则可能会出现这种情况。当然,你也可以通过线程来部分地解决这个问题,让客户端在阻塞服务器调用上简单地“use up”一个线程,但是这对于较大的系统来说可能无法很好地扩展(你可能会使用很多线程来等待许多不同的服务器)。假设您不想使用线程,而是希望服务器立即回复客户端,“我很快就会回复您的请求。”此时,由于服务器回复,客户端现在将会继续处理。一旦服务器完成了客户端给出的任何任务,服务器现在需要某种方式告诉客户端,“嘿,醒来,我已经完成了。”显然,正如我们在上面的发送层次结构讨论中看到的那样,你可以没有服务器向客户端发送消息,因为如果客户端在同一时刻向服务器发送消息,这可能会导致死锁。那么,服务器如何在不违反发送层次结构的情况下向客户端“发送”消息?

它实际上是一个多步操作。以下是它的工作原理:

  1. 客户端创建一个struct sigevent结构,并将其填入。

  2. 客户端向服务器发送消息,有效地说:“为我执行此特定任务,立即回复,顺便说一句,这是一个结构设置,您应该在工作完成时通知我。”

  3. 服务器接收消息(包括struct sigevent),将struct sigevent和接收ID存储起来,并立即回复给客户端。

  4. 客户端现在正在运行,服务器也是如此。

  5. 当服务器完成工作时,服务器使用MsgDeliverEvent()通知客户端工作现在已完成。

我们将在“How to fill in the struct sigevent.”中详细介绍Clocks,Timers和Getting a Kick Every Sounds中的struct sigevent。现在,只需将struct sigevent视为“ 黑盒子“以某种方式包含服务器用来通知客户端的事件。由于服务器存储了struct sigevent和来自客户端的接收ID,因此服务器现在可以调用MsgDeliverEvent()将客户端选择的事件传递给客户端:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

请注意,MsgDeliverEvent()函数接受两个参数,即接收ID(在rcvid中)和要在事件中传递的事件。服务器不会以任何方式修改或检查事件!这一点很重要,因为它允许服务器提供客户端选择的任何类型的事件,而无需在服务器上进行任何特定处理。 (但是,服务器可以使用MsgVerifyEvent()函数验证事件是否有效。)

rcvid是服务器从客户端获取的接收ID。请注意,这确实是一个特例。通常,在服务器回复客户端之后,接收ID不再具有任何含义(原因是客户端未被阻止,服务器无法再次解锁,或者从客户端读取数据或向客户端写入数据,等等。)。但在这种情况下,接收ID包含足够的信息,以便内核能够决定事件应该传递到哪个客户端。当服务器调用MsgDeliverEvent()函数时,服务器不会阻塞 - 这是对服务器的非阻塞调用。客户端将事件(由内核)传递给它,然后可以执行适当的任何操作。

Channel flags

当我们介绍服务器(在“服务器”中)时,我们提到ChannelCreate()函数接受一个flags参数,我们只是将它保留为零。

现在是时候解释flags了。我们只检查一些可能的标志值:

_NTO_CHF_FIXED_PRIORITY

接收线程不会根据发送方的优先级更改优先级。(我们在下面的“Priority inheritance”部分中详细讨论优先级问题)。通常(即,如果您未指定此标志),接收线程的优先级将更改为发送方的优先级。

_NTO_CHF_UNBLOCK

每当客户端线程尝试解除阻塞时,内核都会发送一个脉冲。服务器必须回复客户端才能允许客户端取消阻止。我们将在下面讨论这个,因为它对客户端和服务器都有一些非常有趣的结果。

_NTO_CHF_THREAD_DEATH

只要在此通道上阻塞的线程死亡,内核就会发出脉冲。这对于希望始终为服务请求维护固定“线程池”的服务器非常有用。

_NTO_CHF_DISCONNECT

只要来自单个客户端的所有连接都与服务器断开连接,内核就会发送脉冲。

_NTO_CHF_SENDER_LEN

内核提供客户端的消息大小作为提供给服务器的信息的一部分(struct _msg_info结构的srcmsglen成员)。

_NTO_CHF_REPLY_LEN

内核提供客户端的回复消息缓冲区大小,作为提供给服务器的信息的一部分(struct _msg_info结构的dstmsglen成员)。

_NTO_CHF_COID_DISCONNECT

每当该进程拥有的任何连接因另一端的通道消失而终止时,内核就会发送一个脉冲。

_NTO_CHF_UNBLOCK

我们来看看_NTO_CHF_UNBLOCK标志;它对客户端和服务器都有一些有趣的褶皱。

通常(即,当服务器未指定_NTO_CHF_UNBLOCK标志的情况下)当客户端希望从MsgSend()(以及相关的MsgSendv(),MsgSendvs()等函数族)解锁时,客户端简单地解除阻塞。客户端可能希望由于接收到信号或内核超时而解除阻塞(请参阅QNX Neutrino C库参考中的TimerTimeout()函数,以及“Clocks, Timers,
and Getting a Kick Every So Often
”章节)。不幸的是,服务器不知道客户端已解除阻止,不再等待回复。请注意,除非在非常特殊的情况下需要服务器与其所有客户端之间的协作,否则无法写入关闭此标志的可靠服务器。

假设您有一个具有多个线程的服务器,所有线程都在服务器的MsgReceive()函数中被阻止。客户端向服务器发送消息,服务器的一个线程接收它。此时,客户端被阻止,服务器中的线程正在主动处理请求。现在,在服务器线程有机会回复客户端之前,客户端从MsgSend()解除阻塞(让我们假设它是由于信号)。

请记住,服务器线程仍在代表客户端处理请求。但由于客户端现在已被解除阻塞(客户端的MsgSend()将返回EINTR),客户端可以*地向服务器发送另一个请求。由于QNX Neutrino服务器的架构,另一个线程将从客户端收到另一条消息,具有完全相同的接收ID!服务器无法区分这两个请求!当第一个线程完成并回复客户端时,它实际上是回复客户端发送的第二条消息,而不是第一条消息(因为线程实际上认为它正在做)。因此,服务器的第一个线程回复客户端的第二条消息。

这很糟糕;但让我们更进一步。现在服务器的第二个线程完成请求并尝试回复客户端。但是由于服务器的第一个线程已经回复到客户端,因此客户端现在被解除阻塞,服务器的第二个线程从其回复中收到错误。

此问题仅限于多线程服务器,因为在单线程服务器中,服务器线程仍将忙于处理客户端的第一个请求。这意味着即使客户端现在已解除阻止并再次发送到服务器,客户端现在将进入SEND阻塞状态(而不是REPLY-blocked状态),允许服务器完成处理,回复客户端(这将导致错误,因为客户端不再被REPLY阻止),然后服务器将从客户端接收第二条消息。这里真正的问题是服务器代表客户端执行无用的处理(客户端的第一个请求)。处理没用,因为客户端不再等待该工作的结果。

解决方案(在多线程服务器的情况下)是让服务器为其ChannelCreate()调用指定_NTO_CHF_UNBLOCK标志。这告诉内核,“告诉我客户端何时尝试解锁我(通过向我发送脉冲),但不要让客户端解锁! 我会自己解锁客户端。“

要记住的关键是这个服务器标志通过不允许客户端解除阻塞来改变客户端的行为,直到服务器说它可以这样做。

在单线程服务器中,会发生以下情况:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

这没有帮助客户端解锁它,但它确实确保服务器不会混淆。在这种示例中,服务器很可能只是忽略它从内核获得的脉冲。这样做是可以的 - 这里的假设是让客户端阻塞直到服务器准备好数据是安全的。

如果您希望服务器根据内核发送的脉冲进行操作,有两种方法可以执行此操作:

  • 在服务器中创建另一个侦听消息的线程(特别是侦听来自内核的脉冲)。第二个线程将负责取消第一个线程中正在进行的操作。两个线程中的一个将回复客户端。

  • 不要在线程本身中执行客户端的工作,而是排队工作。这通常在服务器将客户端的工作存储在队列中并且服务器是事件驱动的应用程序中完成。通常,到达服务器的消息之一表示客户端的工作现在已完成,服务器应该回复。在这种情况下,当内核脉冲到达时,服务器代表客户端取消正在执行的工作并回复。

您选择哪种方法取决于服务器的工作类型。在第一种情况下,服务器代表客户端主动执行工作,所以你真的没有选择 - 你必须有一个第二个线程来监听来自内核的unblock-pulse(或者你可以在线程内定期轮询以查看脉冲是否已到达,但通常不鼓励轮询)。

在第二种情况下,服务器还有其他工作要做 - 可能已经命令一块硬件“去收集数据。”在这种情况下,无论如何,服务器的线程将被阻塞在MsgReceive()函数上,等待硬件指示命令已完成。

在任何一种情况下,服务器都必须回复客户端,否则客户端将保持阻止状态。

Synchronization problem

即使您如上所述使用_NTO_CHF_UNBLOCK标志,仍然需要处理一个同步问题。 假设您在MsgReceive()函数上阻塞了多个服务器线程,等待消息或脉冲,客户端会向您发送消息。一个线程关闭并开始客户端的工作。当发生这种情况时,客户端希望解除阻塞,因此内核会生成解除阻塞脉冲。服务器中的另一个线程接收此脉冲。此时,存在竞争条件 - 第一个线程可能正准备回复客户端。如果第二个线程(获得脉冲)做了回复,那么客户端有可能解除阻塞并向服务器发送另一条消息,服务器的第一个线程现在有机会运行并使用第一个请求的数据回复客户端的第二个请求:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

或者,如果获得脉冲的线程即将回复客户端,并且第一个线程做出回复,那么您具有相同的情况 - 第一个线程解除阻塞客户端,谁发送另一个请求,以及第二个线程( 获得脉冲)现在取消阻止客户端的第二个请求。

情况是你有两个并行的执行流程(一个由消息引起,一个由脉冲引起)。通常,我们会立即将此视为需要互斥锁的情况。不幸的是,这会导致问题 - 必须在MsgReceive()之后立即获取互斥锁并在MsgReply()之前释放互斥锁。虽然这确实有效,但它会破坏解锁脉冲的全部目的!(服务器要么得到消息并忽略解除阻塞脉冲,直到它回复客户端,否则服务器将获得解除阻塞脉冲并取消客户端的第二次操作。)

一个看起来很有希望(但最终注定要失败)的解决方案是拥有一个细粒度的互斥体。 我的意思是一个互斥锁,只能在控制流的一小部分周围锁定和解锁(你应该使用互斥锁的方式,而不是像上面提到的那样阻塞整个处理部分)。您在服务器中设置了“我们已经回复了吗?”标志,当您收到消息时,此标志将被清除,并在您回复消息时进行设置。 就在你回复邮件之前,你要检查标志。如果该标志指示该消息已被回复,则您将跳过回复。 在检查和设置标志时,互斥锁将被锁定和解锁。

不幸的是,这不起作用,因为我们并不总是处理两个并行的执行流程 - 客户端在处理期间不会总是被信号击中(导致解除阻塞脉冲)。这是它破坏的场景:

  • 客户端向服务器发送消息;客户端现在被阻止,服务器现在正在运行。

  • 由于服务器收到来自客户端的请求,标志将重置为0,表示我们仍需要回复客户端。

  • 服务器正常回复客户端(因为标志设置为0)并将标志设置为1,表示如果unblock-pulse到达,则应忽略该标志。

  • (问题从这里开始。)客户端向服务器发送第二条消息,几乎在发送后立即被信号命中; 内核向服务器发送一个unblock-pulse。

  • 接收消息的服务器线程即将获取互斥锁以检查标志,但是没有完全到达(它被抢占)。

  • 另一个服务器线程现在获取脉冲,因为该标志仍然从上次设置为1,忽略脉冲。

  • 现在服务器的第一个线程获取互斥锁并清除标志。

  • 此时,unblock事件已丢失。

如果您优化标志以指示更多状态(例如接收脉冲,回复脉冲,收到消息,回复消息),您仍将遇到同步竞争条件,因为您无法在两者之间创建原子绑定 标志和接收和回复函数调用。(从根本上说,这就是问题所在 - 在MsgReceive()之后和调整标志之前的小时间窗口,以及在MsgReply()之前调整标志之后。)解决这个问题的唯一方法就是让 内核跟踪你的标志。

Using the _NTO_MI_UNBLOCK_REQ

幸运的是,内核会在消息信息结构中将您的标志作为单个位跟踪(您作为MsgReceive()的最后一个参数传递的struct _msg_info,或者您可以在给定接收ID后通过调用获取MsgInfo())。

标志被称为_NTO_MI_UNBLOCK_REQ,并且如果客户端希望解除阻塞(例如,在接收到信号之后)则设置该标志。

这意味着在多线程服务器中,您通常会有一个正在执行客户端工作的“工作”线程,以及另一个将接收解除阻塞消息的线程(或其他一些消息;我们只关注取消阻止消息 现在)。 当您从客户端获得解除阻止消息时,您将为自己设置一个标志,让您的程序知道线程希望解除阻止。

有两种情况需要考虑:

•“工人”线程被阻止; 要么
•“worker”线程正在运行。

如果工作线程被阻止,则需要让获取unblock消息的线程唤醒它。例如,如果它正在等待资源,它可能会被阻止。当工作线程唤醒时,它应检查_NTO_MI_UNBLOCK_REQ标志,如果设置,则以中止状态回复。如果未设置标志,则线程可以执行唤醒时执行的任何正常处理。

或者,如果工作线程正在运行,它应该定期检查解除阻塞线程可能设置的“自我标记”,如果设置了标志,它应该以中止状态回复客户端。请注意,这只是一个优化:在未优化的情况下,工作线程将不断调用接收ID上的“MsgInfo”并检查_NTO_MI_UNBLOCK_REQ位本身。

Message passing over a network

为了清楚起见,我已经避免谈论你如何使用网络上的消息传递,尽管这是QNX Neutrino灵活性的关键部分!

到目前为止,您学到的所有内容都适用于通过网络传递的消息。
在本章的前面,我向您展示了一个例子:

#include <fcntl.h>
#include <unistd.h>
int main (void)
{
int fd;
fd = open ("/net/wintermute/home/rk/filename", O_WRONLY);
write (fd, "This is message passing\n", 24);
close (fd);
return (EXIT_SUCCESS);
}

当时,我说这是“使用通过网络传递消息的示例。”客户端创建与ND/PID/CHID(恰好位于不同节点上)的连接,并且服务器执行 MsgReceive()在其频道上。在这种情况下,客户端和服务器与本地单节点情况相同。你可以在这里停止阅读 - 关于通过网络传递的消息真的没有任何“棘手”。但对于那些对此如何感到好奇的读者,请继续阅读!

现在我们已经看到了本地消息传递的一些细节,我们可以更深入地讨论通过网络传递的消息是如何工作的。虽然这个讨论可能看起来很复杂,但它实际上归结为两个阶段:名称解析,一旦得到解决,简单的消息传递。

这是一个图表,说明了我们将要讨论的步骤:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

在图中,我们的节点称为magenta,并且,如示例所暗示的,目标节点称为wintermute。
让我们分析客户端程序使用Qnet通过网络访问服务器时发生的交互:

  1. 客户端的open()函数被告知打开一个恰好在其前面有/net的文件名。(名称/net是Qnet显示的默认名称。)此客户端不知道谁负责该特定路径名,因此它连接到process manager(步骤1)以找出实际拥有该资源的人员。无论我们是通过网络传递消息还是自动发生,都可以完成此操作。由于本地QNX Neutrino网络管理器Qnet“拥有”以/net开头的所有路径名,因此流程管理器将信息返回给客户端它向Qnet询问路径名。

  2. 客户端现在向Qnet的资源管理器线程发送消息,希望Qnet能够处理该请求。但是,此节点上的Qnet不负责提供客户端所需的最终服务,因此它告诉客户端它应该实际联系节点wintermute上的进程管理器。(这样做的方法是通过“重定向”响应,它为客户端提供应该联系的服务器的ND/PID/CHID。)此重定向响应也由客户端库自动处理。

  3. 客户端现在连接到wintermute上的process manager。这涉及通过Qnet的网络处理程序线程发送节点外消息。客户端节点上的Qnet进程获取消息并通过介质将其传输到远程Qnet,远程Qnet将其传递给wintermute上的process manager。那里的process manager解析了路径名的其余部分(在我们的例子中,那将是“/home/rk/filename”部分)并发回一个重定向消息。此重定向消息遵循相反的路径(从服务器的Qnet通过介质到客户端节点上的Qnet,最后返回到客户端)。此重定向消息现在包含客户端首先要联系的服务器的位置,即服务于客户端请求的服务器的ND/PID/CHID。(在我们的示例中,服务器是一个文件系统。)

  4. 客户端现在将请求发送到该服务器。此处遵循的路径与上面步骤3中遵循的路径相同,只是直接联系服务器而不是通过流程管理器。

一旦建立了步骤1到3,步骤4就是所有未来通信的模型。在上面的客户端示例中,open(),read()和close()消息都采用路径编号4。请注意,客户端的open()是触发这一系列事件的原因 - 但实际上是 如所描述的那样打开消息流(通过路径号4)。

对于真正感兴趣的读者:我遗漏了一步。在第2步中,当客户向Qnet询问wintermute时,Qnet需要弄清楚wintermute是谁。这可能导致Qnet再执行一次网络事务来解析节点名称。如果我们假设Qnet已经知道wintermute,那么上面给出的图是正确的。

Networked message passing differences

因此,一旦建立连接,所有进一步的消息传递流程都使用上图中的步骤4。这可能会导致您错误地认为通过网络传递的消息与本地情况下传递的消息相同。不幸的是,事实并非如此。以下是不同之处:

  • 延误时间更长

  • 无论节点是否处于活动状态,ConnectAttach()都会返回成功 - 在第一个消息传递时发生实际错误指示。

  • MsgDeliverEvent()不保证可靠

  • MsgReply(),MsgRead(),MsgWrite()现在处于blocking calls,而在本地情况下它们不是。

  • MsgReceive()可能无法接收客户端发送的所有数据;服务器可能需要调用MsgRead()来完成剩下的工作。

Longer delays

由于消息传递现在通过某种介质完成,而不是直接由内核控制的内存到内存副本,因此您可以预期传输消息所需的时间将显着增加(100 MB以太网与100 MHz 64- 位宽DRAM将是一个或两个数量级更慢的速度)。此外,最重要的是协议开销(最小)和有损网络上的重试。

Impact on ConnectAttach()

当您调用ConnectAttach()时,您将指定ND,PID和CHID。在QNX Neutrino中发生的所有事情是内核将连接ID返回到上图中所示的Qnet“网络处理程序”线程。由于没有发送任何消息,因此您不会被告知您刚刚连接的节点是否仍然存在。在正常使用中,这不是问题,因为大多数客户端不会自己做ConnectAttach() - 相反,他们将使用库调用open()的服务,它执行ConnectAttach(),然后 乎立即发出“open”的消息。这具有几乎立即指示远程节点是否存活的效果。

Impact on MsgDeliverEvent()

当服务器在本地调用MsgDeliverEvent()时,内核有责任将事件传递给目标线程。 在网络方式上,服务器仍然调用MsgDeliverEvent(),但内核将该事件的“代理”传递给Qnet,由Qnet将代理交付给另一个(客户端)Qnet,然后他们将交付实际的事件发生在客户端。事情可能会在服务器端搞砸,因为MsgDeliverEvent()函数调用是非阻塞的 - 这意味着一旦服务器调用了MsgDeliverEvent(),它就会运行。现在转过来说“我讨厌告诉你这件事,但你知道我说的MsgDeliverEvent()成功了吗?为时已晚,好吧,它没有!”

Impact on MsgReply(), MsgRead(), and MsgWrite()

为了防止MsgDeliverEvent()与MsgReply(),MsgRead()和MsgWrite()一起提到的问题,这些函数在网络上使用时转换为阻塞调用。在本地,他们只是简单地传输数据并立即解锁。在网络上,我们必须(在MsgReply()的情况下)确保数据已经传送到客户端或(在其他两个的情况下)通过网络实际传输数据到客户端或从客户端传输数据。

Impact on MsgReceive()

最后,MsgReceive()也受到影响(在网络情况下)。当服务器的MsgReceive()取消阻塞时,Qnet并非所有客户端的数据都可以通过网络传输。这是出于性能原因而完成的。

struct _msg_info中有两个标志作为MsgReceive()的最后一个参数传递(我们在上面的“Who sent the message?”中详细介绍了这个结构):

msglen

指示MsgReceive()实际传输了多少数据(Qnet喜欢传输8 KB)。

Srcmsglen

指示客户端要传输的数据量(由客户端确定)。

因此,如果客户端希望通过网络传输1兆字节的数据,则服务器的MsgReceive()将解除阻塞,并且msglen将设置为8192(表示缓冲区中有8192个字节可用),而srcmsglen将设置为1048576(表明客户端试图发送1兆字节)。

然后,服务器使用MsgRead()从客户端的地址空间获取其余数据。

Some notes on NDs

关于消息传递,我们尚未谈到的另一个“有趣”的事情是“节点描述符”或简称“ND”的整个业务。

回想一下,我们在示例中使用了符号节点名称,例如/net/wintermute。在QNX 4(QNX Neutrino之前的操作系统的先前版本)下,本机网络基于节点ID的概念,节点ID是网络上唯一的小整数。因此,我们将讨论“节点61”或“节点1”,这反映在函数调用中。

在QNX Neutrino下,所有节点都在内部以32位数量引用,但它不是网络唯一的! 我的意思是,wintermute可能会将spud视为节点描述符编号“7”,而spud可能会将magenta视为节点描述符编号“7”。让我扩展一下,为您提供更好的图片。此表显示了三个节点(wintermute,spud和foobar)可能使用的一些示例节点描述符:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

注意每个节点的节点描述符本身是如何为零。还要注意wintermute的spud节点描述符是如何“7”,foobar的spud节点描述符也是如此。但是,foobar的wintermute节点描述符是“4”,而foobar的spud节点描述符是“6”。正如我所说,它们在网络中并不是唯一的,尽管它们在每个节点上都是唯一的。您可以有效地将它们视为文件描述符 - 如果两个进程访问同一个文件,它们可能具有相同的文件描述符,但它们可能不会;它取决于谁在何时打开哪个文件。

幸运的是,您不必担心节点描述符,原因如下:

  1. 您通常会执行的大多数节点外消息传递将通过更高级别的函数调用(例如open(),如上例所示)。

  2. 节点描述符不被缓存 - 如果你得到它,你应该立即使用它然后忘记它。

  3. 有一些库调用将路径名(如/net/magenta)转换为节点描述符。

要使用节点描述符,您需要包含文件<sys/netmgr.h>,因为它包含一堆netmgr _ *()函数。

您可以使用函数netmgr_strtond()将字符串转换为节点描述符。一旦有了这个节点描述符,就可以在ConnectAttach()函数调用中立即使用它。具体来说,您不应该将其缓存在数据结构中!原因是一旦与该特定节点的所有连接断开,本机网络管理器可以决定重用它。因此,如果/net/magenta的节点描述符为“7”,并且连接到它,发送消息,然后断开连接,则本机网络管理器可能会再次返回节点描述符“7”对于不同的节点。

由于节点描述符并不是每个网络唯一的,因此出现的问题是“你如何在网络中传递这些东西?”显然,magenta对节点描述符“7”的看法将与wintermute完全不同。这里有两种解决方案:

  • 不要传递节点描述符;请改用符号名称(例如/net/wintermute)。

  • 使用netmgr_remote_nd()函数。

首先是一个很好的通用解决方案。第二种解决方案使用起来相当简单:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

此函数有两个参数:remote_nd是目标机器的节点描述符,local_nd是要转换为远程机器视点的节点描述符(从本地机器的角度来看)。结果是从远程计算机的角度来看有效的节点描述符。

例如,假设wintermute是我们的本地机器。我们有一个节点描述符“7”,它在我们的本地机器上有效并指向magenta。我们想知道的是节点描述符magenta用来与我们交谈的内容:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

这可能会打印类似于:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

这表示在magenta上,节点描述符“4”指的是我们的节点。(注意使用特殊常量ND_LOCAL_NODE,它实际上为零,表示“此节点。”)

现在,回想一下我们说(在Who sent the message?”中)struct _msg_info包含两个节点描述符:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

我们在说明中对这两个字段说:

  • nd是发送节点的接收节点的节点描述符

  • srcnd是接收节点的发送节点的节点描述符

因此,对于上面的示例,其中wintermute是本地节点,magenta是远程节点,当magenta向我们发送消息(wintermute)时,我们期望:

  • nd将包含7

  • srcnd将包含4

Priority inheritance

实时操作系统中的一个有趣问题是称为优先级倒置的现象。

优先级倒置表现为,例如,低优先级线程消耗所有可用的CPU时间,即使优先级较高的线程已准备好运行。

现在你可能在想,“等一下!你说优先级较高的线程总是抢占优先级较低的线程!怎么会这样?”

这是真的;优先级较高的线程将始终抢占较低优先级的线程。但有趣的事情可能发生。让我们看一下我们有三个线程的场景(在三个不同的进程中,为了简单起见),“L”是我们的低优先级线程,“H”是我们的高优先级线程,“S”是服务器。

此图显示了三个线程及其优先级:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

目前,H正在运行。S,优先级较高的服务器线程,现在没有任何事情可做,所以它正在等待消息并在MsgReceive()中被阻止。L想要运行,但其优先级低于运行时的H。一切都如你所料,对吧?

现在H决定它要睡100毫秒 - 也许它需要等待一些缓慢的硬件。此时,L正在运行。

这是事情变得有趣的地方。

作为其正常操作的一部分,L向服务器线程S发送消息,导致S进入READY并且(因为它是READY的最高优先级线程)开始运行。不幸的是,L发送给S的消息是“计算到小数点后5000位”。

显然,这需要超过100毫秒。因此,当H的100毫秒达到并且H变为READY时,猜猜是什么?它不会运行,因为S是READY并且优先级更高!

发生的事情是,低优先级线程通过优先级更高的线程利用CPU来防止更高优先级的线程运行。这是优先倒置。

要解决它,我们需要讨论优先级继承。一个简单的解决方法是让服务器S继承客户端线程的优先级:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

在这种情况下,当H的100毫秒睡眠完成时,它会进入READY状态,因为它是最高优先级的READY线程,所以它会运行。

不错,但还有一个“陷阱”。

假设H现在决定它也想进行计算。它想要计算第5034个素数,因此它向S发送消息,并进入block状态。

但是,S仍在计算pi,优先级为5!在我们的示例系统中,有很多其他线程在优先级高于5的情况下运行,它们正在利用CPU,有效地确保了S没有太多时间来计算pi。

这是优先级倒置的另一种形式。在这种情况下,优先级较低的线程阻止了更高优先级的线程访问资源。将此与优先级倒置的第一种形式进行对比,其中优先级较低的线程实际上消耗了CPU - 在这种情况下,它只是阻止了更高优先级的线程获取CPU - 它不消耗任何CPU本身。

幸运的是,这里的解决方案也相当简单。将服务器的优先级提升为所有被阻止客户端中的最高者:

Getting_Started_with_QNX_Neutrino -- Chapter 2:Message Passing

这样我们就可以让L的工作优先于L,但我们确保H在CPU上获得了一个公平的**。

So what's the trick?

没有诀窍!QNX Neutrino会自动为您完成此操作。(如果不需要,可以关闭优先级继承;请参阅ChannelCreate()函数文档中的_NTO_CHF_FIXED_PRIORITY标志。)

但是,这里有一个小的设计问题。你如何将优先级恢复到改变之前的状态?

您的服务器正在运行,为来自客户端的请求提供服务,并在从MsgReceive()调用中解除阻塞时自动调整其优先级。但什么时候应该将其优先级调整回MsgReceive()调用之前的优先级呢?

有两种情况需要考虑:

  • 服务器在正确地为客户端提供服务后执行一些额外的处理。这应该在服务器的优先级而不是客户端完成。

  • 服务器立即执行另一个MsgReceive()来处理下一个客户端请求。

在第一种情况下,当服务器不再为该客户端工作时,服务器以客户端的优先级运行是不正确的!解决方案非常简单。使用pthread_setschedparam()或pthread_setschedprio()函数(在“Processes and Threads”一章中讨论)将优先级恢复为应该的优先级。那另一个案子怎么样?答案很简单:谁在乎呢?

想一想。如果服务器在优先级为29时变为RECEIVE阻塞而与优先级为2时有何不同?问题的事实是它被RECEIVE阻止了!它没有获得任何CPU时间,因此它的优先级无关紧要。一旦MsgReceive()函数解除对服务器的阻塞,服务器就会继承(新)客户端的优先级,一切都按预期工作。

Summary

消息传递是一个非常强大的概念,是构建QNX Neutrino(实际上是所有过去的QNX操作系统)的主要功能之一。

通过消息传递,客户端和服务器交换消息(同一进程中的线程到线程,同一节点上不同进程中的线程到线程,或者网络中不同节点上的不同进程中的线程到线程))。客户端发送消息并阻塞,直到服务器收到消息,处理消息并回复客户端。

消息传递的主要优点是:

  • 消息内容不会根据目标位置(本地与网络)发生变化。

  • 消息为客户端和服务器提供“干净”的解耦点。

  • 隐式同步和序列化有助于简化应用程序的设计。

原网页:Message Passing