超级账本hyperledger fabric第六集:账本存储及源码阅读


  • peer节点做账本存储
  • orderer只是临时存储区块;peer节点负责账本的持久化存储,并且会更新世界状态

  • 文件系统:区块以文件的形式存储
  • 区块索引,用于查询区块,是用levelDB实现的
  • 状态数据库,一般存放区块链最新状态,数据不需要HA,可以从文件系统再次获取,couchDB支持模糊查询


  • 回忆交易流程
    1. 交易模拟:在背书节点执行模拟时,最终返回交易读写集(RWset),告诉区块链在交易中读写了哪些数据
    2. 交易排序
    3. 交易验证,交易验证后,更新世界状态,更新的就是读写集中的写集
  • 交易读写集的3个概念
    1. 读集:包含键的列表,键的提交版本,读取对应的值,返回的是已提交的状态的值(读已提交),不能读取交易过程中写入的数据
    2. 写集:包含键的列表,写入的数据的值,如果多次写入,以最后一次为准
    3. 版本号:用区块高度和交易编号组成的
  • 交易验证阶段是对读写集进行验证(验证读集)
    1. 验证读集的版本号是否等于世界状态的版本号


  • 世界状态
    1. 交易执行后,所有键的最新值
  • 历史数据索引(可选)
  • 区块存储
    1. 按照文件去存  blockfile_xxxxxx
    2. 文件大小是64M,若修改,需要重新编译peer源码
    3. 账本最大容量 = 64M × 区块文件个数(文件编号为6位数字,即最多约 10^6 个文件)
  • 区块读取
    1. 区块文件流
    2. 区块流
    3. 迭代器
  • 区块索引
    1. 键:区块高度、区块哈希、交易哈希
    2. 值:区块文件编号、文件内的偏移量、区块数据的长度
  • 区块提交
    1. 保存到文件


  • 从4方面看,读写集、状态数据、历史数据、区块文件
  • 可以先从core/ledger下的ledger_interface.go中看大体结构

package ledger

