phxpaxos 状态机

引用的话:

        状态机这个名词大家都不陌生,一个状态机必然涉及到一个状态转移,而Paxos的每个实例,就是状态转移的输入,由于每台机器的实例编号都是连续有序增长的,而每个实例确定的值是一样的,那么可以保证的是,各台机器的状态机输入是完全一致的。根据状态机的理论,只要初始状态一致,输入一致,那么引出的最终状态也是一致的。而这个状态,是有无限的想象空间,你可以用来实现非常多的东西。

phpaxos状态机实现的类图:
phxpaxos 状态机

其中包含了五个类:
SMFac:状态机管理类,维护这一个状态机列表,对外提供添加状态机,获取状态机列表,状态机执行等访问接口。
StateMachine:状态机的抽象基类,业务人员可以定制自己的状态机,并且加入到状态机管理类中。
InsideSM:继承了StateMachine,抽象出一个内部状态机的基类。
MasterStateMachine:内部状态机,选举master操作。
SystemVM:内部状态机,处理集群节点变更。


1、SMFac管理类:
由于每个group之间是相对独立的,所以我们一般以每个group的逻辑来分析。每个group维护这一个SMFac,允许存在多个状态机,不同的状态机之间的数据相互隔离,多个状态机在一个group串行的excute,具体的是一次proposer带一个状态机,learn处理完成之后执行状态机的excute函数(不考虑batch proposer的情况),因为他们共享相同的资源:Proposer,learner,accepter。 SMFac做为管理类,除支持添加各种状态机,还对外提供了统一的状态机执行接口。

在learn处理消息完成之后,会进行一些后续状态机的处理等操作:
    if (m_oLearner.IsLearned())
    {
        BP->GetInstanceBP()->OnInstanceLearned();

        SMCtx * poSMCtx = nullptr;
        bool bIsMyCommit = m_oCommitCtx.IsMyCommit(m_oLearner.GetInstanceID(), m_oLearner.GetLearnValue(), poSMCtx);

        if (!bIsMyCommit)
        {
            BP->GetInstanceBP()->OnInstanceLearnedNotMyCommit();
            PLGDebug("this value is not my commit");
        }
        else
        {
            int iUseTimeMs = m_oTimeStat.Point();
            BP->GetInstanceBP()->OnInstanceLearnedIsMyCommit(iUseTimeMs);
            PLGHead("My commit ok, usetime %dms", iUseTimeMs);
        }
        // 执行状态机
        if (!SMExecute(m_oLearner.GetInstanceID(), m_oLearner.GetLearnValue(), bIsMyCommit, poSMCtx))
        {
            BP->GetInstanceBP()->OnInstanceLearnedSMExecuteFail();

            PLGErr("SMExecute fail, instanceid %lu, not increase instanceid", m_oLearner.GetInstanceID());
            m_oCommitCtx.SetResult(PaxosTryCommitRet_ExecuteFail,
                    m_oLearner.GetInstanceID(), m_oLearner.GetLearnValue());

            m_oProposer.CancelSkipPrepare();

            return -1;
        }
        
        {
            //this paxos instance end, tell proposal done
            m_oCommitCtx.SetResult(PaxosTryCommitRet_OK
                    , m_oLearner.GetInstanceID(), m_oLearner.GetLearnValue());

            if (m_iCommitTimerID > 0)
            {
                m_oIOLoop.RemoveTimer(m_iCommitTimerID);
            }
        }
        
        PLGHead("[Learned] New paxos starting, Now.Proposer.InstanceID %lu "
                "Now.Acceptor.InstanceID %lu Now.Learner.InstanceID %lu",
                m_oProposer.GetInstanceID(), m_oAcceptor.GetInstanceID(), m_oLearner.GetInstanceID());
        
        PLGHead("[Learned] Checksum change, last checksum %u new checksum %u",
                m_iLastChecksum, m_oLearner.GetNewChecksum());

        m_iLastChecksum = m_oLearner.GetNewChecksum();

        NewInstance();

        PLGHead("[Learned] New paxos instance has started, Now.Proposer.InstanceID %lu "
                "Now.Acceptor.InstanceID %lu Now.Learner.InstanceID %lu",
                m_oProposer.GetInstanceID(), m_oAcceptor.GetInstanceID(), m_oLearner.GetInstanceID());

        m_oCheckpointMgr.SetMaxChosenInstanceID(m_oAcceptor.GetInstanceID());

        BP->GetInstanceBP()->NewInstance();
    }

其他的先不关心,主要执行了SMExecute方法,源码如下:


bool Instance :: SMExecute(
        const uint64_t llInstanceID,
        const std::string & sValue,
        const bool bIsMyCommit,
        SMCtx * poSMCtx)
{
    return m_oSMFac.Execute(m_poConfig->GetMyGroupIdx(), llInstanceID, sValue, poSMCtx);
}

