Michael.W谈hyperledger Fabric第21期-详细带读Fabric的源码6-orderer节点接收交易数据与广播区块数据

Michael.W谈hyperledger Fabric第21期-详细带读Fabric的源码6-orderer节点接收交易数据与广播区块数据


这两个部分围绕着orderer/server.go文件里的server结构体展开:

	type server struct {
		// 交易数据的接收
		bh broadcast.Handler
		// 区块数据的广播 
		dh deliver.Handler
	}

1 接收交易数据

进入交易接收,查看broadcast.Handler定义:进入orderer/common/broadcast/broadcast.go中

	type Handler interface {
	  // 为gRPC连接启动一个服务线程并服务于广播的连接
		Handle(srv ab.AtomicBroadcast_BroadcastServer) error
	}

这个文件里面有很多接口定义,这里就不一一介绍了。
来主要看一下Handle这个成员方法,即对Handler接口的实现:

	func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
		logger.Debugf("Starting new broadcast loop")
		for {
	    	// 交易的接收(基于gRPC)
			msg, err := srv.Recv()
			if err == io.EOF {
				logger.Debugf("Received EOF, hangup")
				return nil
			}
			if err != nil {
				logger.Warningf("Error reading from stream: %s", err)
				return err
			}
	
	    	// 将接收到的消息(protofuf序列化),反序列化成Payload对象
			payload, err := utils.UnmarshalPayload(msg.Payload)
	    	// 校验反序列化数据的正确
			if err != nil {
				logger.Warningf("Received malformed message, dropping connection: %s", err)
				return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
			}
			if payload.Header == nil {
				logger.Warningf("Received malformed message, with missing header, dropping connection")
				return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
			}
			// 反序列化payload.Header.ChannelHeader,并验证
			chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
			if err != nil {
				logger.Warningf("Received malformed message (bad channel header), dropping connection: %s", err)
				return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
			}
	
	   
			if chdr.Type == int32(cb.HeaderType_CONFIG_UPDATE) {
	       		// 如果该交易消息是配置交易(更新配置)
				logger.Debugf("Preprocessing CONFIG_UPDATE")
	     		// 对该*cb.Envelope类型的配置交易进行一次格式上的转换,但从外面看转换后依然是*cb.Envelope。
				msg, err = bh.sm.Process(msg)
				if err != nil {
					logger.Warningf("Rejecting CONFIG_UPDATE because: %s", err)
					return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
				}
				// 此处也可以理解为对如果是配置交易,就对Payload做一次二次校验
				err = proto.Unmarshal(msg.Payload, payload)
				if err != nil || payload.Header == nil {
					logger.Criticalf("Generated bad transaction after CONFIG_UPDATE processing")
					return srv.Send(&ab.BroadcastResponse{Status: cb.Status_INTERNAL_SERVER_ERROR})
				}
				// 此处也可以理解为对如果是配置交易,就对payload.Header.ChannelHeader做一次二次校验
				chdr, err = utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
				if err != nil {
					logger.Criticalf("Generated bad transaction after CONFIG_UPDATE processing (bad channel header): %s", err)
					return srv.Send(&ab.BroadcastResponse{Status: cb.Status_INTERNAL_SERVER_ERROR})
				}
				if chdr.ChannelId == "" {
					logger.Criticalf("Generated bad transaction after CONFIG_UPDATE processing (empty channel ID)")
					return srv.Send(&ab.BroadcastResponse{Status: cb.Status_INTERNAL_SERVER_ERROR})
				}
			}
			// 通过通道ID获得一个chainsupport对象
			support, ok := bh.sm.GetChain(chdr.ChannelId)
			if !ok {
				logger.Warningf("Rejecting broadcast because channel %s was not found", chdr.ChannelId)
				return srv.Send(&ab.BroadcastResponse{Status: cb.Status_NOT_FOUND})
			}
	
			logger.Debugf("[channel: %s] Broadcast is filtering message of type %s", chdr.ChannelId, cb.HeaderType_name[chdr.Type])
	
	
	    //	将该交易信息传到chainsupport的过滤器进行过滤。这个地方是第一次对交易消息的过滤,之前看区块分割的源代码的Ordered方法中对交易数据又进行了一次过滤
			_, filterErr := support.Filters().Apply(msg)
		
			if filterErr != nil {
				logger.Warningf("[channel: %s] Rejecting broadcast message because of filter error: %s", chdr.ChannelId, filterErr)
				return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
			}
	
			// 如果以上检查都没问题,将该交易信息入列。加入队列成功返回true,加入失败返回false。
	    	// 如果加入队列成功,随后就被solo或kafka进行排序处理
			if !support.Enqueue(msg) {
				return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE})
			}
			if logger.IsEnabledFor(logging.DEBUG) {
				logger.Debugf("[channel: %s] Broadcast has successfully enqueued message of type %s", chdr.ChannelId, cb.HeaderType_name[chdr.Type])
			}
			// 处理成功后,返回一个SUCCESS信息。
			err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS})
			if err != nil {
				logger.Warningf("[channel: %s] Error sending to stream: %s", chdr.ChannelId, err)
				return err
			}
		}
	}
