以太坊源码之交易池 交易数据维护分析

今天将对交易池中的交易数据维护进行分析。具体分析将从本地用户提交交易数据源码部分开始解析,至于本地用户如何生成交易数据以后在分析,下面看源码:

源码路径:go-ethereum\internal\ethapi\api.go
func submitTransaction(ctx context.Context, b Backend, tx *types.Transaction) (common.Hash, error) {
	if err := b.SendTx(ctx, tx); err != nil {
		return common.Hash{}, err
	}
	
	if tx.To() == nil {
		signer := types.MakeSigner(b.ChainConfig(), b.CurrentBlock().Number())
		from, err := types.Sender(signer, tx)
		if err != nil {
			return common.Hash{}, err
		}
		addr := crypto.CreateAddress(from, tx.Nonce())
		log.Info("Submitted contract creation", "fullhash", tx.Hash().Hex(), "contract", addr.Hex())
	} else {
		log.Info("Submitted transaction", "fullhash", tx.Hash().Hex(), "recipient", tx.To())
	}
	
	return tx.Hash(), nil
}

上述源码主要有以下几点功能:

  1. b.SendTx(ctx, tx)向交易池提交数据
  2. 从交易数据是否有接收方地址判断是普通交易还是合约交易
    如果为合约交易,将依据发起合约的钱包地址和此钱包用户的交易Nonce组合进行rlp编 码后的数据进行Keccak256计算,取最后20位数据为合约地址
  3. 返回交易哈希值
    上述代码实现的功能比较简单,接下来重点对 b.SendTx(ctx, tx)源码进行分析,看看以太坊如何认为一笔交易合法以及queue和pending交易数据队列的维护机制,下面看源码:
源码路径:go-ethereum\eth/api_backend.go
func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
	return b.eth.txPool.AddLocal(signedTx)
}

将交易数据添加到本地:
func (pool *TxPool) AddLocal(tx *types.Transaction) error {
	return pool.addTx(tx, !pool.config.NoLocals)
}

func (pool *TxPool) AddLocal(tx *types.Transaction) error {
	return pool.addTx(tx, !pool.config.NoLocals)
}

func (pool *TxPool) addTx(tx *types.Transaction, local bool) error {
	pool.mu.Lock()
	defer pool.mu.Unlock()

	//将交易加入本地
	replace, err := pool.add(tx, local)
	if err != nil {
		return err
	}

	if !replace {
		from, _ := types.Sender(pool.signer, tx) // already validated
		//对交易池数据进行维护
		pool.promoteExecutables([]common.Address{from})
	}
	return nil
}

通过以上源码可以看出,一笔交易提交到交易池,大致有二个步骤:
1.判断交易数据是否合法并提交的queue队列, pool.add(tx, local)中实现
2.对交易池中数据的维护,pool.promoteExecutables([]common.Address{from})中实现
下面分别对这两个函数的内部实现进行解析,先看pool.add(tx, local)源码,代码如下:

func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
	// 如果这笔交易的哈希值在交易池中存在则直接退出返回错误
	hash := tx.Hash()
	if pool.all[hash] != nil {
		log.Trace("Discarding already known transaction", "hash", hash)
		return false, fmt.Errorf("known transaction: %x", hash)
	}
	// 判断这边交易数据时候正确,如gas检查
	if err := pool.validateTx(tx, local); err != nil {
		log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
		invalidTxCounter.Inc(1)
		return false, err
	}
	//交易池总数目前已经超过queue和pending的总和,默认(1024+4096)
	if uint64(len(pool.all)) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
		// 不是本地交易&交易gas低于目前交易池最低,交易拒绝加入交易池
		if !local && pool.priced.Underpriced(tx, pool.locals) {
			log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice())
			underpricedTxCounter.Inc(1)
			return false, ErrUnderpriced
		}
		// 剔除gas低的交易
		drop := pool.priced.Discard(len(pool.all)-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals)
		for _, tx := range drop {
			log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice())
			underpricedTxCounter.Inc(1)
			pool.removeTx(tx.Hash(), false)
		}
	}
	//从交易数据中获取发起交易的钱包地址
	from, _ := types.Sender(pool.signer, tx)
	if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
		// 对于同一个钱包地址,一个Nonce在之前已经存在,判断gas是否需要替换旧交易数据
		// list.Overlaps 来检查pending中是否包含相同Nonce的交易
		inserted, old := list.Add(tx, pool.config.PriceBump)
		if !inserted {
			pendingDiscardCounter.Inc(1)
			return false, ErrReplaceUnderpriced
		}
		//剔除旧的交易
		if old != nil {
			delete(pool.all, old.Hash())
			pool.priced.Removed()
			pendingReplaceCounter.Inc(1)
		}
		pool.all[tx.Hash()] = tx
		pool.priced.Put(tx)
		pool.journalTx(from, tx)

		log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())

		// We've directly injected a replacement transaction, notify subsystems
		go pool.txFeed.Send(TxPreEvent{tx})

		return old != nil, nil
	}
	//是个新的交易且交易池的存储交易数未达到上限,直接加入交易池
	replace, err := pool.enqueueTx(hash, tx)
	if err != nil {
		return false, err
	}
	// 本地交易,加入白名单
	if local {
		pool.locals.add(from)
	}
	//将交易数据回写到本地磁盘上
	pool.journalTx(from, tx)

	log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
	return replace, nil
}

上述代码主要实现以下几点功能:
1.将本交易哈希值与交易池中的交易哈希值进行对比,如果存在直接返回
2.检查交易数据是否符合要求,实现函数 pool.validateTx(tx, local),后续分析
3.交易池的交易总数超过上限,进行交易gas的对比,gas比交易池中最低的还要低直接丢弃;
否则移 除交易池中交易gas低的交易数据;
如果发起交易的钱包地址存在交易Nonce,则删除老的交易加入新的交易,返回
4.不满足条件3,即交易池存储的交易数未超过上限,加入queue队列,实现函数 pool.enqueueTx(hash, tx),后续分析
5.如果该交易是本地钱包发起的交易,加入白名单
6.将交易数据在回写到本地的硬盘上
以上就是将交易数据加入交易池的过程,下面针对其中几个关键操作的函数实现做具体的分析。

交易数据正确性验证函数:pool.validateTx(tx, local

func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
	// 交易数据大于32KB直接丢弃,防止DOS攻击
	if tx.Size() > 32*1024 {
		return ErrOversizedData
	}
	//交易额必须是大于0
	if tx.Value().Sign() < 0 {
		return ErrNegativeValue
	}
	// 交易gas的上限必须小于区块gas的上限值.
	if pool.currentMaxGas < tx.Gas() {
		return ErrGasLimit
	}
	// 签名值有效,能从交易数据中解析出发起交易的钱包地址
	from, err := types.Sender(pool.signer, tx)
	if err != nil {
		return ErrInvalidSender
	}
	//非本地交易,交易gas price必须高于设置的最低的gas price
	if !local && pool.gasPrice.Cmp(tx.GasPrice()) > 0 {
		return ErrUnderpriced
	}
	// 交易的Noce必须大于区块的记录的nNonce
	if pool.currentState.GetNonce(from) > tx.Nonce() {
		return ErrNonceTooLow
	}
	//当前账户必须保证有充足的以太坊
	if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
		return ErrInsufficientFunds
	}
    //依据交易数据计算需要的gas
	intrGas, err := IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead)
	if err != nil {
		return err
	}
	//交易gas必须大于intrGas
	if tx.Gas() < intrGas {
		return ErrIntrinsicGas
	}
	return nil
}

交易数据加入queue队列函数:pool.enqueueTx(hash, tx)