主要封装了m_oSMFac的Execute方法:

bool SMFac :: Execute(const int iGroupIdx, const uint64_t llInstanceID, const std::string & sPaxosValue, SMCtx * poSMCtx)
{
    if (sPaxosValue.size() < sizeof(int))
    {
        PLG1Err("Value wrong, instanceid %lu size %zu", llInstanceID, sPaxosValue.size());
        //need do nothing, just skip
        return true;
    }

    int iSMID = 0;
    memcpy(&iSMID, sPaxosValue.data(), sizeof(int));

    if (iSMID == 0)
    {
        PLG1Imp("Value no need to do sm, just skip, instanceid %lu", llInstanceID);
        return true;
    }

    std::string sBodyValue = string(sPaxosValue.data() + sizeof(int), sPaxosValue.size() - sizeof(int));
    if (iSMID == BATCH_PROPOSE_SMID)
    {
        BatchSMCtx * poBatchSMCtx = nullptr;
        if (poSMCtx != nullptr && poSMCtx->m_pCtx != nullptr)
        {
            poBatchSMCtx = (BatchSMCtx *)poSMCtx->m_pCtx;
        }
        //这里其实是存在一个batch proposer,暂时不讨论
        return BatchExecute(iGroupIdx, llInstanceID, sBodyValue, poBatchSMCtx);
    }
    else
    {
        return DoExecute(iGroupIdx, llInstanceID, sBodyValue, iSMID, poSMCtx);
    }
}

关注一下DoExecute算法,其实就是轮询状态机列表,找到合适的状态机,调用状态机的执行函数:

bool SMFac :: DoExecute(const int iGroupIdx, const uint64_t llInstanceID,
        const std::string & sBodyValue, const int iSMID, SMCtx * poSMCtx)
{
    if (iSMID == 0)
    {
        PLG1Imp("Value no need to do sm, just skip, instanceid %lu", llInstanceID);
        return true;
    }

    if (m_vecSMList.size() == 0)
    {
        PLG1Imp("No any sm, need wait sm, instanceid %lu", llInstanceID);
        return false;
    }

    for (auto & poSM : m_vecSMList)
    {
        if (poSM->SMID() == iSMID)
        {
            return poSM->Execute(iGroupIdx, llInstanceID, sBodyValue, poSMCtx);
        }
    }

    PLG1Err("Unknown smid %d instanceid %lu", iSMID, llInstanceID);
    return false;
}


class StateMachine
{
public:
    virtual ~StateMachine() {}

    //获取状态机的标识符    
    virtual const int SMID() const = 0;

    //执行状态机
    virtual bool Execute(const int iGroupIdx, const uint64_t llInstanceID,
            const std::string & sPaxosValue, SMCtx * poSMCtx) = 0;

        //真正发起Propose之前,调用状态机中该函数,修改请求数据或做其他处理
        virtual void BeforePropose(const int iGroupIdx, std::string & sValue);

        //是否需要调用BeforePropose,默认为false
        virtual const bool NeedCallBeforePropose();

    //Checkpoint机制相关函数
    virtual bool ExecuteForCheckpoint(const int iGroupIdx, const uint64_t llInstanceID,
            const std::string & sPaxosValue);
    virtual const uint64_t GetCheckpointInstanceID(const int iGroupIdx) const;
    virtual int LockCheckpointState();
    virtual int GetCheckpointState(const int iGroupIdx, std::string & sDirPath,
            std::vector<std::string> & vecFileList);
    virtual void UnLockCheckpointState();
    virtual int LoadCheckpointState(const int iGroupIdx, const std::string & sCheckpointTmpFileDirPath,
            const std::vector<std::string> & vecFileList, const uint64_t llCheckpointInstanceID);

};


状态机的使用方法其实很简单,定义一个状态机后,这是一个SMID(每个状态机都要不一样),propose的时候指定状态机ID即可:
//状态机上下文,包括SMID,和用户自定义的上下文的数据m_pCtx
class SMCtx
{
public:
    SMCtx();
    SMCtx(const int iSMID, void * pCtx);

    int m_iSMID;
    void * m_pCtx;
};

class Node
{
public:
    Node() { }
    virtual ~Node() { }

    //If you want to end paxos, just delete poNode.
    //But if you use your own network, poNode can be deleted after your own network stop recieving messages.
    static int RunNode(const Options & oOptions, Node *& poNode);

    //Base function.
    virtual int Propose(const int iGroupIdx, const std::string & sValue, uint64_t & llInstanceID) = 0;
    // 最后一个参数制定状态机即可。
    virtual int Propose(const int iGroupIdx, const std::string & sValue, uint64_t & llInstanceID, SMCtx * poSMCtx) = 0;
    ...
}