支持高吞吐工作流引擎系统的设计

一、概述

1 工作流

工作流可实现这样的需求:
即流程的实现要能够随着用户实际业务执行流程的改变而自动变化。

假设没有工作流,那么软件系统流程节点之间会相互依赖,这个流程就会定死,系统执行流程则无法随用户定义的流程的改变而改变,而用户每次流程变化时,都需要重新修改软件,代价较大。

2 工作流引擎

2.1 来自百度百科的定义

所谓工作流引擎是指workflow作为应用系统的一部分,并为之提供对各应用系统有决定作用的根据角色、分工和条件的不同决定信息传递路由、内容等级等核心解决方案。工作流引擎包括流程的节点管理、流向管理、流程样例管理等重要功能。

2.2 工作流引擎的通俗解释

工作流引擎是用来定义流程、驱动流程的软件,不负责业务。也就是一个业务完成后,下一个业务是谁(应该流转到哪里)有工作流流程来确定,并流转到下一个业务,并把相应的数据上下文传递过去。

二、工作流引擎系统的研究现状

目前,工作流引擎系统在企业应用中非常广泛,业界主流的工作流引擎系统包括JBPM3, JBPM4, JBPM5, Activiti5, SWF等等,其中应用最为广泛的是Activiti。但这些工作流引擎主要应用于企业办公等业务,例如流程审批,甚至包括人机交互等功能,业务逻辑复杂,从而会有大量对关系型数据库如MySQL的实时读写操作。但这样的系统对于要求高并发、高吞吐的互联网系统,Activiti等工作流系统的性能与吞吐量远远无法满足需求。

本文将论述如何设计与实现一个支持高并发、高吞吐、高可用、易扩展、可伸缩的工作流引擎系统。

三、工作流引擎的设计

1. 基础概念描述

1.1. 如何描述流程

工作流引擎的业务流转是基于工作流流程图来描述流程的,流程图需要有自己的格式、语义,通常为xml格式,这里主流的工作流引擎流程定义通常参考了WSMC的标准xpdl。

工作流描述一个流程,*元素包含下面三元素:
元素1. 活动:即流程图中的方块;
元素2. 转移:即流程图中的箭头;
元素3. 流程:一组活动和转移的集合。

次级元素包括:
元素1. 参与者
元素2. 转移条件(排他、并行、包容)
元素3. 需要的数据

1.2. 如何设计流程图

这里可沿用Activiti中使用的工作流流程图,可通过在Eclipse IDE中安装BPMN插件来绘制可视化的工作流流程图,该流程图包含了上述的工作流流程描述,该流程图实际为xml格式的文件。

1.3. 如何存储和装载解析流程图

1.3.1 流程图的存储

可把xml文件内容直接存储在MySQL中,用redis作为缓存。

1.3.2 装载流程定义

工作流引擎会把流程图定义装载到引擎所在的内存中,即把流程图文件的内容对象化。

1.4. 流程定义和流程实例

对象化的流程图,即为流程定义,流程定义保存在内存中。当服务端收到某个工作流请求时,根据流程定义来生成流程实例来处理该请求,这样,每个请求都有自己的流程实例,可做到不同的请求的流程实例相互独立。
流程定义和流程实例的关系,类似于面向对象中类与类的实例的关系。

2. 工作流引擎的架构设计

2.1 工作流引擎系统的架构设计图如下:

支持高吞吐工作流引擎系统的设计

2.2 架构设计说明

