phxpaxos 状态机
引用的话:
状态机这个名词大家都不陌生,一个状态机必然涉及到一个状态转移,而Paxos的每个实例,就是状态转移的输入,由于每台机器的实例编号都是连续有序增长的,而每个实例确定的值是一样的,那么可以保证的是,各台机器的状态机输入是完全一致的。根据状态机的理论,只要初始状态一致,输入一致,那么引出的最终状态也是一致的。而这个状态,是有无限的想象空间,你可以用来实现非常多的东西。
phpaxos状态机实现的类图:
其中包含了五个类:
SMFac:状态机管理类,维护这一个状态机列表,对外提供添加状态机,获取状态机列表,状态机执行等访问接口。
StateMachine:状态机的抽象基类,业务人员可以定制自己的状态机,并且加入到状态机管理类中。
InsideSM:继承了StateMachine,抽象出一个内部状态机的基类。
MasterStateMachine:内部状态机,选举master操作。
SystemVM:内部状态机,处理集群节点变更。
1、SMFac管理类:
由于每个group之间是相对独立的,所以我们一般以每个group的逻辑来分析。每个group维护这一个SMFac,允许存在多个状态机,不同的状态机之间的数据相互隔离,多个状态机在一个group串行的excute,具体的是一次proposer带一个状态机,learn处理完成之后执行状态机的excute函数(不考虑batch proposer的情况),因为他们共享相同的资源:Proposer,learner,accepter。 SMFac做为管理类,除支持添加各种状态机,还对外提供了统一的状态机执行接口。
在learn处理消息完成之后,会进行一些后续状态机的处理等操作:
if (m_oLearner.IsLearned())
{
BP->GetInstanceBP()->OnInstanceLearned();
SMCtx * poSMCtx = nullptr;
bool bIsMyCommit = m_oCommitCtx.IsMyCommit(m_oLearner.GetInstanceID(), m_oLearner.GetLearnValue(), poSMCtx);
if (!bIsMyCommit)
{
BP->GetInstanceBP()->OnInstanceLearnedNotMyCommit();
PLGDebug("this value is not my commit");
}
else
{
int iUseTimeMs = m_oTimeStat.Point();
BP->GetInstanceBP()->OnInstanceLearnedIsMyCommit(iUseTimeMs);
PLGHead("My commit ok, usetime %dms", iUseTimeMs);
}
// 执行状态机
if (!SMExecute(m_oLearner.GetInstanceID(), m_oLearner.GetLearnValue(), bIsMyCommit, poSMCtx))
{
BP->GetInstanceBP()->OnInstanceLearnedSMExecuteFail();
PLGErr("SMExecute fail, instanceid %lu, not increase instanceid", m_oLearner.GetInstanceID());
m_oCommitCtx.SetResult(PaxosTryCommitRet_ExecuteFail,
m_oLearner.GetInstanceID(), m_oLearner.GetLearnValue());
m_oProposer.CancelSkipPrepare();
return -1;
}
{
//this paxos instance end, tell proposal done
m_oCommitCtx.SetResult(PaxosTryCommitRet_OK
, m_oLearner.GetInstanceID(), m_oLearner.GetLearnValue());
if (m_iCommitTimerID > 0)
{
m_oIOLoop.RemoveTimer(m_iCommitTimerID);
}
}
PLGHead("[Learned] New paxos starting, Now.Proposer.InstanceID %lu "
"Now.Acceptor.InstanceID %lu Now.Learner.InstanceID %lu",
m_oProposer.GetInstanceID(), m_oAcceptor.GetInstanceID(), m_oLearner.GetInstanceID());
PLGHead("[Learned] Checksum change, last checksum %u new checksum %u",
m_iLastChecksum, m_oLearner.GetNewChecksum());
m_iLastChecksum = m_oLearner.GetNewChecksum();
NewInstance();
PLGHead("[Learned] New paxos instance has started, Now.Proposer.InstanceID %lu "
"Now.Acceptor.InstanceID %lu Now.Learner.InstanceID %lu",
m_oProposer.GetInstanceID(), m_oAcceptor.GetInstanceID(), m_oLearner.GetInstanceID());
m_oCheckpointMgr.SetMaxChosenInstanceID(m_oAcceptor.GetInstanceID());
BP->GetInstanceBP()->NewInstance();
}
其他的先不关心,主要执行了SMExecute方法,源码如下:
bool Instance :: SMExecute(
const uint64_t llInstanceID,
const std::string & sValue,
const bool bIsMyCommit,
SMCtx * poSMCtx)
{
return m_oSMFac.Execute(m_poConfig->GetMyGroupIdx(), llInstanceID, sValue, poSMCtx);
}
主要封装了m_oSMFac的Execute方法:
bool SMFac :: Execute(const int iGroupIdx, const uint64_t llInstanceID, const std::string & sPaxosValue, SMCtx * poSMCtx)
{
if (sPaxosValue.size() < sizeof(int))
{
PLG1Err("Value wrong, instanceid %lu size %zu", llInstanceID, sPaxosValue.size());
//need do nothing, just skip
return true;
}
int iSMID = 0;
memcpy(&iSMID, sPaxosValue.data(), sizeof(int));
if (iSMID == 0)
{
PLG1Imp("Value no need to do sm, just skip, instanceid %lu", llInstanceID);
return true;
}
std::string sBodyValue = string(sPaxosValue.data() + sizeof(int), sPaxosValue.size() - sizeof(int));
if (iSMID == BATCH_PROPOSE_SMID)
{
BatchSMCtx * poBatchSMCtx = nullptr;
if (poSMCtx != nullptr && poSMCtx->m_pCtx != nullptr)
{
poBatchSMCtx = (BatchSMCtx *)poSMCtx->m_pCtx;
}
//这里其实是存在一个batch proposer,暂时不讨论
return BatchExecute(iGroupIdx, llInstanceID, sBodyValue, poBatchSMCtx);
}
else
{
return DoExecute(iGroupIdx, llInstanceID, sBodyValue, iSMID, poSMCtx);
}
}
关注一下DoExecute算法,其实就是轮询状态机列表,找到合适的状态机,调用状态机的执行函数:
bool SMFac :: DoExecute(const int iGroupIdx, const uint64_t llInstanceID,
const std::string & sBodyValue, const int iSMID, SMCtx * poSMCtx)
{
if (iSMID == 0)
{
PLG1Imp("Value no need to do sm, just skip, instanceid %lu", llInstanceID);
return true;
}
if (m_vecSMList.size() == 0)
{
PLG1Imp("No any sm, need wait sm, instanceid %lu", llInstanceID);
return false;
}
for (auto & poSM : m_vecSMList)
{
if (poSM->SMID() == iSMID)
{
return poSM->Execute(iGroupIdx, llInstanceID, sBodyValue, poSMCtx);
}
}
PLG1Err("Unknown smid %d instanceid %lu", iSMID, llInstanceID);
return false;
}
class StateMachine
{
public:
virtual ~StateMachine() {}
//获取状态机的标识符
virtual const int SMID() const = 0;
//执行状态机
virtual bool Execute(const int iGroupIdx, const uint64_t llInstanceID,
const std::string & sPaxosValue, SMCtx * poSMCtx) = 0;
//真正发起Propose之前,调用状态机中该函数,修改请求数据或做其他处理
virtual void BeforePropose(const int iGroupIdx, std::string & sValue);
//是否需要调用BeforePropose,默认为false
virtual const bool NeedCallBeforePropose();
//Checkpoint机制相关函数
virtual bool ExecuteForCheckpoint(const int iGroupIdx, const uint64_t llInstanceID,
const std::string & sPaxosValue);
virtual const uint64_t GetCheckpointInstanceID(const int iGroupIdx) const;
virtual int LockCheckpointState();
virtual int GetCheckpointState(const int iGroupIdx, std::string & sDirPath,
std::vector<std::string> & vecFileList);
virtual void UnLockCheckpointState();
virtual int LoadCheckpointState(const int iGroupIdx, const std::string & sCheckpointTmpFileDirPath,
const std::vector<std::string> & vecFileList, const uint64_t llCheckpointInstanceID);
};
状态机的使用方法其实很简单,定义一个状态机后,这是一个SMID(每个状态机都要不一样),propose的时候指定状态机ID即可:
//状态机上下文,包括SMID,和用户自定义的上下文的数据m_pCtx
class SMCtx
{
public:
SMCtx();
SMCtx(const int iSMID, void * pCtx);
int m_iSMID;
void * m_pCtx;
};
class Node
{
public:
Node() { }
virtual ~Node() { }
//If you want to end paxos, just delete poNode.
//But if you use your own network, poNode can be deleted after your own network stop recieving messages.
static int RunNode(const Options & oOptions, Node *& poNode);
//Base function.
virtual int Propose(const int iGroupIdx, const std::string & sValue, uint64_t & llInstanceID) = 0;
// 最后一个参数制定状态机即可。
virtual int Propose(const int iGroupIdx, const std::string & sValue, uint64_t & llInstanceID, SMCtx * poSMCtx) = 0;
...
}