Erlang 源码分析之 Gen_Server

众所周知 Erlang/OTP 是一个具有高并发、低延时、高容错等特性的平台。

其三大 Behaviour :

  • Gen_ Server(客户端服务器);
  • Gen_ Fsm(有限状态机);
  • Gen_ Event(事件通知),是其坚如磐石的系统的基石。

本场 Chat 将透过源码分析其中 Gen_Server 的实现原理并管中一窥 Erlang 的设计思想。

本场 Chat 主要内容:

  1. Erlang 异步编程背景知识。
  2. 异步编程模型。
  3. 阅读 Gen_Server 源码(如何使用、源码分析、代码调试)。
  4. 总结。

一、简介

众所周知,Erlang/OTP 是一个具有高并发、低延时、高容错等等特性的平台,其三大 Behaviour:

  • Gen_Server(客户端服务器)
  • Gen_Fsm(有限状态机)
  • Gen_Event(事件通知)

是其坚如磐石的系统的基石。本章节将透过源码分析其中 Gen_Server 的实现原理并管中一窥 Erlang 的设计思想。

二、背景知识

在开始介绍 Gen_Server 实现原理之前,需要先介绍一下 Erlang 异步编程的思想与基础。Erlang 异步编程的基础就是 Erlang 进程和异步消息投递机制。

2.1 异步编程基础

Erlang进程:

这里的 Erlang 进程并不是系统进程,而是通过 ErlangVM 实现创建的,一个 Erlang 进程大概占用 300 个字的内存空间,创建时间只有几微秒。ErlangVM 会为每个核启动一个线程充当调度器,调度器分配名下进程队列中的所有进程抢占运行时间片来达到并发执行的目的。在 ErlangVM 中,所有的 Erlang 代码都是运行在 Erlang 进程里面的。派生一个进程很简单,通过 spawn 函数,指定一个新进程运行的函数即可。

API 如下:

spawn(Fun) -> pid()spawn(Node, Fun) -> pid()spawn(Module, Function, Args) -> pid()

以匿名函数为例,在 Erlang shell 中运行:

spawn(fun() -> io:format(erlang:group_leader(),"hello world~n",[]),end).

屏幕中打印出

hello world<0.66.0>

异步消息投递机制:

光有并行运行的 Erlang 进程是不够的,Erlang 通过异步消息来实现跨进程的同步与异步操作的。

Erlang 为每一个进程都配备了一个信箱,并且提供了发送消息和接受消息的命令。

发送消息的命令:

Pid ! Msg.

Pid 是目标进程,! 是发送消息的命令符,Msg 是消息体。

发送消息是异步执行的,发送消息后,本进程立即执行后续代码,消息投递到目标进程时会插入目标进程的信箱队列中,直到目标进程读取消息。

          receive            Msg ->                 do_something()          after            Timeout ->                do_finish()          end

以上代码就是目标进程处理消息的代码示例。receive 命令就是读取并接收消息的命令。这个 receive 命令有两层含义,按照信箱队列先进先出的原则读取一条消息;如果信箱是空的则等待信箱接收到消息为止,这是一个阻塞操作,会使进程主动交出运行时间片,直到信箱收到消息为止才会重新接受调度器分配的新时间片来继续运行,这个阻塞等待可以设置超时时间,超时后继续运行后续代码,不设置则一直等待收到消息为止。

2.2 异步编程模型

Erlang 异步编程最常见的模型就是异步应答式模型,这个模型就是建立在前面介绍的基础上的。通过派生的进程,接收消息并作出应答,并且通过这个模型既可以实现异步的逻辑,也可以实现同步的逻辑。

见代码:

    %%demo模块    -module(demo_server).    -define(SERVER, demo_server).    -export([start_server/0, loop/1]).    -export([get/1, set/2, stop/0]).    start_server() ->        Pid = spawn_link(?MODULE, loop, [[]]),        erlang:register(?SERVER, Pid),        ok.    loop(Data) ->        receive            {call, From, {get, Key}} ->                Value = proplists:get_value(Key,Data),                From ! {reply, ?SERVER, Value},                loop(Data);            {cast, {set, Key ,Value}} ->                NewData = [{Key, Value} | proplists:delete(Key,Data)],                loop(NewData);            stop ->                ok        end.    get(Key) ->        ?SERVER ! {call, self(), {get, Key}},        receive            {reply, ?SERVER, Value} ->                Value        after             5000 ->                undefined        end.    set(Key,Value) ->        ?SERVER ! {cast, {set, Key, Value}},        ok.    stop() ->        ?SERVER ! stop.