工作流引擎系统包含如下子模块
流程编排系统、Web管理系统、文件上传系统、业务系统、网关代理、消息系统、数据库系统、工作流引擎应用系统。下面依次做以说明:

  1. 流程编排系统
    通过在Eclipse IDE安装BPMN插件后,用来绘制工作流流程图。

  2. Web管理系统
    对工作流引擎系统提供web界面,通过调用工作流引擎提供的API接口来做服务查询、服务管理、配置管理、历史数据查询等等。

  3. 文件上传系统
    用于上传流程定义文件到工作流引擎系统。

  4. 业务系统
    用户处理工作流引擎中每个userTask原子任务的系统。

  5. 网关代理
    网关代理用于工作流引擎系统请求业务系统的代理。

  6. 消息系统
    消息系统使用kafka, 用于业务系统向工作流引擎发送请求及回调的消息队列中间件,目的是解耦、流量消峰等。

  7. 数据库系统
    数据库系统包含MySQL、redis、ElasticSearch。
    其中,MySQL用户存储流程定义文件的内容;redis用于请求请求上下文及监控数据、统计数据等中间结果;ElasticSearch用于存储业务日志用于用户行为分析及问题追踪。

  8. 工作流引擎系统
    工作流引擎系统是所有子模块的核心,工作流引擎系统分为流程引擎子系统、任务引擎子系统、监控管理子系统。
    而流程引擎系统包含流程启动、接收回调、流程部署几个子流程。任务引擎系统包括任务存储与任务调度两个子功能。监控管理系统包括流程管理、流程监控、任务管理、任务监控、配置管理等功能。

对于上述的描述中,核心流程为工作流初始请求流程与工作流引擎异步回调处理流程,下面依次详细描述其过程。

2.2.1 工作流引擎初始请求流程

· 消费kafka msg
· 从redis中获取流程定义的最新版本
· 根据流程定义的json字符串,生成新的流程实例对象,保存到内存缓存。
· 对内存中的流程实例对象做深拷贝,生成新的流程实例对象拱当前请求线程使用。
· 保存流程实例的开始时间到Redis:
o key: 流程定义id_流程实例id
o value: startTime时间戳
· 把当前的UserTask的状态设置为completed
· 设置当前流程实例的状态为running
· run next user task
o 逐个节点解析流程图,找到根据sequenceflow, 找到下一个可执行的userTask:
o 设置user task的状态为running
o 请求网关:
§ 请求前先保存user task请求上下文:
§ 调用user task原子任务
o 如果是异步任务:
§ 保存请求上下文到redis供回调时使用:jobId–>requestContext
o 如果是同步任务:
§ 获取网关的返回结果json,生成ResponseContext
· 保存ResponseContext到redis作为历史记录。
· 保存该user task的请求执行时间到redis,并投递到kafka (备注:如果最后不写到MySQL, 则没有必要写入到redis)
§ 如果网关回调是A00000:
· 当前流程实例的状态是running, 则去递归地调用runNextUserTask; 直到执行到EndEvent节点,则保存流程实例的执行时间,且清理redis临时数据,并保存临时数据到MySQL。然后该流程实例不会继续往后走。
· 如果流程实例的状态是abort, 则不再继续执行,且清理该流程实例的临时redis数据。
§ 如果网关回调不是A00000:
· 清理redis历史数据
· 统计UserTask的执行时间及结果
· 统计该流程实例的执行时间及结果
· 强制终止该流程实例的执行

2.2.2 异步结果回调过程

· kafka监听从metadata回调的所有topic, 消费回调消息json
· 解析kafka消息的json:
o 如果code为A00000:
§ 根据jobId获取到preRequestContext
§ 根据回调和preRequestContext, 生成ResponseContext
§ 通过线程池执行下面的业务流程:
· 把前一个user task的状态置为complete
· 生成一个新的流程实例对象
· 保存前一个user task回调结果ResponseContext到redis
· 保存前一个user task完成时间
· 从redis中查询当前流程实例的状态:
o 如果是running, 则递归调用runNextUserTask
o 如果是abort, 则清理redis中的临时数据
o 如果code不是A00000:
§ 清理redis历史数据
§ 统计UserTask的执行时间及结果
§ 统计该流程实例的执行时间及结果
强制终止该流程实例的执行

2.3 工作流引擎实体设计

