超级账本hyperledger fabric第六集:账本存储及源码阅读
账本存储概念:
- peer节点做账本存储
- orderer是临时存储区块,peer节点是账本存储的持久化,会改变世界状态
- 文件系统,区块是存储为文件
- 区块索引,用于查询区块,是用levleDB实现的
- 状态数据库,一般存放区块链最新状态,数据不需要HA,可以从文件系统再次获取,couchDB支持模糊查询
交易读写集:
- 回忆交易流程
- 交易模拟:在背书节点执行模拟时,最终返回交易读写集(RWset),告诉区块链在交易中读写了哪些数据
- 交易排序
- 交易验证,交易验证后,更新世界状态,更新的就是读写集中的写集
- 交易读写集的3个概念
- 读集:包含键的列表,键的提交版本,读取对应的值,返回的是已提交的状态的值(读已提交),不能读取交易过程中写入的数据
- 写集:包含键的列表,写入的数据的值,如果多次写入,以最后一次为准
- 版本号:用区块高度和交易编号组成的
- 交易验证阶段是对读写集进行验证(验证读集)
- 验证读集的版本号是否等于世界状态的版本号
账本存储相关概念:
- 世界状态
- 交易执行后,所有键的最新值
- 历史数据索引(可选)
- 区块存储
- 按照文件去存 blocfile_xxxxxx
- 文件大小是64M,若修改,需要重新编译peer源码
- 账本最大容量64M*
- 区块读取
- 区块文件流
- 区块流
- 迭代器
- 区块索引
- 键:区块高度、区块哈希、交易哈希
- 值:区块文件编号、文件内的偏移量、区块数据的长度
- 区块提交
- 保存到文件
账本存储相关源码:
- 从4方面看,读写集、状态数据、历史数据、区块文件
- 可以先从core/ledger下的ledger_interface.go中看大体结构
package ledger
import (
commonledger "github.com/hyperledger/fabric/common/ledger"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/peer"
)
//类似于前面orderer节点的manager,控制中枢
type PeerLedgerProvider interface {
//创建账本
Create(genesisBlock *common.Block) (PeerLedger, error)
//打开一个已有的账本
Open(ledgerID string) (PeerLedger, error)
//判断账本是否存在
Exists(ledgerID string) (bool, error)
//获取peer节点所有的账本
List() ([]string, error)
Close()
}
type PeerLedger interface {
//账本文件存储
commonledger.Ledger
//Get获取一些账本的区块索引相关内容
GetTransactionByID(txID string) (*peer.ProcessedTransaction, error)
GetBlockByHash(blockHash []byte) (*common.Block, error)
GetBlockByTxID(txID string) (*common.Block, error)
//获取交易的状态
//包含有效和无效
//无效有很多种,可以点进TxValidationCode去看定义
GetTxValidationCodeByTxID(txID string) (peer.TxValidationCode, error)
//实例化一个交易模拟器
NewTxSimulator() (TxSimulator, error)
//创建查询状态数据库的执行器
NewQueryExecutor() (QueryExecutor, error)
//创建历史状态查询的执行器
NewHistoryQueryExecutor() (HistoryQueryExecutor, error)
//裁剪的接口,没有实现
//官方没写的一个方法
Prune(policy commonledger.PrunePolicy) error
}
type ValidatedLedger interface {
//账本文件存储接口
//验证
commonledger.Ledger
}
//账本查询接口
type QueryExecutor interface {
//获取状态
//namespace:账本ID和链码名组成
GetState(namespace string, key string) ([]byte, error)
//获取多个状态
GetStateMultipleKeys(namespace string, keys []string) ([][]byte, error)
//获取一个状态区间
GetStateRangeScanIterator(namespace string, startKey string, endKey string) (commonledger.ResultsIterator, error)
//执行模糊查询,只有couchDB支持
ExecuteQuery(namespace, query string) (commonledger.ResultsIterator, error)
Done()
}
//历史查询接口
type HistoryQueryExecutor interface {
//根据key获取变动的历史
GetHistoryForKey(namespace string, key string) (commonledger.ResultsIterator, error)
}
//交易模拟的执行接口
type TxSimulator interface {
//查询执行器
QueryExecutor
//对状态进行更改
SetState(namespace string, key string, value []byte) error
DeleteState(namespace string, key string) error
SetStateMultipleKeys(namespace string, kvs map[string][]byte) error
ExecuteUpdate(query string) error
//获取交易模型
GetTxSimulationResults() ([]byte, error)
}
- 读写集,分为交易读写集生成和交易读写集验证两个部分去看
交易读写集生成:core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/lockbased_tx_simulator.go
type lockBasedTxSimulator struct {
lockBasedQueryExecutor
//读写集的Builder
//用于构建读写集
//点进RWSetBuilder看读写集定义
rwsetBuilder *rwsetutil.RWSetBuilder
}
- 点进RWSetBuilder看读写集定义:
type RWSetBuilder struct {
//定义了一个map,按照namespace进行隔离的读写集
rwMap map[string]*nsRWs
}
//获取Builder中生成的读写集
func (rws *RWSetBuilder) GetTxReadWriteSet() *TxRwSet {
//go中map是无序的
//为了保证数据的一致性,将map转为了列表
txRWSet := &TxRwSet{}
sortedNamespaces := util.GetSortedKeys(rws.rwMap)
for _, ns := range sortedNamespaces {
nsReadWriteMap := rws.rwMap[ns]
交易读写集验证:fabric\core\ledger\kvledger\txmgmt\validator\statebasedval\state_based_validator.go
//实例化一个validator
func NewValidator(db statedb.VersionedDB) *Validator {
return &Validator{db}
}
//对交易集的验证
func (v *Validator) validateEndorserTX(envBytes []byte, doMVCCValidation bool, updates *statedb.UpdateBatch) (*rwsetutil.TxRwSet, peer.TxValidationCode, error) {
respPayload, err := putils.GetActionFromEnvelope(envBytes)
if err != nil {
return nil, peer.TxValidationCode_NIL_TXACTION, nil
}
txRWSet := &rwsetutil.TxRwSet{}
//二进制反序列化为对象
if err = txRWSet.FromProtoBytes(respPayload.Results); err != nil {
return nil, peer.TxValidationCode_INVALID_OTHER_REASON, nil
}
//做交易验证
func (v *Validator) validateTx(txRWSet *rwsetutil.TxRwSet, updates *statedb.UpdateBatch) (peer.TxValidationCode, error) {
//按照namespace进行隔离验证
for _, nsRWSet := range txRWSet.NsRwSets {
ns := nsRWSet.NameSpace
//验证读集
if valid, err := v.validateReadSet(ns, nsRWSet.KvRwSet.Reads, updates); !valid || err != nil {
if err != nil {
return peer.TxValidationCode(-1), err
}
return peer.TxValidationCode_MVCC_READ_CONFLICT, nil
}
//验证RangeQueries
//RangeQueries是读集的扩展
if valid, err := v.validateRangeQueries(ns, nsRWSet.KvRwSet.RangeQueriesInfo, updates); !valid || err != nil {
if err != nil {
return peer.TxValidationCode(-1), err
}
return peer.TxValidationCode_PHANTOM_READ_CONFLICT, nil
}
}
return peer.TxValidationCode_VALID, nil
}
//验证读集
func (v *Validator) validateReadSet(ns string, kvReads []*kvrwset.KVRead, updates *statedb.UpdateBatch) (bool, error) {
for _, kvRead := range kvReads {
if valid, err := v.validateKVRead(ns, kvRead, updates); !valid || err != nil {
return valid, err
}
}
return true, nil
}
//具体的验证读集的方法
func (v *Validator) validateKVRead(ns string, kvRead *kvrwset.KVRead, updates *statedb.UpdateBatch) (bool, error) {
//看一下状态更新里有没有这个读集
//若存在,说明在之前交易中被更新了,是一笔无效的交易
//(k1,1,v1) (k2 ,2,v2)
//Write(k1,2,v1') Read(k1)
if updates.Exists(ns, kvRead.Key) {
return false, nil
}
//读取key在世界状态中的版本值
versionedValue, err := v.db.GetState(ns, kvRead.Key)
if err != nil {
return false, err
}
//获取世界状态中的version
var committedVersion *version.Height
if versionedValue != nil {
committedVersion = versionedValue.Version
}
//验证世界状态中的version,与读集中的version是否相等
if !version.AreSame(committedVersion, rwsetutil.NewVersion(kvRead.Version)) {
//是不相等的,是一个无效的交易
logger.Debugf("Version mismatch for key [%s:%s]. Committed version = [%s], Version in readSet [%s]",
ns, kvRead.Key, committedVersion, kvRead.Version)
return false, nil
}
return true, nil
}
-
状态数据库:看levelDB的实现
- fabric/core/ledger/kvledger/txmgmt/statedb/stateleveldb/stateleveldb.go
//1.怎么关联智能合约键值,关联哪个文件,数据隔离
//2.怎么持久化区块信息
//3.怎么标识哪个是最新的区块
var logger = flogging.MustGetLogger("stateleveldb")
//组合键的分隔符
var compositeKeySep = []byte{0x00}
var lastKeyIndicator = byte(0x01)
var savePointKey = []byte{0x00}
type VersionedDBProvider struct {
dbProvider *leveldbhelper.Provider
}
//数据隔离
func (vdb *versionedDB) GetState(namespace string, key string) (*statedb.VersionedValue, error) {
logger.Debugf("GetState(). ns=%s, key=%s", namespace, key)
//进行键的组合
compositeKey := constructCompositeKey(namespace, key)
dbVal, err := vdb.db.Get(compositeKey)
if err != nil {
return nil, err
}
if dbVal == nil {
return nil, nil
}
val, ver := statedb.DecodeValue(dbVal)
return &statedb.VersionedValue{Value: val, Version: ver}, nil
}
//一次获取多个key
func (vdb *versionedDB) GetStateMultipleKeys(namespace string, keys []string) ([]*statedb.VersionedValue, error) {
vals := make([]*statedb.VersionedValue, len(keys))
for i, key := range keys {
//循环获取state
val, err := vdb.GetState(namespace, key)
if err != nil {
return nil, err
}
vals[i] = val
}
return vals, nil
}
func (vdb *versionedDB) GetStateRangeScanIterator(namespace string, startKey string, endKey string) (statedb.ResultsIterator, error) {
compositeStartKey := constructCompositeKey(namespace, startKey)
compositeEndKey := constructCompositeKey(namespace, endKey)
if endKey == "" {
compositeEndKey[len(compositeEndKey)-1] = lastKeyIndicator
}
dbItr := vdb.db.GetIterator(compositeStartKey, compositeEndKey)
return newKVScanner(namespace, dbItr), nil
}
//levelDB目前没有实现模糊查询
func (vdb *versionedDB) ExecuteQuery(namespace, query string) (statedb.ResultsIterator, error) {
return nil, errors.New("ExecuteQuery not supported for leveldb")
}
//解决区块如何持久化
//batch是区块验证的更新集
//height是区块高度
func (vdb *versionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version.Height) error {
//实例化数据库事务
dbBatch := leveldbhelper.NewUpdateBatch()
//根据namespace进行分隔
namespaces := batch.GetUpdatedNamespaces()
for _, ns := range namespaces {
updates := batch.GetUpdates(ns)
for k, vv := range updates {
//组合键
compositeKey := constructCompositeKey(ns, k)
logger.Debugf("Channel [%s]: Applying key=[%#v]", vdb.dbName, compositeKey)
//若状态更新集为空
if vv.Value == nil {
//删除这个键
dbBatch.Delete(compositeKey)
} else {
//更新key
dbBatch.Put(compositeKey, statedb.EncodeValue(vv.Value, vv.Version))
}
}
}
//标识最新区块
dbBatch.Put(savePointKey, height.ToBytes())
if err := vdb.db.WriteBatch(dbBatch, true); err != nil {
return err
}
return nil
}
//获取最新的区块高度
func (vdb *versionedDB) GetLatestSavePoint() (*version.Height, error) {
versionBytes, err := vdb.db.Get(savePointKey)
if err != nil {
return nil, err
}
if versionBytes == nil {
return nil, nil
}
version, _ := version.NewHeightFromBytes(versionBytes)
return version, nil
}
//进行键的组合
func constructCompositeKey(ns string, key string) []byte {
return append(append([]byte(ns), compositeKeySep...), []byte(key)...)
}
//基于分隔符进行分隔,转为namespace和key
func splitCompositeKey(compositeKey []byte) (string, string) {
split := bytes.SplitN(compositeKey, compositeKeySep, 2)
return string(split[0]), string(split[1])
}
-
历史数据库:看levelDB的实现
- fabric/core/ledger/kvledger/history/historydb/historyleveldb/historyleveldb.go
- 标识某个key被某个交易改变了,看Commit()方法
- 怎么查询一个key的变动历史
//标识某个key被某个交易改变了,通过组合键实现
func (historyDB *historyDB) Commit(block *common.Block) error {
for _, nsRWSet := range txRWSet.NsRwSets {
ns := nsRWSet.NameSpace
//循环处理写集
for _, kvWrite := range nsRWSet.KvRwSet.Writes {
writeKey := kvWrite.Key
//组合键
//composite key for history records is in the form ns~key~blockNo~tranNo
compositeHistoryKey := historydb.ConstructCompositeHistoryKey(ns, writeKey, blockNo, tranNo)
// No value is required, write an empty byte array (emptyValue) since Put() of nil is not allowed
//存储
dbBatch.Put(compositeHistoryKey, emptyValue)
}
}
//接收4个参数
//ns:namespace,标识这个key属于哪一个通道,哪个智能合约
//key:智能合约的键
//blocknum:区块高度
//trannum:交易编号
//组合键的形式:ns/key/blocknum/trannum
//最终存到levelDB中
func ConstructCompositeHistoryKey(ns string, key string, blocknum uint64, trannum uint64) []byte {
var compositeKey []byte
compositeKey = append(compositeKey, []byte(ns)...)
compositeKey = append(compositeKey, compositeKeySep...)
compositeKey = append(compositeKey, []byte(key)...)
compositeKey = append(compositeKey, compositeKeySep...)
compositeKey = append(compositeKey, util.EncodeOrderPreservingVarUint64(blocknum)...)
compositeKey = append(compositeKey, util.EncodeOrderPreservingVarUint64(trannum)...)
return compositeKey
}
- 区块文件读取
区块流:fabric/common/ledger/blkstorage/fsblkstorage/block_stream.go
//区块流
var ErrUnexpectedEndOfBlockfile = errors.New("unexpected end of blockfile")
//实例化流
func newBlockfileStream(rootDir string, fileNum int, startOffset int64) (*blockfileStream, error) {
//移动到下一个流
func (s *blockfileStream) nextBlockBytes() ([]byte, error) {
账本存储相关源码总结:
- 首先看了账本存储的根目录的接口,从宏观上把握
- 交易读写集的生成和验证
- 数据库增删改查
- 还有一个没有实现的方法,数据裁剪的接口,可能去解决数据无限增长的处理