go-ethereum中txpool源码学习
TxPool有许多辅助的数据类型,不先介绍一下将会很难理解txpool的实现,这里先介绍与之先关的数据类型
nonceHeap
这是一个uint64数组类型,他实现的是heap接口(container/heap),heap又实现了sort接口,这是一个最小堆。
Push
func (h *nonceHeap) Push(x interface{}) {
*h = append(*h, x.(uint64))
}
就是简单的把一个uint64类型数据放入数组末尾
Pop
func (h *nonceHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
从末尾弹出一个数据
txSortedMap
这是一个nonce到交易的映射
type txSortedMap struct {
items map[uint64]*types.Transaction
index *nonceHeap
cache types.Transactions
}
他有一个map类型成员,键是uint64位数据。即nonce,值就是一个个交易。
func newTxSortedMap() *txSortedMap {
return &txSortedMap{
items: make(map[uint64]*types.Transaction),
index: new(nonceHeap),
}
}
构造方法就是初始化两个成员变量
Get & Put
func (m *txSortedMap) Get(nonce uint64) *types.Transaction {
return m.items[nonce]
}
func (m *txSortedMap) Put(tx *types.Transaction) {
nonce := tx.Nonce()
if m.items[nonce] == nil {
heap.Push(m.index, nonce)
}
m.items[nonce], m.cache = tx, nil
}
get方法就是按照nonce取值,put方法就是将交易按期nonce值存储。
Forward
这个方法用来删除存储的所有nonce小于threshold的交易
func (m *txSortedMap) Forward(threshold uint64) types.Transactions {
var removed types.Transactions
for m.index.Len() > 0 && (*m.index)[0] < threshold {
nonce := heap.Pop(m.index).(uint64)
removed = append(removed, m.items[nonce])
delete(m.items, nonce)
}
if m.cache != nil {
m.cache = m.cache[len(removed):]
}
return removed
}
很简单,由于index是一个nonceHeap类型数据,他从小到大排序,所有每次取第一个也就是最小值,进行判断并删除。
Filter
过滤并删除功能
func (m *txSortedMap) Filter(filter func(*types.Transaction) bool) types.Transactions {
var removed types.Transactions
for nonce, tx := range m.items {
if filter(tx) {
removed = append(removed, tx)
delete(m.items, nonce)
}
}
if len(removed) > 0 {
*m.index = make([]uint64, 0, len(m.items))
for nonce := range m.items {
*m.index = append(*m.index, nonce)
}
heap.Init(m.index)
m.cache = nil
}
return removed
}
先遍历所有的item,然后删除所有满足filter函数的交易。注意删除完要重构堆。
Cap
这个方法限制存储的交易数量
func (m *txSortedMap) Cap(threshold int) types.Transactions {
if len(m.items) <= threshold {
return nil
}
var drops types.Transactions
sort.Sort(*m.index)
for size := len(m.items); size > threshold; size-- {
drops = append(drops, m.items[(*m.index)[size-1]])
delete(m.items, (*m.index)[size-1])
}
*m.index = (*m.index)[:threshold]
heap.Init(m.index)
if m.cache != nil {
m.cache = m.cache[:len(m.cache)-len(drops)]
}
return drops
}
如果已有的交易数量大于指定值,则先对index排序,然后将index从后向前删除,知道满足条件,最后还要重构堆。
Remove
func (m *txSortedMap) Remove(nonce uint64) bool {
_, ok := m.items[nonce]
if !ok {
return false
}
for i := 0; i < m.index.Len(); i++ {
if (*m.index)[i] == nonce {
heap.Remove(m.index, i)
break
}
}
delete(m.items, nonce)
m.cache = nil
return true
}
删除方法,先从items中找到对应的交易,然后先从堆中删除给定的nonce,之后再从itmes中删除
Ready
返回一组连续交易,并将其从中删除,这一组交易中第一个交易的nonce要小于给定的值。
func (m *txSortedMap) Ready(start uint64) types.Transactions {
if m.index.Len() == 0 || (*m.index)[0] > start {
return nil
}
var ready types.Transactions
for next := (*m.index)[0]; m.index.Len() > 0 && (*m.index)[0] == next; next++ {
ready = append(ready, m.items[next])
delete(m.items, next)
heap.Pop(m.index)
}
m.cache = nil
return ready
}
由于index是一个最小堆,所以根节点也就是数组的第一个值时堆的最小值,如果这个最小值比我们给定的值要大则直接返回。接下来开始一个循环,从堆得最小值开始,寻找连续的交易,每找到一个就将其从items中删除,直到下一个值不连续为止。
Flatten
这个方法是返回一个基于nonce排序的交易集合
func (m *txSortedMap) Flatten() types.Transactions {
if m.cache == nil {
m.cache = make(types.Transactions, 0, len(m.items))
for _, tx := range m.items {
m.cache = append(m.cache, tx)
}
sort.Sort(types.TxByNonce(m.cache))
}
txs := make(types.Transactions, len(m.cache))
copy(txs, m.cache)
return txs
}
首先从cache中查找,没有的话将所有交易放入cache中,并按Nonce排序,最后返回一个Transactions类型数据。TxByNonce是一个可排序的Transactions类型。
txList
txList 是属于同一个账号的交易列表,按照nonce排序。
type txList struct {
strict bool
txs *txSortedMap
costcap *big.Int
gascap uint64
}
可见其中封装了txSortedMap,实际存储还是在txSortedMap中。
func newTxList(strict bool) *txList {
return &txList{
strict: strict,
txs: newTxSortedMap(),
costcap: new(big.Int),
}
}
strict是规定nonce是由要求连续的。
Overlaps
func (l *txList) Overlaps(tx *types.Transaction) bool {
return l.txs.Get(tx.Nonce()) != nil
}
检查给定的交易是否在txList中,实际上是借助txSortedMap的get方法按nonce查询。
Add
func (l *txList) Add(tx *types.Transaction, priceBump uint64) (bool, *types.Transaction) {
old := l.txs.Get(tx.Nonce())
if old != nil {
threshold := new(big.Int).Div(new(big.Int).Mul(old.GasPrice(), big.NewInt(100+int64(priceBump))), big.NewInt(100))
if old.GasPrice().Cmp(tx.GasPrice()) >= 0 || threshold.Cmp(tx.GasPrice()) > 0 {
return false, nil
}
}
l.txs.Put(tx)
if cost := tx.Cost(); l.costcap.Cmp(cost) < 0 {
l.costcap = cost
}
if gas := tx.Gas(); l.gascap < gas {
l.gascap = gas
}
return true, old
}
这是添加或替换方法。首先根据交易的nonce从已存储的集合中尝试获取,如果有的话表示有旧的交易。此时安装下面公式计算:(old_price * (100+priceBump))/100.只有如果旧的交易的GasPrice大于新的交易,或者刚才那个值大于新的交易的GasPrice则不进行任何处理。否则将新的交易放入txlist中。接着按情况更新costcap,他代表所有交易中的最高花费,以及gascap,表示所有交易中的最高gas用量。
Forward
func (l *txList) Forward(threshold uint64) types.Transactions {
return l.txs.Forward(threshold)
}
直接调用txSortedMap方法,删除并返回比指定值小的交易
Filter
func (l *txList) Filter(costLimit *big.Int, gasLimit uint64) (types.Transactions, types.Transactions) {
if l.costcap.Cmp(costLimit) <= 0 && l.gascap <= gasLimit {
return nil, nil
}
l.costcap = new(big.Int).Set(costLimit)
l.gascap = gasLimit
removed := l.txs.Filter(func(tx *types.Transaction) bool { return tx.Cost().Cmp(costLimit) > 0 || tx.Gas() > gasLimit })
var invalids types.Transactions
if l.strict && len(removed) > 0 {
lowest := uint64(math.MaxUint64)
for _, tx := range removed {
if nonce := tx.Nonce(); lowest > nonce {
lowest = nonce
}
}
invalids = l.txs.Filter(func(tx *types.Transaction) bool { return tx.Nonce() > lowest })
}
return removed, invalids
}
给定两个参数一个是花费限制,一个是gas限制,所有大于这两个值其中之一的交易都会被删除并返回。
第一步先判断我们已有的交易中是否有符合条件的交易。有的话先更新costcap和gascap。之后利用txSortedMap的filter进行过滤。如果实在严格模式下,先获取刚才过滤出的交易集合中改的最小nonce,返回在获取大于次nonce的所有交易。主要是为了保持连续性
Remove
func (l *txList) Remove(tx *types.Transaction) (bool, types.Transactions) {
nonce := tx.Nonce()
if removed := l.txs.Remove(nonce); !removed {
return false, nil
}
if l.strict {
return true, l.txs.Filter(func(tx *types.Transaction) bool { return tx.Nonce() > nonce })
}
return true, nil
}
删除一个交易,但是在严格模式下,还要过滤出所有nonce大于给定交易的nonce的所有交易。主要是为了保持连续性
txLookup
type txLookup struct {
all map[common.Hash]*types.Transaction
lock sync.RWMutex
}
func newTxLookup() *txLookup {
return &txLookup{
all: make(map[common.Hash]*types.Transaction),
}
}
只是封装了一个map,键是交易的hash,值就是交易本身。另外还有一个读写锁
Range
func (t *txLookup) Range(f func(hash common.Hash, tx *types.Transaction) bool) {
t.lock.RLock()
defer t.lock.RUnlock()
for key, value := range t.all {
if !f(key, value) {
break
}
}
}
需要指定一个方法,会用这个方法处理txLookup例那个map中的所有键值对,当返回false时终止。
Get & Add
func (t *txLookup) Get(hash common.Hash) *types.Transaction {
t.lock.RLock()
defer t.lock.RUnlock()
return t.all[hash]
}
func (t *txLookup) Add(tx *types.Transaction) {
t.lock.Lock()
defer t.lock.Unlock()
t.all[tx.Hash()] = tx
}
func (t *txLookup) Remove(hash common.Hash) {
t.lock.Lock()
defer t.lock.Unlock()
delete(t.all, hash)
}
func (t *txLookup) Count() int {
t.lock.RLock()
defer t.lock.RUnlock()
return len(t.all)
}
都是直接调用map实现的增删查方法。
accountSet
type accountSet struct {
accounts map[common.Address]struct{}
signer types.Signer
cache *[]common.Address
}
func newAccountSet(signer types.Signer) *accountSet {
return &accountSet{
accounts: make(map[common.Address]struct{}),
signer: signer,
}
}
简单来说就是一个账号的集合,由于封装了Singer对象,所以还可以用来处理签名,Signer是一个接口,提供了从交易中获取发送人地址,获取交易hash以及交易原始签名值的功能
contains
func (as *accountSet) contains(addr common.Address) bool {
_, exist := as.accounts[addr]
return exist
}
判断给定的地址是否在集合内
containsTx
func (as *accountSet) containsTx(tx *types.Transaction) bool {
if addr, err := types.Sender(as.signer, tx); err == nil {
return as.contains(addr)
}
return false
}
给定一个交易,先用Sender方法获取到发送人地址,在判断该地址是否在集合内
add
func (as *accountSet) add(addr common.Address) {
as.accounts[addr] = struct{}{}
as.cache = nil
}
给定一个地址放入集合内
flatten
func (as *accountSet) flatten() []common.Address {
if as.cache == nil {
accounts := make([]common.Address, 0, len(as.accounts))
for account := range as.accounts {
accounts = append(accounts, account)
}
as.cache = &accounts
}
return *as.cache
}
以数组的形式返回存储的所有地址,先从cache中取,没有值的话在遍历accounts。
priceHeap
首先实际上他是一个Transaction类型的数组,但是实现了堆接口,也是一个最小堆,按照gas价格排序,若gas价格一样则按nonce排序
type priceHeap []*types.Transaction
func (h priceHeap) Len() int { return len(h) }
func (h priceHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h priceHeap) Less(i, j int) bool {
switch h[i].GasPrice().Cmp(h[j].GasPrice()) {
case -1:
return true
case 1:
return false
}
return h[i].Nonce() > h[j].Nonce()
}
func (h *priceHeap) Push(x interface{}) {
*h = append(*h, x.(*types.Transaction))
}
func (h *priceHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
txPricedList
type txPricedList struct {
all *txLookup
items *priceHeap
stales int
}
func newTxPricedList(all *txLookup) *txPricedList {
return &txPricedList{
all: all,
items: new(priceHeap),
}
}
类似于txList的功能,构造时会传入txloopup对象,其中包含一系列的交易
Put
func (l *txPricedList) Put(tx *types.Transaction) {
heap.Push(l.items, tx)
}
注意这里是将交易放在那个最小堆中
Cap
func (l *txPricedList) Cap(threshold *big.Int, local *accountSet) types.Transactions {
drop := make(types.Transactions, 0, 128)
save := make(types.Transactions, 0, 64)
for len(*l.items) > 0 {
tx := heap.Pop(l.items).(*types.Transaction)
if l.all.Get(tx.Hash()) == nil {
l.stales--
continue
}
if tx.GasPrice().Cmp(threshold) >= 0 {
save = append(save, tx)
break
}
if local.containsTx(tx) {
save = append(save, tx)
} else {
drop = append(drop, tx)
}
}
for _, tx := range save {
heap.Push(l.items, tx)
}
return drop
}
给定两个参数,第一个参数是一个阈值,第二个是一个accountSet集合local。这里会遍历所有item中存储的交易,对于gasprice小于阈值的交易如果不属于local,则放入drop中返回,否则还放回items中。
Underpriced
func (l *txPricedList) Underpriced(tx *types.Transaction, local *accountSet) bool {
if local.containsTx(tx) {
return false
}
for len(*l.items) > 0 {
head := []*types.Transaction(*l.items)[0]
if l.all.Get(head.Hash()) == nil {
l.stales--
heap.Pop(l.items)
continue
}
break
}
if len(*l.items) == 0 {
log.Error("Pricing query for empty pool")
return false
}
cheapest := []*types.Transaction(*l.items)[0]
return cheapest.GasPrice().Cmp(tx.GasPrice()) >= 0
}
这个方法是给定一个交易,然后检查这个交易的GasPrice是否比集合中存储的GasPrice的最小值还要小,由于是按GasPrice排序,所以items中第一个元素就是GasPrice最小值。
Discard
func (l *txPricedList) Discard(count int, local *accountSet) types.Transactions {
drop := make(types.Transactions, 0, count)
save := make(types.Transactions, 0, 64)
for len(*l.items) > 0 && count > 0 {
// Discard stale transactions if found during cleanup
tx := heap.Pop(l.items).(*types.Transaction)
if l.all.Get(tx.Hash()) == nil {
l.stales--
continue
}
// Non stale transaction found, discard unless local
if local.containsTx(tx) {
save = append(save, tx)
} else {
drop = append(drop, tx)
count--
}
}
for _, tx := range save {
heap.Push(l.items, tx)
}
return drop
}
这个是取集合中GasPrice最小的count个交易从中删除并返回,前提是不在local中,若在则需要再放回
Removed
func (l *txPricedList) Removed() {
l.stales++
if l.stales <= len(*l.items)/4 {
return
}
reheap := make(priceHeap, 0, l.all.Count())
l.stales, l.items = 0, &reheap
l.all.Range(func(hash common.Hash, tx *types.Transaction) bool {
*l.items = append(*l.items, tx)
return true
})
heap.Init(l.items)
}
用来通知有旧的交易被删除,该方法每调用一次,stales加一,当达到items个数的四分之一时,从新调整堆,集体方法是新建一个堆,让后将原来items中改的元素依次放入然后初始化该堆即可。
txJournal
这是一个交易的循环日志对象,目的是存储本地创建的交易,使未执行的交易能在节点启动时得到执行
type txJournal struct {
path string
writer io.WriteCloser
}
func newTxJournal(path string) *txJournal {
return &txJournal{
path: path,
}
}
path就是存储交易的目录
load
func (journal *txJournal) load(add func([]*types.Transaction) []error) error {
if _, err := os.Stat(journal.path); os.IsNotExist(err) {
return nil
}
input, err := os.Open(journal.path)
if err != nil {
return err
}
defer input.Close()
journal.writer = new(devNull)
defer func() { journal.writer = nil }()
stream := rlp.NewStream(input, 0)
total, dropped := 0, 0
loadBatch := func(txs types.Transactions) {
for _, err := range add(txs) {
if err != nil {
log.Debug("Failed to add journaled transaction", "err", err)
dropped++
}
}
}
var (
failure error
batch types.Transactions
)
for {
tx := new(types.Transaction)
if err = stream.Decode(tx); err != nil {
if err != io.EOF {
failure = err
}
if batch.Len() > 0 {
loadBatch(batch)
}
break
}
total++
if batch = append(batch, tx); batch.Len() > 1024 {
loadBatch(batch)
batch = batch[:0]
}
}
log.Info("Loaded local transaction journal", "transactions", total, "dropped", dropped)
return failure
}
加载并解析磁盘存储的交易。首先判断路径所指的文件是否存在,然后打开该文件。devNull是一个虚拟写的对象,它的write不产生任何写动作,直接返回要写数据的长度,它的close永远返回nil。
打开文件后,根据input获取一个rlp的流,之后进入一个循环,每次解码出一个交易,如果没错的话,计数器total加一,然后将刚才解码出的交易放入batch中,这是一个交易集合。若batch中交易数量大于1024或者刚才解码出错都调用loadBatch方法。
loadBatch方法线用add方法处理刚才得到了交易集合,add方法是我们调用load方法是指定的,之后遍历add返回的error数组,对于有错的交易打印log并使计数器dropped加一。
insert
func (journal *txJournal) insert(tx *types.Transaction) error {
if journal.writer == nil {
return errNoActiveJournal
}
if err := rlp.Encode(journal.writer, tx); err != nil {
return err
}
return nil
}
这里是将一个交易放入日志中,直接使用rlp编码,接入写入writer中。
rotate
func (journal *txJournal) rotate(all map[common.Address]types.Transactions) error {
if journal.writer != nil {
if err := journal.writer.Close(); err != nil {
return err
}
journal.writer = nil
}
replacement, err := os.OpenFile(journal.path+".new", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0755)
if err != nil {
return err
}
journaled := 0
for _, txs := range all {
for _, tx := range txs {
if err = rlp.Encode(replacement, tx); err != nil {
replacement.Close()
return err
}
}
journaled += len(txs)
}
replacement.Close()
if err = os.Rename(journal.path+".new", journal.path); err != nil {
return err
}
sink, err := os.OpenFile(journal.path, os.O_WRONLY|os.O_APPEND, 0755)
if err != nil {
return err
}
journal.writer = sink
log.Info("Regenerated local transaction journal", "transactions", journaled, "accounts", len(all))
return nil
}
这个是用来重新生成交易日志的。第一步先把原来的writer关闭,之后新建一个日志文件,在原来path基础上加.new后缀,然后遍历所有给定的交易,参数中给的是一个map,指的是对应地址的所有交易。对于每个交易先经过rlp编码后写入文件中。最后将新建的文件重命名为原来给定的名字。最后重新打开新的文件,然后设置writer为新文件对象。
txSenderCacher
这是一个帮助了,为的是并发的从一组交易中恢复出每个交易的发送人地址
var senderCacher = newTxSenderCacher(runtime.NumCPU())
type txSenderCacher struct {
threads int
tasks chan *txSenderCacherRequest
}
type txSenderCacherRequest struct {
signer types.Signer
txs []*types.Transaction
inc int
}
func newTxSenderCacher(threads int) *txSenderCacher {
cacher := &txSenderCacher{
tasks: make(chan *txSenderCacherRequest, threads),
threads: threads,
}
for i := 0; i < threads; i++ {
go cacher.cache()
}
return cacher
}
构造参数需要指定并发数,一般都是cpu核数。然后启动若干个goroutine去执行cache方法
cache
func (cacher *txSenderCacher) cache() {
for task := range cacher.tasks {
for i := 0; i < len(task.txs); i += task.inc {
types.Sender(task.signer, task.txs[i])
}
}
}
tasks是一个channel,其中数据时txSenderCacherRequest,在cache中会在for循环中阻塞,知道通道中有数据来临,然后调用Sender方法去恢复地址,并将地址写入对应交易的from字段
recover
func (cacher *txSenderCacher) recover(signer types.Signer, txs []*types.Transaction) {
if len(txs) == 0 {
return
}
tasks := cacher.threads
if len(txs) < tasks*4 {
tasks = (len(txs) + 3) / 4
}
for i := 0; i < tasks; i++ {
cacher.tasks <- &txSenderCacherRequest{
signer: signer,
txs: txs[i:],
inc: tasks,
}
}
}
我们说刚才cache方法在没有数据时会被阻塞,这个方法就是用来发送数据的,现将给定的交易集合分为若干份,每一份都封装为一个txSenderCacherRequest对象,然后发送到通道中,这样激活了cache去解析地址。
recoverFromBlocks
func (cacher *txSenderCacher) recoverFromBlocks(signer types.Signer, blocks []*types.Block) {
count := 0
for _, block := range blocks {
count += len(block.Transactions())
}
txs := make([]*types.Transaction, 0, count)
for _, block := range blocks {
txs = append(txs, block.Transactions()...)
}
cacher.recover(signer, txs)
}
我们知道一个区块中也保存了大量交易,所以这里也封装了一个方法去直接处理一个区块中的所有交易,主要是将区块中的交易封装为一个Transactions然后用recover方法处理。
TxPool
type TxPool struct {
config TxPoolConfig
chainconfig *params.ChainConfig
chain blockChain
gasPrice *big.Int
txFeed event.Feed
scope event.SubscriptionScope
chainHeadCh chan ChainHeadEvent
chainHeadSub event.Subscription
signer types.Signer
mu sync.RWMutex
currentState *state.StateDB
pendingState *state.ManagedState
currentMaxGas uint64
locals *accountSet
journal *txJournal
pending map[common.Address]*txList
queue map[common.Address]*txList
beats map[common.Address]time.Time
all *txLookup
priced *txPricedList
wg sync.WaitGroup
homestead bool
}
这里面有几个重要的成员,后面的方法大多都是围绕这几个成员操作的。pending指所有当前可以处理的交易,按照发送人地址归类。queue指当前还不能处理的交易,也是按发送人地址归类。beats指每一个相关账户最后一次心跳时间。all指所有可以查找到的交易。priced是将交易按GasPrice排序的集合。gasPrice指最小gas价格。currentMaxGas指当前最大gas使用量。默认的txpool配置如下
var DefaultTxPoolConfig = TxPoolConfig{
Journal: "transactions.rlp",
Rejournal: time.Hour,
PriceLimit: 1,
PriceBump: 10,
AccountSlots: 16,
GlobalSlots: 4096,
AccountQueue: 64,
GlobalQueue: 1024,
Lifetime: 3 * time.Hour,
}
构造
txpool包含所有当前已知交易。当从网络收到交易或本地提交交易后,交易会被放入txpool中,当交易已经被包含在区块链中时,会被删除。
func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
config = (&config).sanitize()
pool := &TxPool{
config: config,
chainconfig: chainconfig,
chain: chain,
signer: types.NewEIP155Signer(chainconfig.ChainID),
pending: make(map[common.Address]*txList),
queue: make(map[common.Address]*txList),
beats: make(map[common.Address]time.Time),
all: newTxLookup(),
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
}
pool.locals = newAccountSet(pool.signer)
for _, addr := range config.Locals {
log.Info("Setting new local account", "address", addr)
pool.locals.add(addr)
}
pool.priced = newTxPricedList(pool.all)
pool.reset(nil, chain.CurrentBlock().Header())
if !config.NoLocals && config.Journal != "" {
pool.journal = newTxJournal(config.Journal)
if err := pool.journal.load(pool.AddLocals); err != nil {
log.Warn("Failed to load transaction journal", "err", err)
}
if err := pool.journal.rotate(pool.local()); err != nil {
log.Warn("Failed to rotate transaction journal", "err", err)
}
}
pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
pool.wg.Add(1)
go pool.loop()
return pool
}
第一步调用的TxPoolConfig的sanitize方法主要是检查配置是否合法。剩余的都是在给txpool的成员赋值,新建了许多我们刚才介绍过的数据类型,其中注册了ChainHead事件。还有就是初始化了Locals,他从config中加载一些账户,这些账户的交易被标记为本地交易,以免在后面的操作中被移除。在构造方法中还调用了rest方法,最后还在一个单独的goroutine中执行了loop方法
reset
reset方法检索区块链的当前状态并且确保交易池的内容关于当前的区块链状态是有效的。
func (pool *TxPool) reset(oldHead, newHead *types.Header) {
var reinject types.Transactions
if oldHead != nil && oldHead.Hash() != newHead.ParentHash {
oldNum := oldHead.Number.Uint64()
newNum := newHead.Number.Uint64()
if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 {
log.Debug("Skipping deep transaction reorg", "depth", depth)
} else {
var discarded, included types.Transactions
var (
rem = pool.chain.GetBlock(oldHead.Hash(), oldHead.Number.Uint64())
add = pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64())
)
for rem.NumberU64() > add.NumberU64() {
discarded = append(discarded, rem.Transactions()...)
if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
return
}
}
for add.NumberU64() > rem.NumberU64() {
included = append(included, add.Transactions()...)
if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
return
}
}
for rem.Hash() != add.Hash() {
discarded = append(discarded, rem.Transactions()...)
if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
return
}
included = append(included, add.Transactions()...)
if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
return
}
}
reinject = types.TxDifference(discarded, included)
}
}
if newHead == nil {
newHead = pool.chain.CurrentBlock().Header()
}
statedb, err := pool.chain.StateAt(newHead.Root)
if err != nil {
log.Error("Failed to reset txpool state", "err", err)
return
}
pool.currentState = statedb
pool.pendingState = state.ManageState(statedb)
pool.currentMaxGas = newHead.GasLimit
log.Debug("Reinjecting stale transactions", "count", len(reinject))
senderCacher.recover(pool.signer, reinject)
pool.addTxsLocked(reinject, false)
pool.demoteUnexecutables()
for addr, list := range pool.pending {
txs := list.Flatten()
pool.pendingState.SetNonce(addr, txs[len(txs)-1].Nonce()+1)
}
pool.promoteExecutables(nil)
}
参数中传入两个区块头,表示一新一旧两个区块。首先如果旧区块不是新区快的父区块的话,需要重构。但是如果二者高度相差太多则不执行重构,这里的界线是64。如果需要重构,首先要找到二者的公共祖先节点,逻辑和BlockChain插入区块时主要需要调整的逻辑类似。两个区块在回溯时,经过的区块中包含交易都分别保存在discarded和included中,discarded表示要删除的交易,included表示要添加的交易。最后用TxDifference方法寻找二者区别,也就是discarded有的而included没有的。
接着获取新区快的状态树,并设置pool的currentState、pendingState和currentMaxGas。接下来调用recover方法将要添加的交易恢复出发送人地址。
再往下调用了addTxsLocked方法,这里有效交易放到相应队列中。后面的demoteUnexecutables方法将pending中一些不能执行的交易放入queue中。之后有一个循环更新交易账号的nonce,最后调用promoteExecutables方法见将一些可以执行的交易放到pending中。
之所以调用demoteUnexecutables方法是因为currentMaxGas等信息的改变,有一些交易将变得无法执行。调用promoteExecutables是因为账户的nonce发送改变,有一些交易变得可以执行
loop
func (pool *TxPool) loop() {
defer pool.wg.Done()
var prevPending, prevQueued, prevStales int
report := time.NewTicker(statsReportInterval)
defer report.Stop()
evict := time.NewTicker(evictionInterval)
defer evict.Stop()
journal := time.NewTicker(pool.config.Rejournal)
defer journal.Stop()
head := pool.chain.CurrentBlock()
for {
select {
case ev := <-pool.chainHeadCh:
if ev.Block != nil {
pool.mu.Lock()
if pool.chainconfig.IsHomestead(ev.Block.Number()) {
pool.homestead = true
}
pool.reset(head.Header(), ev.Block.Header())
head = ev.Block
pool.mu.Unlock()
}
case <-pool.chainHeadSub.Err():
return
case <-report.C:
pool.mu.RLock()
pending, queued := pool.stats()
stales := pool.priced.stales
pool.mu.RUnlock()
if pending != prevPending || queued != prevQueued || stales != prevStales {
log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales)
prevPending, prevQueued, prevStales = pending, queued, stales
}
case <-evict.C:
pool.mu.Lock()
for addr := range pool.queue {
// Skip local transactions from the eviction mechanism
if pool.locals.contains(addr) {
continue
}
// Any non-locals old enough should be removed
if time.Since(pool.beats[addr]) > pool.config.Lifetime {
for _, tx := range pool.queue[addr].Flatten() {
pool.removeTx(tx.Hash(), true)
}
}
}
pool.mu.Unlock()
case <-journal.C:
if pool.journal != nil {
pool.mu.Lock()
if err := pool.journal.rotate(pool.local()); err != nil {
log.Warn("Failed to rotate local tx journal", "err", err)
}
pool.mu.Unlock()
}
}
}
}
这就是txloop的事件循环。开始设置了三个定时器,report每8秒触发一次,evict每分钟触发一次,journal可以自己设置,最低每秒触发一次,默认是1小时。
journal每次触发时会调用local方法获取pending和queue中的所有键值对,然后生成新的日志文件。
report触发时就是打印日志。
evict触发时会遍历queue中的超时交易并将其删除,默认超时时间是3小时。
chainHeadCh触发时表示收到新区快,此时调用reset方法。
addTxsLocked
func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) []error {
dirty := make(map[common.Address]struct{})
errs := make([]error, len(txs))
for i, tx := range txs {
var replace bool
if replace, errs[i] = pool.add(tx, local); errs[i] == nil && !replace {
from, _ := types.Sender(pool.signer, tx)
dirty[from] = struct{}{}
}
}
if len(dirty) > 0 {
addrs := make([]common.Address, 0, len(dirty))
for addr := range dirty {
addrs = append(addrs, addr)
}
pool.promoteExecutables(addrs)
}
return errs
}
这个方法尝试将交易放到相应队列中。
这里给定一组交易,首先遍历这些交易,对于每个交易执行add方法将其添加到对应队列。replace表示是否替换了旧的交易,没有的话还原出交易的发送者,然后在dirty中添加值。处理完所有交易后如果dirty长度不为零,则读出其中存储的所有交易发送人地址,并执行promoteExecutables方法。
demoteUnexecutables
func (pool *TxPool) demoteUnexecutables() {
for addr, list := range pool.pending {
nonce := pool.currentState.GetNonce(addr)
for _, tx := range list.Forward(nonce) {
hash := tx.Hash()
log.Trace("Removed old pending transaction", "hash", hash)
pool.all.Remove(hash)
pool.priced.Removed()
}
drops, invalids := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
for _, tx := range drops {
hash := tx.Hash()
log.Trace("Removed unpayable pending transaction", "hash", hash)
pool.all.Remove(hash)
pool.priced.Removed()
pendingNofundsCounter.Inc(1)
}
for _, tx := range invalids {
hash := tx.Hash()
log.Trace("Demoting pending transaction", "hash", hash)
pool.enqueueTx(hash, tx)
}
if list.Len() > 0 && list.txs.Get(nonce) == nil {
for _, tx := range list.Cap(0) {
hash := tx.Hash()
log.Error("Demoting invalidated transaction", "hash", hash)
pool.enqueueTx(hash, tx)
}
}
if list.Empty() {
delete(pool.pending, addr)
delete(pool.beats, addr)
}
}
}
这个方法是从pending中删除无效交易
这里首先遍历pending这个map,取出每个地址当前的nonce,然后从对应的list中移除所有nonce小于当前值的交易,并将这些交易移除all和priced。之后在滤除所有花费大于当前地址余额或者、gaslimite大于currentMaxGas的交易。Filter返回两部分,drops是符合条件被滤除的,接下来这些交易需要被删除。invalids是由于要保持连续性被滤除的这些交易需要通过enqueueTx方法进入queue中。
再往下,经过刚才操作后,如果pending中账户对应的list中还有值,但是取不到对应nonce的交易,表示剩下的交易都是nonce大于账户当前nonce的交易,这些交易也需要被拿出来放入queue中。此时如果对应的list为空这删除该地址的相关信息。
promoteExecutables
func (pool *TxPool) promoteExecutables(accounts []common.Address) {
var promoted []*types.Transaction
if accounts == nil {
accounts = make([]common.Address, 0, len(pool.queue))
for addr := range pool.queue {
accounts = append(accounts, addr)
}
}
for _, addr := range accounts {
list := pool.queue[addr]
if list == nil {
continue
}
for _, tx := range list.Forward(pool.currentState.GetNonce(addr)) {
hash := tx.Hash()
log.Trace("Removed old queued transaction", "hash", hash)
pool.all.Remove(hash)
pool.priced.Removed()
}
drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
for _, tx := range drops {
hash := tx.Hash()
log.Trace("Removed unpayable queued transaction", "hash", hash)
pool.all.Remove(hash)
pool.priced.Removed()
queuedNofundsCounter.Inc(1)
}
for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) {
hash := tx.Hash()
if pool.promoteTx(addr, hash, tx) {
log.Trace("Promoting queued transaction", "hash", hash)
promoted = append(promoted, tx)
}
}
if !pool.locals.contains(addr) {
for _, tx := range list.Cap(int(pool.config.AccountQueue)) {
hash := tx.Hash()
pool.all.Remove(hash)
pool.priced.Removed()
queuedRateLimitCounter.Inc(1)
log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
}
}
if list.Empty() {
delete(pool.queue, addr)
}
}
if len(promoted) > 0 {
go pool.txFeed.Send(NewTxsEvent{promoted})
}
pending := uint64(0)
for _, list := range pool.pending {
pending += uint64(list.Len())
}
if pending > pool.config.GlobalSlots {
pendingBeforeCap := pending
spammers := prque.New(nil)
for addr, list := range pool.pending {
if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots {
spammers.Push(addr, int64(list.Len()))
}
}
offenders := []common.Address{}
for pending > pool.config.GlobalSlots && !spammers.Empty() {
offender, _ := spammers.Pop()
offenders = append(offenders, offender.(common.Address))
if len(offenders) > 1 {
threshold := pool.pending[offender.(common.Address)].Len()
for pending > pool.config.GlobalSlots && pool.pending[offenders[len(offenders)-2]].Len() > threshold {
for i := 0; i < len(offenders)-1; i++ {
list := pool.pending[offenders[i]]
for _, tx := range list.Cap(list.Len() - 1) {
hash := tx.Hash()
pool.all.Remove(hash)
pool.priced.Removed()
if nonce := tx.Nonce(); pool.pendingState.GetNonce(offenders[i]) > nonce {
pool.pendingState.SetNonce(offenders[i], nonce)
}
log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
}
pending--
}
}
}
}
if pending > pool.config.GlobalSlots && len(offenders) > 0 {
for pending > pool.config.GlobalSlots && uint64(pool.pending[offenders[len(offenders)-1]].Len()) > pool.config.AccountSlots {
for _, addr := range offenders {
list := pool.pending[addr]
for _, tx := range list.Cap(list.Len() - 1) {
hash := tx.Hash()
pool.all.Remove(hash)
pool.priced.Removed()
if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce {
pool.pendingState.SetNonce(addr, nonce)
}
log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
}
pending--
}
}
}
pendingRateLimitCounter.Inc(int64(pendingBeforeCap - pending))
}
queued := uint64(0)
for _, list := range pool.queue {
queued += uint64(list.Len())
}
if queued > pool.config.GlobalQueue {
addresses := make(addressesByHeartbeat, 0, len(pool.queue))
for addr := range pool.queue {
if !pool.locals.contains(addr) {
addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
}
}
sort.Sort(addresses)
for drop := queued - pool.config.GlobalQueue; drop > 0 && len(addresses) > 0; {
addr := addresses[len(addresses)-1]
list := pool.queue[addr.address]
addresses = addresses[:len(addresses)-1]
if size := uint64(list.Len()); size <= drop {
for _, tx := range list.Flatten() {
pool.removeTx(tx.Hash(), true)
}
drop -= size
queuedRateLimitCounter.Inc(int64(size))
continue
}
txs := list.Flatten()
for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
pool.removeTx(txs[i].Hash(), true)
drop--
queuedRateLimitCounter.Inc(1)
}
}
}
}
这里是将queue中可执行的交易放到pending中。
参数中给定一组地址,表示其所对应的交易需要修改状态。如果参数为空,则默认为queue中所有地址。
之后遍历所有账户,取其在queue中对应的list,首先移除list中交易的nonce低于账户当前nonce的,将其从all和priced中移除。之后再滤除所有交易花费大于账户余额的或者gas用量大于当前gas限制的所有交易,也将其从all和priced中移除。之后利用ready方法得到可执行的交易,ready方法需要指定一个nonce,如果存储的最小值小于这个nonce,这从起开始取一系列连续的交易。由于之前已经删除了nonce小于当前账户值的交易,所以这里是从nonce等于账户当前值的交易开始取,取出的一些列交易nonce是递增的。对于这些交易使用promoteTx方法将其放入pending中,若放入成功,则在promoted中存一份。
之后如果该放手地址不在locals中,限制list数量为最大64(默认值)。经过上面处理后如果queue中对应地址的list为空,则将其从queue中删除。
循环完所有给定的地址后,如果有交易需要放到pending中时,发送消息。
下面是处理pending溢出的问题。首先添加pending中给所有交易处理。如果大于给定值(默认4096)则需要进一步处理。首先新建一个优先级队列spammers。然后遍历pending中所有键值对,如果不属于locals而且其对应list长度大于给定值(默认16),则将其地址放入spammers,优先级就是list长度。
接着开启一个循环,直到pending中交易数量满足要求或者spammers为空为止。每次循环都从spammers获取优先级最高的地址,放入offenders,直到offenders数量大于1时,循环删除数组中每个地址的list的最后一个交易,前提是保证各个地址的优先级顺序。如果还超过指定数量,则继续循环删除,要保证list长度不小于16(默认值).
接着处理queue溢出问题,也是先统计数量,默认的最大值是1024。将queue中所有不属于locals的地址放入addresses中,这是一个addressesByHeartbeat数组,可以按心跳时间排序。接着按心跳时间从就到新依次删除,直到总数满足要求。
add
func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
hash := tx.Hash()
if pool.all.Get(hash) != nil {
log.Trace("Discarding already known transaction", "hash", hash)
return false, fmt.Errorf("known transaction: %x", hash)
}
if err := pool.validateTx(tx, local); err != nil {
log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
invalidTxCounter.Inc(1)
return false, err
}
if uint64(pool.all.Count()) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
if !local && pool.priced.Underpriced(tx, pool.locals) {
log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice())
underpricedTxCounter.Inc(1)
return false, ErrUnderpriced
}
drop := pool.priced.Discard(pool.all.Count()-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals)
for _, tx := range drop {
log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice())
underpricedTxCounter.Inc(1)
pool.removeTx(tx.Hash(), false)
}
}
from, _ := types.Sender(pool.signer, tx)
if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
inserted, old := list.Add(tx, pool.config.PriceBump)
if !inserted {
pendingDiscardCounter.Inc(1)
return false, ErrReplaceUnderpriced
}
if old != nil {
pool.all.Remove(old.Hash())
pool.priced.Removed()
pendingReplaceCounter.Inc(1)
}
pool.all.Add(tx)
pool.priced.Put(tx)
pool.journalTx(from, tx)
log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
go pool.txFeed.Send(NewTxsEvent{types.Transactions{tx}})
return old != nil, nil
}
replace, err := pool.enqueueTx(hash, tx)
if err != nil {
return false, err
}
if local {
if !pool.locals.contains(from) {
log.Info("Setting new local account", "address", from)
pool.locals.add(from)
}
}
pool.journalTx(from, tx)
log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
return replace, nil
}
这里主要是验证交易并根据情况插入到相应队列。
首先给定一个交易,首先判断是否在all中,是的话表示已经知道该交易直接返回。之后调用validateTx验证交易是否有效。有效的话,看all中的交易数量是否满了,满的话,检查要插入的交易是否比priced中所有交易的GasPrice都要小,是的话抛弃新交易,否则,抛弃priced中一定数量的交易,对于要抛弃的交易调用removeTx方法将其从一些集合中删除。
如果all没有满,或已满但处理过之后,从交易中还原出发送人地址。接着从pending中取出对应地址的交易集合,如果集合不为空且有相同nonce的交易,则尝试替换,如果不需要替换则直接返回,若需要替换,则将旧的交易从all和priced中删除。并将新的交易放入all和priced中。
若该交易的发送人在pending中没有交易列表或列表内没旧的交易,则将调用enqueueTx将其放入queue中。若是本地模式则检查locals是否包含该地址,若没有则加入进去。之后将交易放入日志中,最后返回。
validateTx
func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
if tx.Size() > 32*1024 {
return ErrOversizedData
}
if tx.Value().Sign() < 0 {
return ErrNegativeValue
}
if pool.currentMaxGas < tx.Gas() {
return ErrGasLimit
}
from, err := types.Sender(pool.signer, tx)
if err != nil {
return ErrInvalidSender
}
local = local || pool.locals.contains(from) // account may be local even if the transaction arrived from the network
if !local && pool.gasPrice.Cmp(tx.GasPrice()) > 0 {
return ErrUnderpriced
}
if pool.currentState.GetNonce(from) > tx.Nonce() {
return ErrNonceTooLow
}
if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
return ErrInsufficientFunds
}
intrGas, err := IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead)
if err != nil {
return err
}
if tx.Gas() < intrGas {
return ErrIntrinsicGas
}
return nil
}
首先判断交易的大小,所谓交易的大小是指交易数据(如发送人、接收人、GasPrice、value等数据)经rlp编码后的大小,最大不得超过32KB。
之后传输的金额不得小于0,并且交易的GasLimit不得超过当前txpool设置的值(在reset中根据区块设置),也就是区块的限制。接着恢复出交易的发送人地址,这里不能有错。然后对于非本地交易,其gasprice不得小于txpool设定的和。另外还要验证发送者的nonce和交易中标记的nonce是否符合要求,并且发送者还要有足够的余额。最后计算出消耗的固定gas,不能超过gaslimit。
removeTx
func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
tx := pool.all.Get(hash)
if tx == nil {
return
}
addr, _ := types.Sender(pool.signer, tx)
pool.all.Remove(hash)
if outofbound {
pool.priced.Removed()
}
if pending := pool.pending[addr]; pending != nil {
if removed, invalids := pending.Remove(tx); removed {
if pending.Empty() {
delete(pool.pending, addr)
delete(pool.beats, addr)
}
for _, tx := range invalids {
pool.enqueueTx(tx.Hash(), tx)
}
if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce {
pool.pendingState.SetNonce(addr, nonce)
}
return
}
}
if future := pool.queue[addr]; future != nil {
future.Remove(tx)
if future.Empty() {
delete(pool.queue, addr)
}
}
}
这是删除一个交易的操作。首先从all中获取对应交易,没有的话直接返回。有的话从交易中恢复出地址,然后从all中删除交易,如果第二个参数为true的话,priced中的也要对应删除。之后获取pending中对应地址的txlist,将交易从中删除,若删除之后对应txlist为空则还要删除对应的txlist。txlist的remove的第二个方法是在严格模式下删除的那些不连续的交易,对于这些交易要通过enqueueTx放入未执行队列中。最后如果发送地址的nonce大于交易的nonce,要修改发送地址的nonce。
最后删除queue中对应地址下的txlist中的相应交易。
enqueueTx
func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, error) {
from, _ := types.Sender(pool.signer, tx)
if pool.queue[from] == nil {
pool.queue[from] = newTxList(false)
}
inserted, old := pool.queue[from].Add(tx, pool.config.PriceBump)
if !inserted {
queuedDiscardCounter.Inc(1)
return false, ErrReplaceUnderpriced
}
if old != nil {
pool.all.Remove(old.Hash())
pool.priced.Removed()
queuedReplaceCounter.Inc(1)
}
if pool.all.Get(hash) == nil {
pool.all.Add(tx)
pool.priced.Put(tx)
}
return old != nil, nil
}
这是将一个交易插入queue中,首先获取交易的发送人地址,之后如果queue中对应地址的值为空,则新建一个txlist(非严格模式)。之后调用add方法插入,add方法会返回有相同nonce的旧交易,若有旧交易则需要将其从all删除。对于新交易如果all中没有则添加进去,同样也要在priced中添加一份。
journalTx
func (pool *TxPool) journalTx(from common.Address, tx *types.Transaction) {
if pool.journal == nil || !pool.locals.contains(from) {
return
}
if err := pool.journal.insert(tx); err != nil {
log.Warn("Failed to journal local transaction", "err", err)
}
}
这个操作主要是讲交易放入日志中,但事先会检查地址是否在locals中
promoteTx
func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) bool {
if pool.pending[addr] == nil {
pool.pending[addr] = newTxList(true)
}
list := pool.pending[addr]
inserted, old := list.Add(tx, pool.config.PriceBump)
if !inserted {
pool.all.Remove(hash)
pool.priced.Removed()
pendingDiscardCounter.Inc(1)
return false
}
if old != nil {
pool.all.Remove(old.Hash())
pool.priced.Removed()
pendingReplaceCounter.Inc(1)
}
if pool.all.Get(hash) == nil {
pool.all.Add(tx)
pool.priced.Put(tx)
}
pool.beats[addr] = time.Now()
pool.pendingState.SetNonce(addr, tx.Nonce()+1)
return true
}
这是将某个交易放入pending中
首先将交易放入pending中对应的list中,如果没有出现替换,则将新的交易从all中移除,若出现替换则将旧的交易从all中移除。最后还要讲新的交易放入all和priced中,并记录改地址心跳时间,并更新地址的nonce。
题图来自unsplash:https://unsplash.com/photos/eWFdaPRFjwE