工作流引擎的流程图对象化后,会被抽象成下面的类。
1)Process:流程类,表示整个流程定义,实例类中包含流程id, 流程实例id, 当前正在执行的userTask, 当前流程实例的版本号,活动id到活动类的映射,无条件转移id到转移类的映射,有条件转移id到转移类的映射。
2)Activity: 活动类,包含id和name属性。
3)StartEvent:Activity的子类,作为工作流起始节点。
4)EndEvent:Activity的子类,作为工作流结束节点。
5)ServiceTask:Activity的子类,用于做网关的特殊条件判断。
6)UserTask:Activity的子类,用于标识业务系统中的原子任务,包含assignee和category属性。assignee标识业务系统的请求地址,category标识业务系统的类型,如同步或异步。
7)Gateway:Activity的子类,标识网关类型。网关分三类:排他网关、并行网关和包容网关。
8)ExclusiveGateway:Gateway的子类。排他网关,按顺序判断多个条件分支,只要一个条件分支满足,则走该分支,其他分支放弃执行。
9)InclusiveGateway:Gateway的子类。包容网关,判断每个指向网关的条件分支,然后执行所有满足条件的分支,最后将结果汇总后再往后继续执行。
10)ParallelGatway:Gateway的子类。并行网关,多个条件分支同时执行,最后将结果汇总后再往后继续执行。并行网关相当于所有条件分支都满足的包容网关。
11)SequenceFlow:即工作流*元素中的“转移”,对应流程图中的箭头,也称为分支。包含id、sourceRef(前驱节点的id)、targetRef(后继节点的id)
12)ConditionSequenceFlow:SequenceFlow的子类,用于指向带条件的网关如排他网关和包容网关。包含conditionExpression条件,用于表示分支的条件判断,这里可采用标准的juel来做条件判断。

上述定义可用UML类图做如下表示:
支持高吞吐工作流引擎系统的设计
2.4 工作流引擎对业务系统的请求上下文设计

请求上下文,用来记录请求业务系统的原子任务时的参数及回调结果。
TaskContext作为RequestContext和ResponseContext的父类,包含的属性是发送请求和回调结果共有的,如data(请求参数或回调结果),currentUserTaskId表示正在执行流程实例中的哪个userTask,processDefId(流程定义id),processDefVersion(流程定义版本),processInstanceId(流程实例id), updateTime(当前时间)等等。
RequestContext是TaskContext的子类,用于表示请求业务系统原子任务的参数,里面包含了若干业务请求用到的参数,如serviceName, serviceTag等等。
ResponseContext是TaskContext的子类,用于表示业务系统原子任务请求的回调结果,如返回码code等等业务相关的参数。

请求上下文的UML类图表示如下图:
支持高吞吐工作流引擎系统的设计

2.5 流程定义的更新设计

流程定义一共冗余存储了3份,分别是内存缓存、redis缓存、MySQL存储。

当用户上传流程图时,系统先把流程定义文件的内容保存到MySQL中,并利用MySQL自增ID的方式生成改流程图对应的版本号。
然后把流程id, MySQL中的版本号,流程定义文件内容保存到redis。在保存流程定义id到最新版本的映射到redis。

当工作流第一次请求时,先根据流程定义ID去内存缓存中查询对应的流程定义,如果没有,则去redis中查询到流程定义内容,然后根据流程定义的内容生成流程定义对象,把该对象存到内存缓存中。
假设内存缓存中已经有了流程定义,那么从流程定义中获取到流程定义的版本,然后根据流程定义id去redis中查询最新流程定义版本号,看二者是否相等,不相等的话,根据最新版本号从redis中获取流程定义内容,更新内存缓存中的内存。

当工作流的业务系统的原子任务回调结果给工作流时,此时可根据请求原子任务时使用流程定义版本号为准,如果内存缓存中的版本号已更新(已大于请求原子任务时使用流程定义版本号),此时要从redis中获取与该版本号向匹配的流程定义内容,然后生成相应的流程实例对象。

2.6 关于流程流转过程的设计