const (
	Status_UNKNOWN                  Status = 0
	Status_SUCCESS                  Status = 200
	Status_BAD_REQUEST              Status = 400
	Status_FORBIDDEN                Status = 403
	Status_NOT_FOUND                Status = 404
	Status_REQUEST_ENTITY_TOO_LARGE Status = 413
	Status_INTERNAL_SERVER_ERROR    Status = 500
	Status_SERVICE_UNAVAILABLE      Status = 503
)
// 可见处理成功后,客户端会收到orderer节点发过来的状态码200。

在之前我手动搭建Fabric网络时,也会看见在转账成功后,会有"status:200"的提示。

2 广播区块数据

区块是如何被扩散出去的?
进入⁨orderer⁩/common⁩/deliver⁩/deliver⁩.go中,主要来看一下deliverServer对Handle方法的实现:

	// Handler接口用来对发送请求进行管理
	type Handler interface {
		Handle(srv ab.AtomicBroadcast_DeliverServer) error
	}
	
	// Handle方法的实现
	func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error {
		logger.Debugf("Starting new deliver loop")
		for {
			logger.Debugf("Attempting to read seek info message")
			// 接收别处发来的请求
			envelope, err := srv.Recv()
			if err == io.EOF {
				logger.Debugf("Received EOF, hangup")
				return nil
			}
			if err != nil {
				logger.Warningf("Error reading from stream: %s", err)
				return err
			}
			// 将接收到的消息(protofuf序列化),反序列化成Payload对象
			payload, err := utils.UnmarshalPayload(envelope.Payload)
			if err != nil {
				logger.Warningf("Received an envelope with no payload: %s", err)
				return sendStatusReply(srv, cb.Status_BAD_REQUEST)
			}
			
			if payload.Header == nil {
				logger.Warningf("Malformed envelope received with bad header")
				return sendStatusReply(srv, cb.Status_BAD_REQUEST)
			}
			// 反序列化payload.Header.ChannelHeader,并校验
			chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
			if err != nil {
				logger.Warningf("Failed to unmarshal channel header: %s", err)
				return sendStatusReply(srv, cb.Status_BAD_REQUEST)
			}
	
			// 通过通道ID获取chainsupport对象
			chain, ok := ds.sm.GetChain(chdr.ChannelId)
			if !ok {
				logger.Debugf("Rejecting deliver because channel %s not found", chdr.ChannelId)
				return sendStatusReply(srv, cb.Status_NOT_FOUND)
			}
	
			// 监听是否错误发生。erroredChan类型为 <-chan struct{}
			erroredChan := chain.Errored()
			select {
	     	// 如果监听到有错误发生
			case <-erroredChan:
				logger.Warningf("[channel: %s] Rejecting deliver request because of consenter error", chdr.ChannelId)
				return sendStatusReply(srv, cb.Status_SERVICE_UNAVAILABLE)
			default:
	
			}
			// 获取通道最新的配置***,以此来检测通道配置有没有被更改
			lastConfigSequence := chain.Sequence()
			// 得到一个签名过滤器。每一次对签名进行评估前,都会去调用策略的名称。
	    // 因为策略的名称和策略本身都是可变的。这也是为什么每次在调用策略的时候不仅要通过策略本身来检索,还需要通过策略的名称来检索。
			sf := sigfilter.New(policies.ChannelReaders, chain.PolicyManager())
	    	// 验证请求发出者的身份是否有效
			result, _ := sf.Apply(envelope)
			if result != filter.Forward {
				logger.Warningf("[channel: %s] Received unauthorized deliver request", chdr.ChannelId)
				return sendStatusReply(srv, cb.Status_FORBIDDEN)
			}
	
			// 解析传递过来的请求消息的内容[1]
			seekInfo := &ab.SeekInfo{}
			if err = proto.Unmarshal(payload.Data, seekInfo); err != nil {
				logger.Warningf("[channel: %s] Received a signed deliver request with malformed seekInfo payload: %s", chdr.ChannelId, err)
				return sendStatusReply(srv, cb.Status_BAD_REQUEST)
			}
	    
	
			// 校验请求消息中的Start和Stop是否正确
			if seekInfo.Start == nil || seekInfo.Stop == nil {
				logger.Warningf("[channel: %s] Received seekInfo message with missing start or stop %v, %v", chdr.ChannelId, seekInfo.Start, seekInfo.Stop)
				return sendStatusReply(srv, cb.Status_BAD_REQUEST)
			}
	
			logger.Debugf("[channel: %s] Received seekInfo (%p) %v", chdr.ChannelId, seekInfo, seekInfo)
	
			cursor, number := chain.Reader().Iterator(seekInfo.Start)
			var stopNum uint64
			switch stop := seekInfo.Stop.Type.(type) {
	      	// 如果要查询最原始的区块,那么Stop的值跟Start相等
			case *ab.SeekPosition_Oldest:
				stopNum = number
	      	// 如果要查询最新的区块,那么Stop的值等于当前区块高度-1
			case *ab.SeekPosition_Newest:
				stopNum = chain.Reader().Height() - 1
	      	// 自定义一个区块编号来查询
			case *ab.SeekPosition_Specified:
				stopNum = stop.Specified.Number
	      		//如果自定义编号大于账本中的区块数
				if stopNum < number {
					logger.Warningf("[channel: %s] Received invalid seekInfo message: start number %d greater than stop number %d", chdr.ChannelId, number, stopNum)
					return sendStatusReply(srv, cb.Status_BAD_REQUEST)
				}
			}
			// 判断行为
			for {
	      		// 如果请求的行为为等待orderer
				if seekInfo.Behavior == ab.SeekInfo_BLOCK_UNTIL_READY {
					select {
	          		// 监听出现错误,直接退出
					case <-erroredChan:
						logger.Warningf("[channel: %s] Aborting deliver request because of consenter error", chdr.ChannelId)
						return sendStatusReply(srv, cb.Status_SERVICE_UNAVAILABLE)
					case <-cursor.ReadyChan():
					}
				} else {
	        		//如果请求的行为为不等待orderer
					select {
					case <-cursor.ReadyChan():
					default:
	          			// 只监听一次,如果orderer不准备好,直接退出,状态码404
						return sendStatusReply(srv, cb.Status_NOT_FOUND)
					}
				}
				// 校验配置信息是否被更改
	      currentConfigSequence := chain.Sequence()
				if currentConfigSequence > lastConfigSequence {
	        // 如果当前的配置***大于最新的配置***(表名当前配置信息已经被更改)
	        		// 将最新的配置***改为当前的配置***
					lastConfigSequence = currentConfigSequence
	        		// 生成签名过滤器,验证请求的有效性
					sf := sigfilter.New(policies.ChannelReaders, chain.PolicyManager())
					result, _ := sf.Apply(envelope)
					if result != filter.Forward {
						logger.Warningf("[channel: %s] Client authorization revoked for deliver request", chdr.ChannelId)
						return sendStatusReply(srv, cb.Status_FORBIDDEN)
					}
				}
	
				// 如果以上的验证都没问题,开始循环读取区块
				block, status := cursor.Next()
				if status != cb.Status_SUCCESS {
					logger.Errorf("[channel: %s] Error reading from channel, cause was: %v", chdr.ChannelId, status)
					return sendStatusReply(srv, status)
				}
	
				logger.Debugf("[channel: %s] Delivering block for (%p)", chdr.ChannelId, seekInfo)
	
	      		// 返回查到的区块返给消息请求方(即peer组织的主节点)
				if err := sendBlockReply(srv, block); err != nil {
					logger.Warningf("[channel: %s] Error sending to stream: %s", chdr.ChannelId, err)
					return err
				}
				// 如果读到了事先在请求消息中的结束的位置,跳出循环
				if stopNum == block.Header.Number {
					break
				}
			}
	
			// 返回一个成功的标识
			if err := sendStatusReply(srv, cb.Status_SUCCESS); err != nil {
				logger.Warningf("[channel: %s] Error sending to stream: %s", chdr.ChannelId, err)
				return err
			}
			logger.Debugf("[channel: %s] Done delivering for (%p), waiting for new SeekInfo", chdr.ChannelId, seekInfo)
		}
	}