func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, error) {
	//从交易数据中的签名数据获取钱包地址
	from, _ := types.Sender(pool.signer, tx)
    
	if pool.queue[from] == nil {
		pool.queue[from] = newTxList(false)
	}
	//将交易数据加入queue队列
	inserted, old := pool.queue[from].Add(tx, pool.config.PriceBump)
	if !inserted {
		// 如果该交易的Nonce已存在,并且新的交易gas/旧的交易gas < 110%
		queuedDiscardCounter.Inc(1)
		return false, ErrReplaceUnderpriced
	}
	// 新的交易gas优于旧的交易gas,剔除旧的交易
	if old != nil {
		delete(pool.all, old.Hash())
		pool.priced.Removed()
		queuedReplaceCounter.Inc(1)
	}
	if pool.all[hash] == nil {
		pool.all[hash] = tx
		pool.priced.Put(tx)
	}
	return old != nil, nil
}

以上就是对将一笔交易加入queue队列的过程,下面接着分析交易升级,即queue队列到pending队列的实现,具体实现函数为pool.promoteExecutables(),下面对源码进行分析:

func (pool *TxPool) promoteExecutables(accounts []common.Address) {
	// accounts存储了所有潜在需要更新的账户。 如果账户传入为nil,代表所有已知的账户
	if accounts == nil {
		accounts = make([]common.Address, 0, len(pool.queue))
		for addr := range pool.queue {
			accounts = append(accounts, addr)
		}
	}
	
	for _, addr := range accounts {
		list := pool.queue[addr]
		if list == nil {
			continue // 该地址上没有queue交易数据
		}
		// 删除小于该钱包地址对应最新的Nonce交易
		for _, tx := range list.Forward(pool.currentState.GetNonce(addr)) {
			hash := tx.Hash()
			log.Trace("Removed old queued transaction", "hash", hash)
			delete(pool.all, hash)
			pool.priced.Removed()
		}
		// 删除所有余额不足的交易
		drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
		for _, tx := range drops {
			hash := tx.Hash()
			log.Trace("Removed unpayable queued transaction", "hash", hash)
			delete(pool.all, hash)
			pool.priced.Removed()
			queuedNofundsCounter.Inc(1)
		}
		// 将所有符合交易的queue队列加入到pending队列
		for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) {
			hash := tx.Hash()
			log.Trace("Promoting queued transaction", "hash", hash)
			pool.promoteTx(addr, hash, tx)
		}
		 // 删除所有超过限制的交易,对于单个用户的queue队列,默认上限是64
		if !pool.locals.contains(addr) {
			for _, tx := range list.Cap(int(pool.config.AccountQueue)) {
				hash := tx.Hash()
				delete(pool.all, hash)
				pool.priced.Removed()
				queuedRateLimitCounter.Inc(1)
				log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
			}
		}
		// 删除没有交易数据的queue队列
		if list.Empty() {
			delete(pool.queue, addr)
		}
	}

	pending := uint64(0)
	for _, list := range pool.pending {
	    //获取交易池中所有pending的交易数总量
		pending += uint64(list.Len())
	}
	//pending超过上限,系统默认4096
	if pending > pool.config.GlobalSlots {
		pendingBeforeCap := pending
		// Assemble a spam order to penalize large transactors first
		spammers := prque.New()
		for addr, list := range pool.pending {
			// 非本地账号&pending交易数系统默认值16
			if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots {
				spammers.Push(addr, float32(list.Len()))
				// 注意spammers是一个优先级队列,也就是说是按照交易的多少从大到小排序的
			}
		}
		
		offenders := []common.Address{}
		for pending > pool.config.GlobalSlots && !spammers.Empty() {
			// Retrieve the next offender if not local address
			offender, _ := spammers.Pop()
			offenders = append(offenders, offender.(common.Address))

			// Equalize balances until all the same or below threshold
			if len(offenders) > 1 {
			    // 第一次进入这个循环的时候, offenders队列里面有交易数量最大的两个账户
				// 把最后加入的账户的交易数量当成本次的阈值
				threshold := pool.pending[offender.(common.Address)].Len()

				 //pending总数超过默认上限&&当前地址的交易数超过threshold 
				for pending > pool.config.GlobalSlots && pool.pending[offenders[len(offenders)-2]].Len() > threshold {
				    // 遍历除了最后一个账户以外的所有账户,每次循环把他们的交易数量减去1,直到不满足for循环条件
					for i := 0; i < len(offenders)-1; i++ {
						list := pool.pending[offenders[i]]
						for _, tx := range list.Cap(list.Len() - 1) {
							// 从交易池中移除该交易数据
							hash := tx.Hash()
							delete(pool.all, hash)
							pool.priced.Removed()

							//更新pending的交易Nonce,以账户地址做区分
							if nonce := tx.Nonce(); pool.pendingState.GetNonce(offenders[i]) > nonce {
								pool.pendingState.SetNonce(offenders[i], nonce)
							}
							log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
						}
						pending--
					}
				}
			}
		}

		// 经过上面的循环,所有的超过AccountSlots的账户的交易数量都变成了之前的最小值。
        // 如果还是超过阈值,那么在继续从offenders里面每次删除一个。
		if pending > pool.config.GlobalSlots && len(offenders) > 0 {
			for pending > pool.config.GlobalSlots && uint64(pool.pending[offenders[len(offenders)-1]].Len()) > pool.config.AccountSlots {
			  //重复上述的动作,每次循环将每个账号的交易数-1
				for _, addr := range offenders {
					list := pool.pending[addr]
					for _, tx := range list.Cap(list.Len() - 1) {
						
						hash := tx.Hash()
						delete(pool.all, hash)
						pool.priced.Removed()

						if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce {
							pool.pendingState.SetNonce(addr, nonce)
						}
						log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
					}
					pending--
				}
			}
		}
		pendingRateLimitCounter.Inc(int64(pendingBeforeCap - pending))
	}
	// 上述处理的是pending限制,接下来处理queue限制
	queued := uint64(0)
	for _, list := range pool.queue {
		queued += uint64(list.Len())
		//获取交易池中所有账户的queue的总和
	}
	if queued > pool.config.GlobalQueue {
		//queue队列超过设置上限,默认1024
		addresses := make(addresssByHeartbeat, 0, len(pool.queue))
		for addr := range pool.queue {
			if !pool.locals.contains(addr) { // 跳过本地地址,本地地址为白名单
				addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
			}
		}
		//依据心跳时间对地址进行排序处理
		sort.Sort(addresses)

		for drop := queued - pool.config.GlobalQueue; drop > 0 && len(addresses) > 0; {
		    // 从后往前,也就是心跳越新的就越会被删除。
			addr := addresses[len(addresses)-1]
			list := pool.queue[addr.address]

			addresses = addresses[:len(addresses)-1]

			if size := uint64(list.Len()); size <= drop {
				for _, tx := range list.Flatten() {
					pool.removeTx(tx.Hash(), true)
				}
				drop -= size
				queuedRateLimitCounter.Inc(int64(size))
				continue
			}
			
			txs := list.Flatten()
			for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
				pool.removeTx(txs[i].Hash(), true)
				drop--
				queuedRateLimitCounter.Inc(1)
			}
		}
	}
}