对于一个工作流请求,首先根据流程定义ID获取到工作流流程定义后,把流程定义对象化,生成流程对象。
public class Process implements Serializable,Cloneable {
public String id; // process定义的唯一id
public String instanceId; // process实例的唯一id, 这里为请求的jobId
public String currentUserTaskId;
public Integer version; // 版本号
// id --> Activity
public Map<String, Activity> activityMap = new HashMap<String, Activity>();
// ActivityId --> SequenceFlow: 非网关节点,只有一个后继sequence flow
public Map<String, SequenceFlow> taskTransitionMap = new HashMap<String, SequenceFlow>();
// ActivityId --> SequenceFlows: 网关节点,可能有多个后继sequence flow
public Map<String, List> gatewayTransitionMap = new HashMap<String, List>();
// conditionGatewayActivityId --> PreSequenceflowIds
public Map<String, Set> condGatewaySequenceFlowMap = new HashMap<String, Set>();
}

默认的currentUserTaskId为StartEvent活动节点,然后从taskTransitionMap中获取下一个SequenceFlow,再用SequenceFlow.targetRef获取其下一个活动节点,如果下一个节点是网关,则根据网关类型,网关类型有并行网关、排他网关、包容网关,分别做不同的业务逻辑处理。网关执行完毕,同理找到下一个SequenceFlow, 再找到下一个活动节点,如果活动节点是UserTask(UserTask用于请求业务系统的原子任务),则根据assignee中的参数对业务系统进行访问,根据category等参数来判定该业务节点的处理结果是同步返回还是异步返回。若同步返回,则获取返回结果后,继续流转工作流流程;若异步返回(如经由kafka),则走上面描述的“异步结果回调过程”回调流程。当执行到EndEvent活动节点时,整个工作流请求结束。

2.7 工作流引擎服务的部署设计

为了方便工作流引擎服务良好的可伸缩性,这里采用了无状态服务。即工作流的初始请求和业务系统给的原子任务回调,都经由kafka回调给工作流引擎,工作流引擎作为kafka的消费者,不同工作流引擎服务的kafka consumer采用相同的consumer group id,这样,对工作流的请求,只有一台工作流引擎服务器会消费到,从而做到无状态,而请求的上下文都保存在redis中,通过流程实例id去redis获取请求上下文,来继续工作流引擎的下一个节点的处理。

当升级服务时,为了保证不中断业务请求,可通过配置管理的开关来暂停对kafka的消费,待服务升级完毕后,再启动kafka consumer。

2.8 工作流引擎的监控设计

工作流引擎的请求上下文都保存在redis中,每次对业务服务节点的请求前,可记录userTask的状态到redis, 当回调后可更新task的状态到redis;同理,对于流程实例id的状态监控也可以用相同的方式来实现。

2.9 工作流引擎系统的吞吐量保证及性能瓶颈

工作流引擎的所有处理,除了内存操作,就是对redis的操作,个别对于MySQL, ElasticSearch的操作,都可以通过队列异步处理。所以,唯一的性能瓶颈在于对redis读写的性能,这里推荐服务器与redis可部署在同机房,最大程度降低redis内网通信的性能损耗。

3.0 工作流引擎系统的压测

按qps=1000来进行压测,结果如下。

支持高吞吐工作流引擎系统的设计
可以看出,qps为1000时工作流引擎服务无压力。

四、总结

该工作流引擎设计保证高吞吐的关键在于,所有的请求线程除了内网同机房访问redis外,没有实时的对中间件访问或网络IO,而内存的查询操作,基本都是通过Map, 其时间复杂度为O(1)。

如果redis服务器与工作流引擎服务器无法部署在同机房,或即使同机房吞吐量仍无法满足需求,则可考虑通过有状态服务,采用内存操作来替代redis,但缺点是当扩容或缩容时,需要对相应的状态数据给出迁移方案。

借助redis来传递请求上下文的无状态工作流引擎服务的设计,重依赖了redis服务,所以需要注意当redis服务故障或网络故障时,怎样保证工作流引擎仍服务可用,这里推荐的方案是通过不同机房的redis和工作流引擎集群来做backup, 例如当北京电信机房的redis故障时,则可将集群切换到济阳联通机房(包括工作流引擎服务和redis),当然前提时具备多机房部署的环境。