以太坊源码之交易池 交易数据维护分析
今天将对交易池中的交易数据维护进行分析。具体分析将从本地用户提交交易数据源码部分开始解析,至于本地用户如何生成交易数据以后在分析,下面看源码:
源码路径:go-ethereum\internal\ethapi\api.go
func submitTransaction(ctx context.Context, b Backend, tx *types.Transaction) (common.Hash, error) {
if err := b.SendTx(ctx, tx); err != nil {
return common.Hash{}, err
}
if tx.To() == nil {
signer := types.MakeSigner(b.ChainConfig(), b.CurrentBlock().Number())
from, err := types.Sender(signer, tx)
if err != nil {
return common.Hash{}, err
}
addr := crypto.CreateAddress(from, tx.Nonce())
log.Info("Submitted contract creation", "fullhash", tx.Hash().Hex(), "contract", addr.Hex())
} else {
log.Info("Submitted transaction", "fullhash", tx.Hash().Hex(), "recipient", tx.To())
}
return tx.Hash(), nil
}
上述源码主要有以下几点功能:
- b.SendTx(ctx, tx)向交易池提交数据
- 从交易数据是否有接收方地址判断是普通交易还是合约交易
如果为合约交易,将依据发起合约的钱包地址和此钱包用户的交易Nonce组合进行rlp编 码后的数据进行Keccak256计算,取最后20位数据为合约地址 - 返回交易哈希值
上述代码实现的功能比较简单,接下来重点对 b.SendTx(ctx, tx)源码进行分析,看看以太坊如何认为一笔交易合法以及queue和pending交易数据队列的维护机制,下面看源码:
源码路径:go-ethereum\eth/api_backend.go
func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
return b.eth.txPool.AddLocal(signedTx)
}
将交易数据添加到本地:
func (pool *TxPool) AddLocal(tx *types.Transaction) error {
return pool.addTx(tx, !pool.config.NoLocals)
}
func (pool *TxPool) AddLocal(tx *types.Transaction) error {
return pool.addTx(tx, !pool.config.NoLocals)
}
func (pool *TxPool) addTx(tx *types.Transaction, local bool) error {
pool.mu.Lock()
defer pool.mu.Unlock()
//将交易加入本地
replace, err := pool.add(tx, local)
if err != nil {
return err
}
if !replace {
from, _ := types.Sender(pool.signer, tx) // already validated
//对交易池数据进行维护
pool.promoteExecutables([]common.Address{from})
}
return nil
}
通过以上源码可以看出,一笔交易提交到交易池,大致有二个步骤:
1.判断交易数据是否合法并提交的queue队列, pool.add(tx, local)中实现
2.对交易池中数据的维护,pool.promoteExecutables([]common.Address{from})中实现
下面分别对这两个函数的内部实现进行解析,先看pool.add(tx, local)源码,代码如下:
func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
// 如果这笔交易的哈希值在交易池中存在则直接退出返回错误
hash := tx.Hash()
if pool.all[hash] != nil {
log.Trace("Discarding already known transaction", "hash", hash)
return false, fmt.Errorf("known transaction: %x", hash)
}
// 判断这边交易数据时候正确,如gas检查
if err := pool.validateTx(tx, local); err != nil {
log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
invalidTxCounter.Inc(1)
return false, err
}
//交易池总数目前已经超过queue和pending的总和,默认(1024+4096)
if uint64(len(pool.all)) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
// 不是本地交易&交易gas低于目前交易池最低,交易拒绝加入交易池
if !local && pool.priced.Underpriced(tx, pool.locals) {
log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice())
underpricedTxCounter.Inc(1)
return false, ErrUnderpriced
}
// 剔除gas低的交易
drop := pool.priced.Discard(len(pool.all)-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals)
for _, tx := range drop {
log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice())
underpricedTxCounter.Inc(1)
pool.removeTx(tx.Hash(), false)
}
}
//从交易数据中获取发起交易的钱包地址
from, _ := types.Sender(pool.signer, tx)
if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
// 对于同一个钱包地址,一个Nonce在之前已经存在,判断gas是否需要替换旧交易数据
// list.Overlaps 来检查pending中是否包含相同Nonce的交易
inserted, old := list.Add(tx, pool.config.PriceBump)
if !inserted {
pendingDiscardCounter.Inc(1)
return false, ErrReplaceUnderpriced
}
//剔除旧的交易
if old != nil {
delete(pool.all, old.Hash())
pool.priced.Removed()
pendingReplaceCounter.Inc(1)
}
pool.all[tx.Hash()] = tx
pool.priced.Put(tx)
pool.journalTx(from, tx)
log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
// We've directly injected a replacement transaction, notify subsystems
go pool.txFeed.Send(TxPreEvent{tx})
return old != nil, nil
}
//是个新的交易且交易池的存储交易数未达到上限,直接加入交易池
replace, err := pool.enqueueTx(hash, tx)
if err != nil {
return false, err
}
// 本地交易,加入白名单
if local {
pool.locals.add(from)
}
//将交易数据回写到本地磁盘上
pool.journalTx(from, tx)
log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
return replace, nil
}
上述代码主要实现以下几点功能:
1.将本交易哈希值与交易池中的交易哈希值进行对比,如果存在直接返回
2.检查交易数据是否符合要求,实现函数 pool.validateTx(tx, local),后续分析
3.交易池的交易总数超过上限,进行交易gas的对比,gas比交易池中最低的还要低直接丢弃;
否则移 除交易池中交易gas低的交易数据;
如果发起交易的钱包地址存在交易Nonce,则删除老的交易加入新的交易,返回
4.不满足条件3,即交易池存储的交易数未超过上限,加入queue队列,实现函数 pool.enqueueTx(hash, tx),后续分析
5.如果该交易是本地钱包发起的交易,加入白名单
6.将交易数据在回写到本地的硬盘上
以上就是将交易数据加入交易池的过程,下面针对其中几个关键操作的函数实现做具体的分析。
交易数据正确性验证函数:pool.validateTx(tx, local
func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
// 交易数据大于32KB直接丢弃,防止DOS攻击
if tx.Size() > 32*1024 {
return ErrOversizedData
}
//交易额必须是大于0
if tx.Value().Sign() < 0 {
return ErrNegativeValue
}
// 交易gas的上限必须小于区块gas的上限值.
if pool.currentMaxGas < tx.Gas() {
return ErrGasLimit
}
// 签名值有效,能从交易数据中解析出发起交易的钱包地址
from, err := types.Sender(pool.signer, tx)
if err != nil {
return ErrInvalidSender
}
//非本地交易,交易gas price必须高于设置的最低的gas price
if !local && pool.gasPrice.Cmp(tx.GasPrice()) > 0 {
return ErrUnderpriced
}
// 交易的Noce必须大于区块的记录的nNonce
if pool.currentState.GetNonce(from) > tx.Nonce() {
return ErrNonceTooLow
}
//当前账户必须保证有充足的以太坊
if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
return ErrInsufficientFunds
}
//依据交易数据计算需要的gas
intrGas, err := IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead)
if err != nil {
return err
}
//交易gas必须大于intrGas
if tx.Gas() < intrGas {
return ErrIntrinsicGas
}
return nil
}
交易数据加入queue队列函数:pool.enqueueTx(hash, tx)
func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, error) {
//从交易数据中的签名数据获取钱包地址
from, _ := types.Sender(pool.signer, tx)
if pool.queue[from] == nil {
pool.queue[from] = newTxList(false)
}
//将交易数据加入queue队列
inserted, old := pool.queue[from].Add(tx, pool.config.PriceBump)
if !inserted {
// 如果该交易的Nonce已存在,并且新的交易gas/旧的交易gas < 110%
queuedDiscardCounter.Inc(1)
return false, ErrReplaceUnderpriced
}
// 新的交易gas优于旧的交易gas,剔除旧的交易
if old != nil {
delete(pool.all, old.Hash())
pool.priced.Removed()
queuedReplaceCounter.Inc(1)
}
if pool.all[hash] == nil {
pool.all[hash] = tx
pool.priced.Put(tx)
}
return old != nil, nil
}
以上就是对将一笔交易加入queue队列的过程,下面接着分析交易升级,即queue队列到pending队列的实现,具体实现函数为pool.promoteExecutables(),下面对源码进行分析:
func (pool *TxPool) promoteExecutables(accounts []common.Address) {
// accounts存储了所有潜在需要更新的账户。 如果账户传入为nil,代表所有已知的账户
if accounts == nil {
accounts = make([]common.Address, 0, len(pool.queue))
for addr := range pool.queue {
accounts = append(accounts, addr)
}
}
for _, addr := range accounts {
list := pool.queue[addr]
if list == nil {
continue // 该地址上没有queue交易数据
}
// 删除小于该钱包地址对应最新的Nonce交易
for _, tx := range list.Forward(pool.currentState.GetNonce(addr)) {
hash := tx.Hash()
log.Trace("Removed old queued transaction", "hash", hash)
delete(pool.all, hash)
pool.priced.Removed()
}
// 删除所有余额不足的交易
drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
for _, tx := range drops {
hash := tx.Hash()
log.Trace("Removed unpayable queued transaction", "hash", hash)
delete(pool.all, hash)
pool.priced.Removed()
queuedNofundsCounter.Inc(1)
}
// 将所有符合交易的queue队列加入到pending队列
for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) {
hash := tx.Hash()
log.Trace("Promoting queued transaction", "hash", hash)
pool.promoteTx(addr, hash, tx)
}
// 删除所有超过限制的交易,对于单个用户的queue队列,默认上限是64
if !pool.locals.contains(addr) {
for _, tx := range list.Cap(int(pool.config.AccountQueue)) {
hash := tx.Hash()
delete(pool.all, hash)
pool.priced.Removed()
queuedRateLimitCounter.Inc(1)
log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
}
}
// 删除没有交易数据的queue队列
if list.Empty() {
delete(pool.queue, addr)
}
}
pending := uint64(0)
for _, list := range pool.pending {
//获取交易池中所有pending的交易数总量
pending += uint64(list.Len())
}
//pending超过上限,系统默认4096
if pending > pool.config.GlobalSlots {
pendingBeforeCap := pending
// Assemble a spam order to penalize large transactors first
spammers := prque.New()
for addr, list := range pool.pending {
// 非本地账号&pending交易数系统默认值16
if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots {
spammers.Push(addr, float32(list.Len()))
// 注意spammers是一个优先级队列,也就是说是按照交易的多少从大到小排序的
}
}
offenders := []common.Address{}
for pending > pool.config.GlobalSlots && !spammers.Empty() {
// Retrieve the next offender if not local address
offender, _ := spammers.Pop()
offenders = append(offenders, offender.(common.Address))
// Equalize balances until all the same or below threshold
if len(offenders) > 1 {
// 第一次进入这个循环的时候, offenders队列里面有交易数量最大的两个账户
// 把最后加入的账户的交易数量当成本次的阈值
threshold := pool.pending[offender.(common.Address)].Len()
//pending总数超过默认上限&&当前地址的交易数超过threshold
for pending > pool.config.GlobalSlots && pool.pending[offenders[len(offenders)-2]].Len() > threshold {
// 遍历除了最后一个账户以外的所有账户,每次循环把他们的交易数量减去1,直到不满足for循环条件
for i := 0; i < len(offenders)-1; i++ {
list := pool.pending[offenders[i]]
for _, tx := range list.Cap(list.Len() - 1) {
// 从交易池中移除该交易数据
hash := tx.Hash()
delete(pool.all, hash)
pool.priced.Removed()
//更新pending的交易Nonce,以账户地址做区分
if nonce := tx.Nonce(); pool.pendingState.GetNonce(offenders[i]) > nonce {
pool.pendingState.SetNonce(offenders[i], nonce)
}
log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
}
pending--
}
}
}
}
// 经过上面的循环,所有的超过AccountSlots的账户的交易数量都变成了之前的最小值。
// 如果还是超过阈值,那么在继续从offenders里面每次删除一个。
if pending > pool.config.GlobalSlots && len(offenders) > 0 {
for pending > pool.config.GlobalSlots && uint64(pool.pending[offenders[len(offenders)-1]].Len()) > pool.config.AccountSlots {
//重复上述的动作,每次循环将每个账号的交易数-1
for _, addr := range offenders {
list := pool.pending[addr]
for _, tx := range list.Cap(list.Len() - 1) {
hash := tx.Hash()
delete(pool.all, hash)
pool.priced.Removed()
if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce {
pool.pendingState.SetNonce(addr, nonce)
}
log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
}
pending--
}
}
}
pendingRateLimitCounter.Inc(int64(pendingBeforeCap - pending))
}
// 上述处理的是pending限制,接下来处理queue限制
queued := uint64(0)
for _, list := range pool.queue {
queued += uint64(list.Len())
//获取交易池中所有账户的queue的总和
}
if queued > pool.config.GlobalQueue {
//queue队列超过设置上限,默认1024
addresses := make(addresssByHeartbeat, 0, len(pool.queue))
for addr := range pool.queue {
if !pool.locals.contains(addr) { // 跳过本地地址,本地地址为白名单
addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
}
}
//依据心跳时间对地址进行排序处理
sort.Sort(addresses)
for drop := queued - pool.config.GlobalQueue; drop > 0 && len(addresses) > 0; {
// 从后往前,也就是心跳越新的就越会被删除。
addr := addresses[len(addresses)-1]
list := pool.queue[addr.address]
addresses = addresses[:len(addresses)-1]
if size := uint64(list.Len()); size <= drop {
for _, tx := range list.Flatten() {
pool.removeTx(tx.Hash(), true)
}
drop -= size
queuedRateLimitCounter.Inc(int64(size))
continue
}
txs := list.Flatten()
for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
pool.removeTx(txs[i].Hash(), true)
drop--
queuedRateLimitCounter.Inc(1)
}
}
}
}
针对上述代码,大概步骤如下:
1.剔除一些无效的交易,如Nonce太小,out-of-gas
2.将符合规则的queue队列交易提升到pending,实现函数pool.promoteTx(addr, hash, tx),后续做分析
3.针对queue单个账户的最大限制做交易移除处理
4.针对pending队列的全局限制,以账户包含最小的交易数为标准,删除每个账号多余的交易;如pending的全局显示为128,目前有10账号,账号包含最小交易数为15,经过这轮处理,每个账号的交易数均为15,总交易数为150
5.经过4的处理,如果交易任然超过上限,将以每个账号在pending队列交易数为标准,删除多余交易数
6.queue队列的维护,如果queue队列总量超过系统设置上限,将从交易池删除心跳老的交易
交易池交易升级函数:pool.promoteTx()
func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) {
// 在pending队列没有addr地址的交易记录,新建一个TxList
if pool.pending[addr] == nil {
pool.pending[addr] = newTxList(true)
}
list := pool.pending[addr]
inserted, old := list.Add(tx, pool.config.PriceBump)
if !inserted {
// 该账号交易序号nonce在pending存在并且交易gas优于新产生的交易,删除新交易
delete(pool.all, hash)
pool.priced.Removed()
pendingDiscardCounter.Inc(1)
return
}
if old != nil {
// 该账号交易序号nonce在pending存在并且新产生的交易gas优于旧交易,删除旧交易
delete(pool.all, old.Hash())
pool.priced.Removed()
pendingReplaceCounter.Inc(1)
}
if pool.all[hash] == nil {
pool.all[hash] = tx
pool.priced.Put(tx)
}
// 更新账号的心跳时间&设置账号的交易Nonce
pool.beats[addr] = time.Now()
pool.pendingState.SetNonce(addr, tx.Nonce()+1)
//发送Tx事件
go pool.txFeed.Send(TxPreEvent{tx})
}
**下面对pool.txFeed.Send()发送事件函数进行解析:**
func (f *Feed) Send(value interface{}) (nsent int) {
rvalue := reflect.ValueOf(value)
f.once.Do(f.init) //重新初始化,onece.Do保证只会执行一次
<-f.sendLock //读sendLock通道,若sendLock为空则会堵塞
f.mu.Lock()
f.sendCases = append(f.sendCases, f.inbox...) //将inbox注入到sendCase
f.inbox = nil
if !f.typecheck(rvalue.Type()) {
f.sendLock <- struct{}{} //出错了,退出前先写sendLock以免下次send操作堵塞
panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype})
}
f.mu.Unlock()
//给所有通道设置要发送的数据
for i := firstSubSendCase; i < len(f.sendCases); i++ {
f.sendCases[i].Send = rvalue
}
cases := f.sendCases
for {
for i := firstSubSendCase; i < len(cases); i++ {
//首先使用TrySend进行发送,这是一种非阻塞操作。当订阅者足够快时一般能够立即成功
if cases[i].Chan.TrySend(rvalue) {
nsent++
cases = cases.deactivate(i) //发送成功,后移该通道
i--
}
}
if len(cases) == firstSubSendCase {
break //所有通道发送完成,退出
}
chosen, recv, _ := reflect.Select(cases) //等待通道返回
if chosen == 0 /* <-f.removeSub */ {
index := f.sendCases.find(recv.Interface())
f.sendCases = f.sendCases.delete(index)
if index >= 0 && index < len(cases) {
// Shrink 'cases' too because the removed case was still active.
cases = f.sendCases[:len(cases)-1]
}
} else {
cases = cases.deactivate(chosen)
nsent++
}
}
// Forget about the sent value and hand off the send lock.
for i := firstSubSendCase; i < len(f.sendCases); i++ {
f.sendCases[i].Send = reflect.Value{} // 把sendCases中的send都标记为空
}
f.sendLock <- struct{}{} //返回时写入sendLock,为下次发送做准备
return nsent
}
这里需要重点了解以太坊的时间机制Feed,后续有时间将重点介绍
以上是交易池中交易数据的维护过程,如有不正确的地方还望读者能指出!
针对交易池底层数据结构源码分析,推荐一个博文:
https://blog.csdn.net/itcastcpp/article/details/80305213
撸了一下午的代码,该带女友出去转转了,不然要*了。。。
后续我将陆续将以太坊源码的学习笔记在博客上共享。
如果你有对区块链技术感兴趣,并愿意私下一起探讨区块技术,可以扫码加我个人微信,请备注来源【CSDN博客】