import (
	commonledger "github.com/hyperledger/fabric/common/ledger"

type PeerLedgerProvider interface {
	Create(genesisBlock *common.Block) (PeerLedger, error)
	Open(ledgerID string) (PeerLedger, error)
	Exists(ledgerID string) (bool, error)
	List() ([]string, error)

type PeerLedger interface {
	GetTransactionByID(txID string) (*peer.ProcessedTransaction, error)
	GetBlockByHash(blockHash []byte) (*common.Block, error)
	GetBlockByTxID(txID string) (*common.Block, error)
	GetTxValidationCodeByTxID(txID string) (peer.TxValidationCode, error)
	NewTxSimulator() (TxSimulator, error)
	NewQueryExecutor() (QueryExecutor, error)
	NewHistoryQueryExecutor() (HistoryQueryExecutor, error)
	Prune(policy commonledger.PrunePolicy) error

type ValidatedLedger interface {

type QueryExecutor interface {
	GetState(namespace string, key string) ([]byte, error)
	GetStateMultipleKeys(namespace string, keys []string) ([][]byte, error)
	GetStateRangeScanIterator(namespace string, startKey string, endKey string) (commonledger.ResultsIterator, error)
	ExecuteQuery(namespace, query string) (commonledger.ResultsIterator, error)

type HistoryQueryExecutor interface {
	GetHistoryForKey(namespace string, key string) (commonledger.ResultsIterator, error)

type TxSimulator interface {
	SetState(namespace string, key string, value []byte) error
	DeleteState(namespace string, key string) error
	SetStateMultipleKeys(namespace string, kvs map[string][]byte) error
	ExecuteUpdate(query string) error
	GetTxSimulationResults() ([]byte, error)
  • 读写集,分为交易读写集生成和交易读写集验证两个部分去看


type lockBasedTxSimulator struct {
	rwsetBuilder *rwsetutil.RWSetBuilder
  •  点进RWSetBuilder看读写集定义:
type RWSetBuilder struct {
	rwMap map[string]*nsRWs
func (rws *RWSetBuilder) GetTxReadWriteSet() *TxRwSet {
	txRWSet := &TxRwSet{}
	sortedNamespaces := util.GetSortedKeys(rws.rwMap)
	for _, ns := range sortedNamespaces {
		nsReadWriteMap := rws.rwMap[ns]


// NewValidator builds a Validator around the given versioned state database.
// The db is what validateKVRead later queries for committed versions.
func NewValidator(db statedb.VersionedDB) *Validator {
	return &Validator{db}

func (v *Validator) validateEndorserTX(envBytes []byte, doMVCCValidation bool, updates *statedb.UpdateBatch) (*rwsetutil.TxRwSet, peer.TxValidationCode, error) {
	respPayload, err := putils.GetActionFromEnvelope(envBytes)
	if err != nil {
		return nil, peer.TxValidationCode_NIL_TXACTION, nil

	txRWSet := &rwsetutil.TxRwSet{}

	if err = txRWSet.FromProtoBytes(respPayload.Results); err != nil {
		return nil, peer.TxValidationCode_INVALID_OTHER_REASON, nil
// validateTx validates the read-write set of a single transaction, namespace
// by namespace, against the committed state and the pending update batch.
//
// Returns:
//   - TxValidationCode_MVCC_READ_CONFLICT if any key read fails validateReadSet
//   - TxValidationCode_PHANTOM_READ_CONFLICT if any range query fails validateRangeQueries
//   - TxValidationCode_VALID if every namespace passes both checks
//   - TxValidationCode(-1) together with the error if a check itself errors
func (v *Validator) validateTx(txRWSet *rwsetutil.TxRwSet, updates *statedb.UpdateBatch) (peer.TxValidationCode, error) {
	for _, nsRWSet := range txRWSet.NsRwSets {
		ns := nsRWSet.NameSpace
		// Point reads first: each read version must still match the committed version.
		if valid, err := v.validateReadSet(ns, nsRWSet.KvRwSet.Reads, updates); !valid || err != nil {
			if err != nil {
				// An error is reported as-is; -1 is not a regular validation code.
				return peer.TxValidationCode(-1), err
			return peer.TxValidationCode_MVCC_READ_CONFLICT, nil
		// Then the range queries recorded during simulation.
		if valid, err := v.validateRangeQueries(ns, nsRWSet.KvRwSet.RangeQueriesInfo, updates); !valid || err != nil {
			if err != nil {
				return peer.TxValidationCode(-1), err
			return peer.TxValidationCode_PHANTOM_READ_CONFLICT, nil
	return peer.TxValidationCode_VALID, nil

// validateReadSet validates every KVRead of one namespace via validateKVRead.
// It stops at the first read that is invalid or that produces an error and
// returns that result; it returns (true, nil) only if all reads pass.
func (v *Validator) validateReadSet(ns string, kvReads []*kvrwset.KVRead, updates *statedb.UpdateBatch) (bool, error) {
	for _, kvRead := range kvReads {
		if valid, err := v.validateKVRead(ns, kvRead, updates); !valid || err != nil {
			return valid, err
	return true, nil

// validateKVRead checks a single key read of a transaction.
//
// The read is invalid (false, nil) when either:
//   - the key already exists in the pending update batch, or
//   - the version recorded in the read set differs from the version currently
//     committed in the state database.
//
// A database lookup failure is returned as (false, err).
func (v *Validator) validateKVRead(ns string, kvRead *kvrwset.KVRead, updates *statedb.UpdateBatch) (bool, error) {
	// Example (presumably tuples of key, version, value): committed state is
	//(k1,1,v1) (k2 ,2,v2)
	// and the pending batch already holds a write to k1, followed by a read of k1:
	//Write(k1,2,v1') Read(k1)
	// The read of k1 is then rejected by the check below.
	if updates.Exists(ns, kvRead.Key) {
		return false, nil
	versionedValue, err := v.db.GetState(ns, kvRead.Key)
	if err != nil {
		return false, err
	// committedVersion stays nil when the key is not present in the state database.
	var committedVersion *version.Height
	if versionedValue != nil {
		committedVersion = versionedValue.Version
	// The version seen at simulation time must equal the committed version.
	if !version.AreSame(committedVersion, rwsetutil.NewVersion(kvRead.Version)) {
		logger.Debugf("Version mismatch for key [%s:%s]. Committed version = [%s], Version in readSet [%s]",
			ns, kvRead.Key, committedVersion, kvRead.Version)
		return false, nil
	return true, nil
  • 状态数据库:看levelDB的实现
    1. fabric/core/ledger/kvledger/txmgmt/statedb/stateleveldb/stateleveldb.go

// logger is the package logger, registered under the name "stateleveldb".
var logger = flogging.MustGetLogger("stateleveldb")

// compositeKeySep separates namespace and key inside a composite DB key
// (see constructCompositeKey).
var compositeKeySep = []byte{0x00}

// lastKeyIndicator replaces the trailing separator of the end key in
// GetStateRangeScanIterator when no end key is given.
var lastKeyIndicator = byte(0x01)

// savePointKey is the DB key under which ApplyUpdates stores the height and
// from which GetLatestSavePoint reads it back.
var savePointKey = []byte{0x00}

type VersionedDBProvider struct {
	dbProvider *leveldbhelper.Provider
// GetState looks up one key of a namespace in the underlying DB.
// It returns (nil, nil) when the key is not stored, the DB error when the
// lookup fails, and otherwise the decoded value together with its version.
func (vdb *versionedDB) GetState(namespace string, key string) (*statedb.VersionedValue, error) {
	logger.Debugf("GetState(). ns=%s, key=%s", namespace, key)
	// Keys are stored as namespace + separator + key.
	compositeKey := constructCompositeKey(namespace, key)
	dbVal, err := vdb.db.Get(compositeKey)
	if err != nil {
		return nil, err
	if dbVal == nil {
		return nil, nil
	// The stored bytes carry both the value and its version.
	val, ver := statedb.DecodeValue(dbVal)
	return &statedb.VersionedValue{Value: val, Version: ver}, nil

// GetStateMultipleKeys fetches several keys of one namespace by calling
// GetState once per key. The result slice has the same length and order as
// keys; an entry is nil when GetState returned nil for that key. The first
// lookup error aborts the call and is returned with a nil slice.
func (vdb *versionedDB) GetStateMultipleKeys(namespace string, keys []string) ([]*statedb.VersionedValue, error) {
	vals := make([]*statedb.VersionedValue, len(keys))
	for i, key := range keys {
		val, err := vdb.GetState(namespace, key)
		if err != nil {
			return nil, err
		vals[i] = val
	return vals, nil

// GetStateRangeScanIterator returns an iterator over the keys of a namespace
// between startKey and endKey, both translated to composite DB keys.
func (vdb *versionedDB) GetStateRangeScanIterator(namespace string, startKey string, endKey string) (statedb.ResultsIterator, error) {
	compositeStartKey := constructCompositeKey(namespace, startKey)
	compositeEndKey := constructCompositeKey(namespace, endKey)
	if endKey == "" {
		// With an empty endKey the composite key ends in the 0x00 separator;
		// swapping that byte for lastKeyIndicator (0x01) yields an end bound
		// that sorts after every "namespace + 0x00 + key" entry.
		compositeEndKey[len(compositeEndKey)-1] = lastKeyIndicator
	dbItr := vdb.db.GetIterator(compositeStartKey, compositeEndKey)
	return newKVScanner(namespace, dbItr), nil

// ExecuteQuery is not available on the leveldb-backed state database; it
// always returns a nil iterator and an error.
func (vdb *versionedDB) ExecuteQuery(namespace, query string) (statedb.ResultsIterator, error) {
	return nil, errors.New("ExecuteQuery not supported for leveldb")

func (vdb *versionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version.Height) error {
	dbBatch := leveldbhelper.NewUpdateBatch()
	namespaces := batch.GetUpdatedNamespaces()
	for _, ns := range namespaces {
		updates := batch.GetUpdates(ns)
		for k, vv := range updates {
			compositeKey := constructCompositeKey(ns, k)
			logger.Debugf("Channel [%s]: Applying key=[%#v]", vdb.dbName, compositeKey)
			if vv.Value == nil {
			} else {
				dbBatch.Put(compositeKey, statedb.EncodeValue(vv.Value, vv.Version))
	dbBatch.Put(savePointKey, height.ToBytes())
	if err := vdb.db.WriteBatch(dbBatch, true); err != nil {
		return err
	return nil

// GetLatestSavePoint reads the height stored under savePointKey.
// It returns (nil, nil) when no save point has been written yet and the DB
// error when the lookup fails.
func (vdb *versionedDB) GetLatestSavePoint() (*version.Height, error) {
	versionBytes, err := vdb.db.Get(savePointKey)
	if err != nil {
		return nil, err
	if versionBytes == nil {
		return nil, nil
	// The second return value of NewHeightFromBytes is deliberately discarded.
	// Note that the local variable shadows the imported package name "version"
	// for the rest of this function.
	version, _ := version.NewHeightFromBytes(versionBytes)
	return version, nil

// constructCompositeKey builds the DB key for a state entry as
// ns + compositeKeySep + key.
func constructCompositeKey(ns string, key string) []byte {
	return append(append([]byte(ns), compositeKeySep...), []byte(key)...)

// splitCompositeKey is the inverse of constructCompositeKey: it splits at the
// first compositeKeySep and returns (namespace, key).
// NOTE(review): split[1] is indexed without a length check, so an input that
// contains no separator would panic — callers must pass well-formed keys.
func splitCompositeKey(compositeKey []byte) (string, string) {
	split := bytes.SplitN(compositeKey, compositeKeySep, 2)
	return string(split[0]), string(split[1])
  • 历史数据库:看levelDB的实现
    1. fabric/core/ledger/kvledger/history/historydb/historyleveldb/historyleveldb.go
    • 标识某个key被某个交易改变了,看Commit()方法
    • 怎么查询一个key的变动历史
func (historyDB *historyDB) Commit(block *common.Block) error {
for _, nsRWSet := range txRWSet.NsRwSets {
				ns := nsRWSet.NameSpace
				for _, kvWrite := range nsRWSet.KvRwSet.Writes {
					writeKey := kvWrite.Key
					//composite key for history records is in the form ns~key~blockNo~tranNo
					compositeHistoryKey := historydb.ConstructCompositeHistoryKey(ns, writeKey, blockNo, tranNo)

					// No value is required, write an empty byte array (emptyValue) since Put() of nil is not allowed
					dbBatch.Put(compositeHistoryKey, emptyValue)
// ConstructCompositeHistoryKey builds the history DB key for one write of a key.
// Layout: ns, separator, key, separator, encoded blocknum, encoded trannum.
// No separator is placed between the two encoded numbers.
func ConstructCompositeHistoryKey(ns string, key string, blocknum uint64, trannum uint64) []byte {

	var compositeKey []byte
	compositeKey = append(compositeKey, []byte(ns)...)
	compositeKey = append(compositeKey, compositeKeySep...)
	compositeKey = append(compositeKey, []byte(key)...)
	compositeKey = append(compositeKey, compositeKeySep...)
	// Block and transaction numbers are appended via
	// util.EncodeOrderPreservingVarUint64.
	compositeKey = append(compositeKey, util.EncodeOrderPreservingVarUint64(blocknum)...)
	compositeKey = append(compositeKey, util.EncodeOrderPreservingVarUint64(trannum)...)

	return compositeKey
  • 区块文件读取


var ErrUnexpectedEndOfBlockfile = errors.New("unexpected end of blockfile")
func newBlockfileStream(rootDir string, fileNum int, startOffset int64) (*blockfileStream, error) {
func (s *blockfileStream) nextBlockBytes() ([]byte, error) {


  • 首先看了账本存储的根目录的接口,从宏观上把握
  • 交易读写集的生成和验证
  • 数据库增删改查
  • 还有一个尚未实现的方法:数据裁剪(Prune)接口,可能用于解决账本数据无限增长的问题