测试执行:

    demo_server:start_server().    打印    ok    demo_server:get(a).    打印    undefined    demo_server:set(a,b).    打印    ok    demo_server:get(a).    打印    b    demo_server:stop().    打印    stop

在上述代码中,存在两个进程,一个是调用方进程,这里是 Erlang shell 的进程,一个是 start_server 函数派生出的进程,我们把调用方进程叫做客户端进程,响应方进程叫做服务端进程。

服务端进程执行的函数是 loop 函数,服务端进程启动就通过 receive 阻塞住,等待接收匹配的消息,除了 stop 消息外,其他消息都会执行尾递归调用 loop 函数,这就是一个极简单服务端模型。

其他的两个消息,我们加了两个标签,一个 call,一个 cast,这两个不同标签的消息,我们实现了该模型下的同步调用,和异步调用。异步调用比较简单,调用方只需要投递消息就不用管了,目标进程收到消息,与调用方进程并发运行,执行处理消息的流程;同步调用需要调用方进行配合,投递消息后需要 receive 阻塞住,等待目标进程收到消息,处理消息,并向调用方投递结果,调用方收到结果再继续执行后续逻辑。

有人问,为什么要用异步消息投递的方式来执行同步逻辑呢?我觉得这样做可以使服务端进程执行高效并且有序(因为信箱是有序的)。

三、阅读 Gen_Server 源码

通过前面的背景知识,我们了解了 Gen_Server 的设计模型,以及 Erlang 的异步编程思想,我们就能够把握 Gen_Server 的设计走向。

3.1 如何使用

在我们阅读分析代码之前,首先我们得需要知道如何使用 Gen_Server,创建一个通用服务器程序。上代码:

    -module(mock_server).    -behaviour(gen_server).    -define(SERVER, mock_server).    -export([start_link/0]).    -export([get/1,set/2,stop/0]).    -export([init/1,handle_call/3,handle_cast/2,handle_info/2]).    start_link() ->        gen_server:start_link({local,?SERVER}, ?MODULE, [], []).    init(Data) ->        {ok, Data}.    get(Key) ->        gen_server:call(?SERVER,{get, Key}).    set(Key,Value) ->        gen_server:cast(?SERVER, {set, Key, Value}).    stop() ->        gen_server:stop(?SERVER).    handle_call({get,Key}, _From, State) ->        Value = proplists:get_value(Key,State),        {reply, Value, State};    handle_call(_Request, _From, State) ->        {reply,ok, State}.    handle_cast({set,Key, Value}, State) ->        NewState = [{Key, Value} | proplists:delete(Key,State)],        {noreply, NewState};    handle_cast(_Request, State) ->        {noreply, State}.    handle_info(Info,State) ->        io:format("other message:~p~n",[Info]),        {noreply, State}.

这份示例我使用了最少的 API,用了最简单的方式实现了上述 demo_server 同样的功能。

而通过 Gen_Server 实现,通用,标准、可靠。

首先我们看一下 Gen_Server 的使用方法:

    gen_server module            Callback module    -----------------            ---------------    gen_server:start    gen_server:start_link -----> Module:init/1    gen_server:stop       -----> Module:terminate/2    gen_server:call    gen_server:multi_call -----> Module:handle_call/3    gen_server:cast    gen_server:abcast     -----> Module:handle_cast/2    -                     -----> Module:handle_info/2    -                     -----> Module:terminate/2    -                     -----> Module:code_change/3 

实现一个 Gen_Server 的 Behavior 需要加上 -behaviour(gen_server). 标记,并实现 Gen_Server 的回调。

它的全部回调函数列表如下:

    [{init,1},     {handle_call,3},     {handle_cast,2},     {handle_info,2},     {terminate,2},     {code_change,3},     {format_status,2}]

其中可选回调列表如下:

    [{handle_info,2},     {terminate,2},     {code_change,3},     {format_status,2}]