[1] 看一下请求内容对象ab.SeekInfo的结构:

	type SeekInfo struct {
	  	// 从哪个区块开始查询
		Start    *SeekPosition         `protobuf:"bytes,1,opt,name=start" json:"start,omitempty"`
	  	// 到哪个区块结束查询
		Stop     *SeekPosition         `protobuf:"bytes,2,opt,name=stop" json:"stop,omitempty"`
		Behavior SeekInfo_SeekBehavior `protobuf:"varint,3,opt,name=behavior,enum=orderer.SeekInfo_SeekBehavior" json:"behavior,omitempty"`
	  // SeekInfo_SeekBehavior有两种行为,一种是一直等到orderer节点准备就绪,一种是如果orderer节点没有准备好直接返回
	}

以上就是orderer节点将排完序的区块扩散到Fabric网络中的整个过程。

ps:
本人热爱图灵,热爱中本聪,热爱V神,热爱一切被梨花照过的姑娘。
以下是我个人的公众号,如果有技术问题可以关注我的公众号来跟我交流。
同时我也会在这个公众号上每周更新我的原创文章,喜欢的小伙伴或者老伙计可以支持一下!
如果需要转发,麻烦注明作者。十分感谢!
Michael.W谈hyperledger Fabric第21期-详细带读Fabric的源码6-orderer节点接收交易数据与广播区块数据
公众号名称:后现代泼痞浪漫主义奠基人