针对上述代码,大概步骤如下:
1.剔除一些无效的交易,如Nonce太小,out-of-gas
2.将符合规则的queue队列交易提升到pending,实现函数pool.promoteTx(addr, hash, tx),后续做分析
3.针对queue单个账户的最大限制做交易移除处理
4.针对pending队列的全局限制,以账户包含最小的交易数为标准,删除每个账号多余的交易;如pending的全局显示为128,目前有10账号,账号包含最小交易数为15,经过这轮处理,每个账号的交易数均为15,总交易数为150
5.经过4的处理,如果交易任然超过上限,将以每个账号在pending队列交易数为标准,删除多余交易数
6.queue队列的维护,如果queue队列总量超过系统设置上限,将从交易池删除心跳老的交易

交易池交易升级函数:pool.promoteTx()

func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) {
	// 在pending队列没有addr地址的交易记录,新建一个TxList
	if pool.pending[addr] == nil {
		pool.pending[addr] = newTxList(true)
	}
	list := pool.pending[addr]
    
	inserted, old := list.Add(tx, pool.config.PriceBump)
	if !inserted {
		// 该账号交易序号nonce在pending存在并且交易gas优于新产生的交易,删除新交易
		delete(pool.all, hash)
		pool.priced.Removed()

		pendingDiscardCounter.Inc(1)
		return
	}

	if old != nil {
	   // 该账号交易序号nonce在pending存在并且新产生的交易gas优于旧交易,删除旧交易
		delete(pool.all, old.Hash())
		pool.priced.Removed()
		pendingReplaceCounter.Inc(1)
	}
	
	if pool.all[hash] == nil {
		pool.all[hash] = tx
		pool.priced.Put(tx)
	}
	// 更新账号的心跳时间&设置账号的交易Nonce
	pool.beats[addr] = time.Now()
	pool.pendingState.SetNonce(addr, tx.Nonce()+1)
	
	//发送Tx事件
	go pool.txFeed.Send(TxPreEvent{tx})
}