可选回调可以省略,根据需要添加。

本次用的 API 有 start_link/4call/2cast/2stop/0

按照使用顺序进行讲解:

  • start_link/0 函数调用 gen_server:start_link(Name,Module,Args,Options) 创建带注册名字的 Gen_Server 进程实例。在启动时,会执行回调函数 init/1,并返回 {ok, State}
  • get/1 函数调用同步调用函数 gen_server:call(ServerRef,Request),触发回调函数 handle_call(Request, From,State) ,处理消息,并返回 {reply,Reply, NewState}Gen_Server 实例进程会把 Reply 作为 gen_server:call/2 的结果返回给调用者进程。
  • set/2 函数调用异步调用函数 gen_server:cast(ServerRef,Request),触发回调函数 handle_cast(Reqeust, State) 处理消息,并返回{noreply, NewState}。不会给调用者进程返回任何消息,而且调用者进程调用 gen_server:cast/2 的时候会直接返回 ok 给调用者进程。
  • stop/0 函数调用 gen_server:stop(ServerRef) 函数停止Gen_Server 进程,如果有定义回调 terminate/2 则先执行它进行收尾工作。

以上便是一个简单的 Gen_Server 用法说明。

3.2 源码分析

实际上,通过原始模型、使用方法的研究,我们已经分析出了源码阅读的思路了,基本上我们可以通过启动、loop 循环、异步调用、同步调用等几个环节来阅读代码。

经过阅读代码,涉及的模块主要有 Gen_ServerGenproc_lib 三个模块,额外辅助模块有 sys 模块。

3.2.1 启动流程

首先是启动流程图例:

Erlang 源码分析之 Gen_Server

我们以 gen_server:start_link(Name,Module,Args,Options) 为例查看代码。

我们以 Mod 模块为起始(这里就是我们自己的 mock_server 模块),调用 gen_sever:start_link(Name,Module,Args,Options) 函数,该函数最后返回 {ok,Pid} 为结束。 主要工作的模块为 Gen_ServerGenproc_lib

其中图例中黄色部分是调用者进程,从 spawn 开始的绿色部分都是新派生出来的 Gen_Server 进程实例。

和 spawn 并行的流程 sync_wait(Pid,Timeout) 是调用者进程在等待派生进程返回启动成功标志的流程,绿线部分即返回成功标志的消息。

图中派生进程初始化后会回调 Mod 模块的 init 函数。

派生进程最后的状态会停留在 loop 函数的 receive 命令处,等待调用者进程发来消息,处理完消息后继续尾递归 loop 循环。

3.2.2 loop 循环

以派生的 Gen_Server 实例进程的视角我们来看一下它的 loop 循环过程见图示:

Erlang 源码分析之 Gen_Server

从图示中我们看到,循环起始位置是 Gen_Server 实例进程阻塞在 receive 接收消息状态,它接收消息大体分为几类:

系统消息(system)、cast消息($gen_cast)、call 消息(gen_call)、一般消息(无 tag 消息)以及终止进程消息(stop 是正常终止,其他为异常终止)。

系统消息由 sys 模块处理,当接收到 suspend 消息时,循环进入 suspend_loop,此时只能处理系统消息,接收到 resume 消息时会恢复到正常 loop 循环。(此处逻辑未在图示中展开)

cast、call、info 分别对应的消息分别会回调 Mod:handle_castMod:handle_callMod:handle_info,并根据回调返回的结果 {noreply,NewState}| {reply,Reply,NewState} |{stop,Reason,Reply,NewState} | {stop,Reason,NewState} 进行处理:

  • noreply 直接重新进入 loop 循环;
  • reply 直接向调用者进程返回 relpy 消息,然后重新进入循环;
  • stop 则执行 terminate,终止进程
3.2.3 cast、call 流程

下面我们从代码执行流程看一下 gen_server:cast/2 的流程:

Erlang 源码分析之 Gen_Server

从代码流程看一下 gen_server:call/2 的流程:

Erlang 源码分析之 Gen_Server

上述两个图例绿色为调用者进程,黄色为派生的 Gen_Server 进程。

如果光从上面的 loop 图例以及代码图例中还不清晰,那我们就再从调试角度分析具体消息。

3.2.4 代码调试
    mock_server:start_link().    打印    {ok,<0.243.0>}    sys:statistics(mock_server,true).    打印    ok    sys:trace(mock_server,true).    打印    ok    mock_server:get(a).    打印    *DBG* mock_server got call {get,a} from <0.201.0>    *DBG* mock_server sent undefined to <0.201.0>, new state []    undefined    mock_server:set(a,b).    打印    *DBG* mock_server got cast {set,a,b}    *DBG* mock_server new state [{a,b}]    ok

现在我们不用封装好的函数,而是自己发送消息来做尝试

    mock_server ! {'$gen_cast',{set,a,c}}.    打印    *DBG* mock_server got cast {set,a,c}    {'$gen_cast',{set,a,c}}    *DBG* mock_server new state [{a,c}]    mock_server ! {'$gen_call', {self(), erlang:monitor(process, mock_server)}, {get, a}}.    打印    *DBG* mock_server got call {get,a} from <0.201.0>    *DBG* mock_server sent c to <0.201.0>, new state [{a,c}]    {'$gen_call',{<0.201.0>,                  #Ref<0.4032721631.3431727106.215954>},                 {get,a}}    flush().    Shell got {#Ref<0.4032721631.3431727106.215954>,c}    ok

通过模拟发 cast 消息,我们将 a 的 value 设置成 c 了,通过模拟发 call 消息,当我们刷新 shell 信箱时,收到了返回的 value c。

模拟系统消息

    sys:get_status(mock_server).    打印    {status,<0.243.0>,            {module,gen_server},            [[{'$initial_call',{mock_server,init,1}},              {'$ancestors',[<0.201.0>]}],             running,<0.201.0>,             [{trace,true},              {statistics,{{{2018,9,21},{14,36,25}},{reductions,39},6,0}}],             [{header,"Status for generic server mock_server"},              {data,[{"Status",running},                     {"Parent",<0.201.0>},                     {"Logged events",[]}]},              {data,[{"State",[{a,c}]}]}]]}    mock_server ! {system, {self(), erlang:monitor(process, mock_server)}, suspend}.    打印    {system,{<0.201.0>,#Ref<0.4032721631.3431727106.216072>},            suspend}    flush().    打印    Shell got {#Ref<0.4032721631.3431727106.216072>,ok}    ok

上面用原始消息模拟 suspend 原始系统消息,此时的系统状态会变为 suspended 状态,并且仅处理系统消息

    (suspend_loop循环)    sys:get_status(mock_server).    打印    {status,<0.243.0>,            {module,gen_server},            [[{'$initial_call',{mock_server,init,1}},              {'$ancestors',[<0.201.0>]}],             suspended,<0.201.0>,             [{trace,true},              {statistics,{{{2018,9,21},{14,36,25}},{reductions,39},6,0}}],             [{header,"Status for generic server mock_server"},              {data,[{"Status",suspended},                     {"Parent",<0.201.0>},                     {"Logged events",[]}]},              {data,[{"State",[{a,c}]}]}]]}

此时调用 cast 和 call 消息

    mock_server:set(a,d).    打印    ok    mock_server:get(a).    等待5秒钟,因为超时而崩溃    ** exception exit: {timeout,{gen_server,call,[mock_server,{get,a}]}}         in function  gen_server:call/2 (gen_server.erl, line 206)

suspend 命令是 OTP 系统命令一般是给监督树和代码热更使用的,所以要谨慎使用。

四、总结

无论 Gen_Server 实现的多么复杂,都脱不开前面提到的原始模型,通过阅读源码,我们发现它对消息加了不同标签进行区分,其中系统消息增加了很多通用的辅助功能。 初始化与同步、异步、销毁都变得标准并且可定制化。

最后,希望这篇文档对于对 Erlang 感兴趣的同学有些用处,感谢!


本文首发于GitChat,未经授权不得转载,转载需与GitChat联系。

阅读全文: http://gitbook.cn/gitchat/activity/5badd4c5f475834d2561ad35

一场场看太麻烦?订阅GitChat体验卡,畅享300场chat文章!更有****下载、****学院等超划算会员权益!点击查看