**下面对pool.txFeed.Send()发送事件函数进行解析:**
func (f *Feed) Send(value interface{}) (nsent int) {
	rvalue := reflect.ValueOf(value)

	f.once.Do(f.init) //重新初始化,onece.Do保证只会执行一次
	<-f.sendLock     //读sendLock通道,若sendLock为空则会堵塞

	f.mu.Lock()
	f.sendCases = append(f.sendCases, f.inbox...)  //将inbox注入到sendCase
	f.inbox = nil

	if !f.typecheck(rvalue.Type()) {
		f.sendLock <- struct{}{}  //出错了,退出前先写sendLock以免下次send操作堵塞
		panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype})
	}
	f.mu.Unlock()

	//给所有通道设置要发送的数据
	for i := firstSubSendCase; i < len(f.sendCases); i++ {
		f.sendCases[i].Send = rvalue
	}

	cases := f.sendCases
	for {
		for i := firstSubSendCase; i < len(cases); i++ {
		//首先使用TrySend进行发送,这是一种非阻塞操作。当订阅者足够快时一般能够立即成功
			if cases[i].Chan.TrySend(rvalue) {
				nsent++
				cases = cases.deactivate(i) //发送成功,后移该通道
				i--
			}
		}
		if len(cases) == firstSubSendCase {
			break  //所有通道发送完成,退出
		}
		
		chosen, recv, _ := reflect.Select(cases)  //等待通道返回
		if chosen == 0 /* <-f.removeSub */ {
			index := f.sendCases.find(recv.Interface())
			f.sendCases = f.sendCases.delete(index)
			if index >= 0 && index < len(cases) {
				// Shrink 'cases' too because the removed case was still active.
				cases = f.sendCases[:len(cases)-1]
			}
		} else {
			cases = cases.deactivate(chosen)
			nsent++
		}
	}

	// Forget about the sent value and hand off the send lock.
	for i := firstSubSendCase; i < len(f.sendCases); i++ {
		f.sendCases[i].Send = reflect.Value{}  // 把sendCases中的send都标记为空
	}
	f.sendLock <- struct{}{}  //返回时写入sendLock,为下次发送做准备
	return nsent
}
这里需要重点了解以太坊的时间机制Feed,后续有时间将重点介绍

以上是交易池中交易数据的维护过程,如有不正确的地方还望读者能指出!
针对交易池底层数据结构源码分析,推荐一个博文:
https://blog.csdn.net/itcastcpp/article/details/80305213

撸了一下午的代码,该带女友出去转转了,不然要*了。。。

以太坊源码之交易池 交易数据维护分析
后续我将陆续将以太坊源码的学习笔记在博客上共享。
如果你有对区块链技术感兴趣,并愿意私下一起探讨区块技术,可以扫码加我个人微信,请备注来源【CSDN博客】