A study of go-ethereum's core/blockchain source code
Data structure
A comment above the BlockChain struct definition explains what it does: roughly, BlockChain represents the canonical chain (what we usually call the main chain) and provides a set of operations on it. The struct is defined as follows:
// go-ethereum\core\blockchain.go
type BlockChain struct {
chainConfig *params.ChainConfig // Chain & network configuration
cacheConfig *CacheConfig // Cache configuration for pruning
db ethdb.Database // Low level persistent database to store final content in
triegc *prque.Prque // Priority queue mapping block numbers to tries to gc
gcproc time.Duration // Accumulates canonical block processing for trie dumping
hc *HeaderChain
rmLogsFeed event.Feed
chainFeed event.Feed
chainSideFeed event.Feed
chainHeadFeed event.Feed
logsFeed event.Feed
scope event.SubscriptionScope
genesisBlock *types.Block
chainmu sync.RWMutex // blockchain insertion lock
procmu sync.RWMutex // block processor lock
checkpoint int // checkpoint counts towards the new checkpoint
currentBlock atomic.Value // Current head of the block chain
currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!)
stateCache state.Database // State database to reuse between imports (contains state cache)
bodyCache *lru.Cache // Cache for the most recent block bodies
bodyRLPCache *lru.Cache // Cache for the most recent block bodies in RLP encoded format
receiptsCache *lru.Cache // Cache for the most recent receipts per block
blockCache *lru.Cache // Cache for the most recent entire blocks
futureBlocks *lru.Cache // future blocks are blocks added for later processing
quit chan struct{} // blockchain quit channel
running int32 // running must be called atomically
// procInterrupt must be atomically called
procInterrupt int32 // interrupt signaler for block processing
wg sync.WaitGroup // chain processing wait group for shutting down
engine consensus.Engine
processor Processor // block processor interface
validator Validator // block and state validator interface
vmConfig vm.Config
badBlocks *lru.Cache // Bad block cache
shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
}
NewBlockChain
The blockchain is initialized through NewBlockChain, which is called from the New function in eth\backend.go. In other words, when an Ethereum instance is created, a BlockChain object is constructed and assigned to the Ethereum object's blockchain field. The implementation is as follows:
// go-ethereum\core\blockchain.go
func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config, shouldPreserve func(block *types.Block) bool) (*BlockChain, error) {
if cacheConfig == nil {
cacheConfig = &CacheConfig{
TrieCleanLimit: 256,
TrieDirtyLimit: 256,
TrieTimeLimit: 5 * time.Minute,
}
}
bodyCache, _ := lru.New(bodyCacheLimit)
bodyRLPCache, _ := lru.New(bodyCacheLimit)
receiptsCache, _ := lru.New(receiptsCacheLimit)
blockCache, _ := lru.New(blockCacheLimit)
futureBlocks, _ := lru.New(maxFutureBlocks)
badBlocks, _ := lru.New(badBlockLimit)
bc := &BlockChain{
chainConfig: chainConfig,
cacheConfig: cacheConfig,
db: db,
triegc: prque.New(nil),
stateCache: state.NewDatabaseWithCache(db, cacheConfig.TrieCleanLimit),
quit: make(chan struct{}),
shouldPreserve: shouldPreserve,
bodyCache: bodyCache,
bodyRLPCache: bodyRLPCache,
receiptsCache: receiptsCache,
blockCache: blockCache,
futureBlocks: futureBlocks,
engine: engine,
vmConfig: vmConfig,
badBlocks: badBlocks,
}
bc.SetValidator(NewBlockValidator(chainConfig, bc, engine))
bc.SetProcessor(NewStateProcessor(chainConfig, bc, engine))
var err error
bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.getProcInterrupt)
if err != nil {
return nil, err
}
bc.genesisBlock = bc.GetBlockByNumber(0)
if bc.genesisBlock == nil {
return nil, ErrNoGenesis
}
if err := bc.loadLastState(); err != nil {
return nil, err
}
for hash := range BadHashes {
if header := bc.GetHeaderByHash(hash); header != nil {
headerByNumber := bc.GetHeaderByNumber(header.Number.Uint64())
if headerByNumber != nil && headerByNumber.Hash() == header.Hash() {
log.Error("Found bad hash, rewinding chain", "number", header.Number, "hash", header.ParentHash)
bc.SetHead(header.Number.Uint64() - 1)
log.Error("Chain rewind was successful, resuming normal operation")
}
}
}
go bc.update()
return bc, nil
}
The function first checks whether cacheConfig is nil and, if so, falls back to a default configuration. A series of LRU cache instances is then created, followed by the BlockChain instance itself. Among its fields, triegc is a prque, i.e. a priority queue; stateCache was already discussed in the earlier analysis of the state package: NewDatabaseWithCache returns a state.Database object that is later used to construct StateDB instances.
Next the Validator and Processor are set. The Validator validates blocks and their state, while the Processor executes the transactions contained in a block.
NewHeaderChain is then called to create a HeaderChain object, which is assigned to bc.hc; the HeaderChain is a chain that contains only block headers. After that, block number 0, i.e. the genesis block, is fetched and stored in the genesisBlock field; if it cannot be found, ErrNoGenesis is returned. loadLastState is then called to load the latest chain state. Finally the code iterates over BadHashes, a set of manually blacklisted hashes used to handle hard forks: for each hash it checks whether our local chain contains the corresponding header, and if so whether the canonical header at that height is the same one. If it is, the local chain contains a bad block and is rewound to one block below it via SetHead.
Finally a goroutine is started to run update, and the constructed BlockChain object is returned.
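For context, this is roughly how the New function in eth\backend.go wires NewBlockChain into the Ethereum object. The sketch below is simplified, and the config field names (NoPruning, TrieCleanCache, TrieDirtyCache, TrieTimeout) are quoted from memory and may differ slightly between versions:
// eth\backend.go, inside New (simplified sketch; field names approximate)
cacheConfig := &core.CacheConfig{
    Disabled:       config.NoPruning,
    TrieCleanLimit: config.TrieCleanCache,
    TrieDirtyLimit: config.TrieDirtyCache,
    TrieTimeLimit:  config.TrieTimeout,
}
eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, eth.chainConfig, eth.engine, vmConfig, eth.shouldPreserve)
if err != nil {
    return nil, err
}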
update
The method is implemented as follows:
func (bc *BlockChain) update() {
futureTimer := time.NewTicker(5 * time.Second)
defer futureTimer.Stop()
for {
select {
case <-futureTimer.C:
bc.procFutureBlocks()
case <-bc.quit:
return
}
}
}
It simply starts a ticker that fires every 5 seconds and invokes procFutureBlocks:
func (bc *BlockChain) procFutureBlocks() {
blocks := make([]*types.Block, 0, bc.futureBlocks.Len())
for _, hash := range bc.futureBlocks.Keys() {
if block, exist := bc.futureBlocks.Peek(hash); exist {
blocks = append(blocks, block.(*types.Block))
}
}
if len(blocks) > 0 {
types.BlockBy(types.Number).Sort(blocks)
for i := range blocks {
bc.InsertChain(blocks[i : i+1])
}
}
}
This method iterates over the keys of the futureBlocks LRU cache, which holds blocks queued for later insertion, fetches the corresponding blocks, sorts them by number and inserts them into the chain one by one via InsertChain.
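The types.BlockBy(types.Number).Sort call above orders the queued blocks by block number before insertion. For reference, the sorting helper in core/types/block.go looks roughly like this (simplified excerpt):
// go-ethereum\core\types\block.go (simplified excerpt)
type BlockBy func(b1, b2 *Block) bool

func (self BlockBy) Sort(blocks Blocks) {
    sort.Sort(blockSorter{blocks: blocks, by: self})
}

type blockSorter struct {
    blocks Blocks
    by     func(b1, b2 *Block) bool
}

func (self blockSorter) Len() int           { return len(self.blocks) }
func (self blockSorter) Swap(i, j int)      { self.blocks[i], self.blocks[j] = self.blocks[j], self.blocks[i] }
func (self blockSorter) Less(i, j int) bool { return self.by(self.blocks[i], self.blocks[j]) }

// Number is the comparator used in procFutureBlocks: ascending block number.
func Number(b1, b2 *Block) bool { return b1.header.Number.Cmp(b2.header.Number) < 0 }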
loadLastState
The loadLastState call we saw in NewBlockChain brings the blockchain up to its latest known state. It mainly restores three variables: currentBlock, currentHeader and currentFastBlock. currentBlock is the latest full block of the chain; currentHeader is the head of the header-only chain, whose number may be higher than currentBlock; currentFastBlock is the head of the fast-sync chain, which may also be ahead of currentBlock. The implementation is as follows:
func (bc *BlockChain) loadLastState() error {
head := rawdb.ReadHeadBlockHash(bc.db)
if head == (common.Hash{}) {
log.Warn("Empty database, resetting chain")
return bc.Reset()
}
currentBlock := bc.GetBlockByHash(head)
if currentBlock == nil {
log.Warn("Head block missing, resetting chain", "hash", head)
return bc.Reset()
}
if _, err := state.New(currentBlock.Root(), bc.stateCache); err != nil {
log.Warn("Head state missing, repairing chain", "number", currentBlock.Number(), "hash", currentBlock.Hash())
if err := bc.repair(&currentBlock); err != nil {
return err
}
}
bc.currentBlock.Store(currentBlock)
currentHeader := currentBlock.Header()
if head := rawdb.ReadHeadHeaderHash(bc.db); head != (common.Hash{}) {
if header := bc.GetHeaderByHash(head); header != nil {
currentHeader = header
}
}
bc.hc.SetCurrentHeader(currentHeader)
bc.currentFastBlock.Store(currentBlock)
if head := rawdb.ReadHeadFastBlockHash(bc.db); head != (common.Hash{}) {
if block := bc.GetBlockByHash(head); block != nil {
bc.currentFastBlock.Store(block)
}
}
currentFastBlock := bc.CurrentFastBlock()
headerTd := bc.GetTd(currentHeader.Hash(), currentHeader.Number.Uint64())
blockTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
fastTd := bc.GetTd(currentFastBlock.Hash(), currentFastBlock.NumberU64())
log.Info("Loaded most recent local header", "number", currentHeader.Number, "hash", currentHeader.Hash(), "td", headerTd, "age", common.PrettyAge(time.Unix(currentHeader.Time.Int64(), 0)))
log.Info("Loaded most recent local full block", "number", currentBlock.Number(), "hash", currentBlock.Hash(), "td", blockTd, "age", common.PrettyAge(time.Unix(currentBlock.Time().Int64(), 0)))
log.Info("Loaded most recent local fast block", "number", currentFastBlock.Number(), "hash", currentFastBlock.Hash(), "td", fastTd, "age", common.PrettyAge(time.Unix(currentFastBlock.Time().Int64(), 0)))
return nil
}
The method first calls ReadHeadBlockHash to read the latest block hash from the database, i.e. the value stored under the key "LastBlock", which is the hash of the highest block of the chain. If it is empty, the database is empty and the chain has to be reset via Reset.
If it is not empty, the corresponding block is looked up by hash; if the block cannot be found the chain is reset as well. If the block is found, the code verifies that the state trie associated with it is intact: it simply tries to open the state, and if that fails it attempts to repair the chain via repair. Once everything checks out, currentBlock is updated.
Next, currentHeader is updated with the same logic: the value under the key "LastHeader", the hash of the highest header of the header-only chain, is read from the database, the corresponding header is looked up and currentHeader is set to it; if it cannot be found, the header of the currentBlock found above is used instead.
The same pattern follows for currentFastBlock: the value under the key "LastFast" is read, the corresponding block is looked up and currentFastBlock is updated; if it cannot be found, currentBlock is used as the fast-sync head.
Finally, the three heads are logged.
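For reference, the "LastBlock", "LastHeader" and "LastFast" keys mentioned above are fixed database keys defined in core/rawdb, and the head pointers are read and written through small accessors roughly like the following (simplified excerpt):
// go-ethereum\core\rawdb\schema.go (excerpt): fixed keys for the head pointers
var (
    headHeaderKey    = []byte("LastHeader") // hash of the latest known header
    headBlockKey     = []byte("LastBlock")  // hash of the latest known full block
    headFastBlockKey = []byte("LastFast")   // hash of the latest fast-sync block
)

// go-ethereum\core\rawdb\accessors_chain.go (simplified excerpt)
func ReadHeadBlockHash(db DatabaseReader) common.Hash {
    data, _ := db.Get(headBlockKey)
    if len(data) == 0 {
        return common.Hash{}
    }
    return common.BytesToHash(data)
}

func WriteHeadBlockHash(db DatabaseWriter, hash common.Hash) {
    if err := db.Put(headBlockKey, hash.Bytes()); err != nil {
        log.Crit("Failed to store last block's hash", "err", err)
    }
}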
Reset
Reset appeared several times in loadLastState above; it restores the chain to the genesis state:
func (bc *BlockChain) Reset() error {
return bc.ResetWithGenesisBlock(bc.genesisBlock)
}
func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {
if err := bc.SetHead(0); err != nil {
return err
}
bc.chainmu.Lock()
defer bc.chainmu.Unlock()
if err := bc.hc.WriteTd(genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()); err != nil {
log.Crit("Failed to write genesis block TD", "err", err)
}
rawdb.WriteBlock(bc.db, genesis)
bc.genesisBlock = genesis
bc.insert(bc.genesisBlock)
bc.currentBlock.Store(bc.genesisBlock)
bc.hc.SetGenesis(bc.genesisBlock.Header())
bc.hc.SetCurrentHeader(bc.genesisBlock.Header())
bc.currentFastBlock.Store(bc.genesisBlock)
return nil
}
It first calls SetHead with 0. SetHead designates a given block as the new head of the chain and deletes everything above it, so it is essentially a rewind primitive; setting the head to 0 here wipes the whole chain.
The chain is then rebuilt from the genesis block: the genesis total difficulty is written through hc, the genesis block is written to the database and inserted into the chain, and currentBlock, currentFastBlock and the current header are all pointed back at it.
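The two HeaderChain setters used at the end are trivial; roughly (simplified excerpt from core/headerchain.go):
// go-ethereum\core\headerchain.go (simplified excerpt)
// SetCurrentHeader sets the in-memory head header and persists its hash ("LastHeader").
func (hc *HeaderChain) SetCurrentHeader(head *types.Header) {
    rawdb.WriteHeadHeaderHash(hc.chainDb, head.Hash())
    hc.currentHeader.Store(head)
    hc.currentHeaderHash = head.Hash()
}

// SetGenesis sets a new genesis header for the header-only chain.
func (hc *HeaderChain) SetGenesis(head *types.Header) {
    hc.genesisHeader = head
}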
SetHead
Both Reset and NewBlockChain use SetHead, so let's look at it in detail:
func (bc *BlockChain) SetHead(head uint64) error {
log.Warn("Rewinding blockchain", "target", head)
bc.chainmu.Lock()
defer bc.chainmu.Unlock()
delFn := func(db rawdb.DatabaseDeleter, hash common.Hash, num uint64) {
rawdb.DeleteBody(db, hash, num)
}
bc.hc.SetHead(head, delFn)
currentHeader := bc.hc.CurrentHeader()
bc.bodyCache.Purge()
bc.bodyRLPCache.Purge()
bc.receiptsCache.Purge()
bc.blockCache.Purge()
bc.futureBlocks.Purge()
if currentBlock := bc.CurrentBlock(); currentBlock != nil && currentHeader.Number.Uint64() < currentBlock.NumberU64() {
bc.currentBlock.Store(bc.GetBlock(currentHeader.Hash(), currentHeader.Number.Uint64()))
}
if currentBlock := bc.CurrentBlock(); currentBlock != nil {
if _, err := state.New(currentBlock.Root(), bc.stateCache); err != nil {
bc.currentBlock.Store(bc.genesisBlock)
}
}
if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock != nil && currentHeader.Number.Uint64() < currentFastBlock.NumberU64() {
bc.currentFastBlock.Store(bc.GetBlock(currentHeader.Hash(), currentHeader.Number.Uint64()))
}
if currentBlock := bc.CurrentBlock(); currentBlock == nil {
bc.currentBlock.Store(bc.genesisBlock)
}
if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock == nil {
bc.currentFastBlock.Store(bc.genesisBlock)
}
currentBlock := bc.CurrentBlock()
currentFastBlock := bc.CurrentFastBlock()
rawdb.WriteHeadBlockHash(bc.db, currentBlock.Hash())
rawdb.WriteHeadFastBlockHash(bc.db, currentFastBlock.Hash())
return bc.loadLastState()
}
It first calls the HeaderChain's SetHead method:
func (hc *HeaderChain) SetHead(head uint64, delFn DeleteCallback) {
height := uint64(0)
if hdr := hc.CurrentHeader(); hdr != nil {
height = hdr.Number.Uint64()
}
batch := hc.chainDb.NewBatch()
for hdr := hc.CurrentHeader(); hdr != nil && hdr.Number.Uint64() > head; hdr = hc.CurrentHeader() {
hash := hdr.Hash()
num := hdr.Number.Uint64()
if delFn != nil {
delFn(batch, hash, num)
}
rawdb.DeleteHeader(batch, hash, num)
rawdb.DeleteTd(batch, hash, num)
hc.currentHeader.Store(hc.GetHeader(hdr.ParentHash, hdr.Number.Uint64()-1))
}
for i := height; i > head; i-- {
rawdb.DeleteCanonicalHash(batch, i)
}
batch.Write()
hc.headerCache.Purge()
hc.tdCache.Purge()
hc.numberCache.Purge()
if hc.CurrentHeader() == nil {
hc.currentHeader.Store(hc.genesisHeader)
}
hc.currentHeaderHash = hc.CurrentHeader().Hash()
rawdb.WriteHeadHeaderHash(hc.chainDb, hc.currentHeaderHash)
}
In the HeaderChain's SetHead, the current chain height is computed first, and then a loop walks down from the current head to the target block, deleting data for every block it passes: it first invokes delFn, which calls DeleteBody to remove the block body, and then deletes the header and the total difficulty stored in the database.
A second loop then runs from the old height down to the target, deleting the canonical-chain hash mappings. Note that all of these deletions go through a database batch. After the caches are purged, if the current header turned out to be nil, the genesis header is stored as the new head; finally the new head header hash is recorded and written to the database.
Back in BlockChain.SetHead: once the header chain has been rewound, everything above the target has effectively been deleted. currentBlock is then updated, using hc's currentHeader as the reference to look up the block, and its state trie is checked; if it cannot be opened, currentBlock falls back to the genesis block. currentFastBlock is updated in the same way, again capped by currentHeader. If currentBlock or currentFastBlock is still nil, both are set to the genesis block. Finally the database entries for currentBlock and currentFastBlock, i.e. the keys "LastBlock" and "LastFast", are updated and loadLastState is called to reload the state.
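The delete helpers used by the rewind are thin wrappers around the rawdb key schema; roughly (simplified excerpt from core/rawdb/accessors_chain.go, with the key helper functions assumed from schema.go):
// go-ethereum\core\rawdb\accessors_chain.go (simplified excerpt)
// DeleteBody removes the block body (transactions and uncles) stored for hash/number.
func DeleteBody(db DatabaseDeleter, hash common.Hash, number uint64) {
    if err := db.Delete(blockBodyKey(number, hash)); err != nil {
        log.Crit("Failed to delete block body", "err", err)
    }
}

// DeleteCanonicalHash removes the number-to-hash mapping of the canonical chain.
func DeleteCanonicalHash(db DatabaseDeleter, number uint64) {
    if err := db.Delete(headerHashKey(number)); err != nil {
        log.Crit("Failed to delete number to hash mapping", "err", err)
    }
}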
InsertChain
This method inserts a batch of blocks. We saw it called from procFutureBlocks (via update), and it is also called externally from the downloader's importBlockResults, although in those places only one block is inserted at a time. The implementation is as follows:
func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
if len(chain) == 0 {
return 0, nil
}
for i := 1; i < len(chain); i++ {
if chain[i].NumberU64() != chain[i-1].NumberU64()+1 || chain[i].ParentHash() != chain[i-1].Hash() {
log.Error("Non contiguous block insert", "number", chain[i].Number(), "hash", chain[i].Hash(),
"parent", chain[i].ParentHash(), "prevnumber", chain[i-1].Number(), "prevhash", chain[i-1].Hash())
return 0, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, chain[i-1].NumberU64(),
chain[i-1].Hash().Bytes()[:4], i, chain[i].NumberU64(), chain[i].Hash().Bytes()[:4], chain[i].ParentHash().Bytes()[:4])
}
}
bc.wg.Add(1)
bc.chainmu.Lock()
n, events, logs, err := bc.insertChain(chain, true)
bc.chainmu.Unlock()
bc.wg.Done()
bc.PostChainEvents(events, logs)
return n, err
}
The first part sanity-checks the batch: the block numbers must be consecutive and every block's parent hash must match the previous block's hash. The actual insertion happens in insertChain:
func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) {
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
return 0, nil, nil, nil
}
senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain)
var (
stats = insertStats{startTime: mclock.Now()}
events = make([]interface{}, 0, len(chain))
lastCanon *types.Block
coalescedLogs []*types.Log
)
headers := make([]*types.Header, len(chain))
seals := make([]bool, len(chain))
for i, block := range chain {
headers[i] = block.Header()
seals[i] = verifySeals
}
abort, results := bc.engine.VerifyHeaders(bc, headers, seals)
defer close(abort)
it := newInsertIterator(chain, results, bc.Validator())
block, err := it.next()
switch {
case err == consensus.ErrPrunedAncestor:
return bc.insertSidechain(block, it)
case err == consensus.ErrFutureBlock || (err == consensus.ErrUnknownAncestor && bc.futureBlocks.Contains(it.first().ParentHash())):
for block != nil && (it.index == 0 || err == consensus.ErrUnknownAncestor) {
if err := bc.addFutureBlock(block); err != nil {
return it.index, events, coalescedLogs, err
}
block, err = it.next()
}
stats.queued += it.processed()
stats.ignored += it.remaining()
return it.index, events, coalescedLogs, err
case err == ErrKnownBlock:
current := bc.CurrentBlock().NumberU64()
for block != nil && err == ErrKnownBlock && current >= block.NumberU64() {
stats.ignored++
block, err = it.next()
}
case err != nil:
stats.ignored += len(it.chain)
bc.reportBlock(block, nil, err)
return it.index, events, coalescedLogs, err
}
for ; block != nil && err == nil; block, err = it.next() {
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
log.Debug("Premature abort during blocks processing")
break
}
if BadHashes[block.Hash()] {
bc.reportBlock(block, nil, ErrBlacklistedHash)
return it.index, events, coalescedLogs, ErrBlacklistedHash
}
start := time.Now()
parent := it.previous()
if parent == nil {
parent = bc.GetBlock(block.ParentHash(), block.NumberU64()-1)
}
state, err := state.New(parent.Root(), bc.stateCache)
if err != nil {
return it.index, events, coalescedLogs, err
}
t0 := time.Now()
receipts, logs, usedGas, err := bc.processor.Process(block, state, bc.vmConfig)
t1 := time.Now()
if err != nil {
bc.reportBlock(block, receipts, err)
return it.index, events, coalescedLogs, err
}
if err := bc.Validator().ValidateState(block, parent, state, receipts, usedGas); err != nil {
bc.reportBlock(block, receipts, err)
return it.index, events, coalescedLogs, err
}
t2 := time.Now()
proctime := time.Since(start)
status, err := bc.writeBlockWithState(block, receipts, state)
t3 := time.Now()
if err != nil {
return it.index, events, coalescedLogs, err
}
blockInsertTimer.UpdateSince(start)
blockExecutionTimer.Update(t1.Sub(t0))
blockValidationTimer.Update(t2.Sub(t1))
blockWriteTimer.Update(t3.Sub(t2))
switch status {
case CanonStatTy:
log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(),
"uncles", len(block.Uncles()), "txs", len(block.Transactions()), "gas", block.GasUsed(),
"elapsed", common.PrettyDuration(time.Since(start)),
"root", block.Root())
coalescedLogs = append(coalescedLogs, logs...)
events = append(events, ChainEvent{block, block.Hash(), logs})
lastCanon = block
bc.gcproc += proctime
case SideStatTy:
log.Debug("Inserted forked block", "number", block.Number(), "hash", block.Hash(),
"diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(start)),
"txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()),
"root", block.Root())
events = append(events, ChainSideEvent{block})
}
blockInsertTimer.UpdateSince(start)
stats.processed++
stats.usedGas += usedGas
dirty, _ := bc.stateCache.TrieDB().Size()
stats.report(chain, it.index, dirty)
}
if block != nil && err == consensus.ErrFutureBlock {
if err := bc.addFutureBlock(block); err != nil {
return it.index, events, coalescedLogs, err
}
block, err = it.next()
for ; block != nil && err == consensus.ErrUnknownAncestor; block, err = it.next() {
if err := bc.addFutureBlock(block); err != nil {
return it.index, events, coalescedLogs, err
}
stats.queued++
}
}
stats.ignored += it.remaining()
if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() {
events = append(events, ChainHeadEvent{lastCanon})
}
return it.index, events, coalescedLogs, err
}
In this method, the headers are verified first: two slices are created, one holding the headers to be inserted and the other flagging whether each header's seal needs to be verified, and they are handed to the consensus engine's VerifyHeaders. An insert iterator is then created:
// go-ethereum\core\blockchain_insert.go
func newInsertIterator(chain types.Blocks, results <-chan error, validator Validator) *insertIterator {
return &insertIterator{
chain: chain,
results: results,
index: -1,
validator: validator,
}
}
The iterator offers a small set of methods: next returns the next block, previous the previous one, first the first block, remaining the number of blocks not yet visited, and processed the number already visited. It is just index arithmetic over the slice; the only notable point is that next also validates the block it returns.
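For reference, next looks roughly like this (simplified excerpt from core/blockchain_insert.go); note that on the happy path it also runs the body validator:
// go-ethereum\core\blockchain_insert.go (simplified excerpt)
// next advances the iterator: it waits for the header verification result from
// the consensus engine, then runs the block body validator on the next block.
func (it *insertIterator) next() (*types.Block, error) {
    if it.index+1 >= len(it.chain) {
        it.index = len(it.chain)
        return nil, nil
    }
    it.index++
    if err := <-it.results; err != nil {
        return it.chain[it.index], err
    }
    return it.chain[it.index], it.validator.ValidateBody(it.chain[it.index])
}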
insertChain first takes the first block, branches on the kind of error it got back, and if there is no error enters the main loop over all blocks to be inserted.
For each block the loop first checks whether its hash is in BadHashes. If not, it fetches the parent block: the parent normally comes from the iterator's previous method, and for the first block (where previous returns nil) it is looked up in the local chain by the block's parent hash. A new state is then created on top of the parent's root, the processor executes the block's transactions to produce receipts, logs and gas usage, and the resulting state is validated. If everything passes, writeBlockWithState persists the block and its state, and the return status drives the rest: CanonStatTy means a new canonical block was inserted, SideStatTy means a fork block was inserted.
After the loop there may still be blocks left over (the loop can also be cut short by procInterrupt, for example after Stop is called). If a block remains and the error is consensus.ErrFutureBlock, the block lies in the future, so it is added to the futureBlocks cache, and any directly following blocks that fail with ErrUnknownAncestor are queued the same way. Note that only blocks at most 30 seconds ahead of the local clock are accepted.
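The 30-second limit comes from addFutureBlock, which looks roughly like this (simplified excerpt; maxTimeFutureBlocks is 30):
// go-ethereum\core\blockchain.go (simplified excerpt)
// addFutureBlock queues a block for later processing, rejecting anything whose
// timestamp is more than maxTimeFutureBlocks (30) seconds ahead of local time.
func (bc *BlockChain) addFutureBlock(block *types.Block) error {
    max := big.NewInt(time.Now().Unix() + maxTimeFutureBlocks)
    if block.Time().Cmp(max) > 0 {
        return fmt.Errorf("future block timestamp %v > allowed %v", block.Time(), max)
    }
    bc.futureBlocks.Add(block.Hash(), block)
    return nil
}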
writeBlockWithState
The actual write in insertChain happens in writeBlockWithState, implemented as follows:
func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, err error) {
bc.wg.Add(1)
defer bc.wg.Done()
ptd := bc.GetTd(block.ParentHash(), block.NumberU64()-1)
if ptd == nil {
return NonStatTy, consensus.ErrUnknownAncestor
}
currentBlock := bc.CurrentBlock()
localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
externTd := new(big.Int).Add(block.Difficulty(), ptd)
if err := bc.hc.WriteTd(block.Hash(), block.NumberU64(), externTd); err != nil {
return NonStatTy, err
}
rawdb.WriteBlock(bc.db, block)
root, err := state.Commit(bc.chainConfig.IsEIP158(block.Number()))
if err != nil {
return NonStatTy, err
}
triedb := bc.stateCache.TrieDB()
if bc.cacheConfig.Disabled {
if err := triedb.Commit(root, false); err != nil {
return NonStatTy, err
}
} else {
triedb.Reference(root, common.Hash{})
bc.triegc.Push(root, -int64(block.NumberU64()))
if current := block.NumberU64(); current > triesInMemory {
var (
nodes, imgs = triedb.Size()
limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024
)
if nodes > limit || imgs > 4*1024*1024 {
triedb.Cap(limit - ethdb.IdealBatchSize)
}
chosen := current - triesInMemory
if bc.gcproc > bc.cacheConfig.TrieTimeLimit {
header := bc.GetHeaderByNumber(chosen)
if header == nil {
log.Warn("Reorg in progress, trie commit postponed", "number", chosen)
} else {
if chosen < lastWrite+triesInMemory && bc.gcproc >= 2*bc.cacheConfig.TrieTimeLimit {
log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", bc.cacheConfig.TrieTimeLimit, "optimum", float64(chosen-lastWrite)/triesInMemory)
}
triedb.Commit(header.Root, true)
lastWrite = chosen
bc.gcproc = 0
}
}
for !bc.triegc.Empty() {
root, number := bc.triegc.Pop()
if uint64(-number) > chosen {
bc.triegc.Push(root, number)
break
}
triedb.Dereference(root.(common.Hash))
}
}
}
batch := bc.db.NewBatch()
rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receipts)
reorg := externTd.Cmp(localTd) > 0
currentBlock = bc.CurrentBlock()
if !reorg && externTd.Cmp(localTd) == 0 {
if block.NumberU64() < currentBlock.NumberU64() {
reorg = true
} else if block.NumberU64() == currentBlock.NumberU64() {
var currentPreserve, blockPreserve bool
if bc.shouldPreserve != nil {
currentPreserve, blockPreserve = bc.shouldPreserve(currentBlock), bc.shouldPreserve(block)
}
reorg = !currentPreserve && (blockPreserve || mrand.Float64() < 0.5)
}
}
if reorg {
if block.ParentHash() != currentBlock.Hash() {
if err := bc.reorg(currentBlock, block); err != nil {
return NonStatTy, err
}
}
rawdb.WriteTxLookupEntries(batch, block)
rawdb.WritePreimages(batch, state.Preimages())
status = CanonStatTy
} else {
status = SideStatTy
}
if err := batch.Write(); err != nil {
return NonStatTy, err
}
if status == CanonStatTy {
bc.insert(block)
}
bc.futureBlocks.Remove(block.Hash())
return status, nil
}
It first computes several total-difficulty values: ptd is the total difficulty of the new block's parent, localTd is the total difficulty of the local head block, and externTd is the block's own difficulty added to ptd, i.e. the total difficulty the chain would have after inserting the block. externTd is written to the database, then the block itself, and then the state is committed: transaction processing may have modified the state, and this persists the changes (through the trie database, subject to the in-memory pruning logic). The receipts are written next.
The remaining logic is driven by the reorg flag, which corresponds to the different ways a block can be inserted.
Case one: the new block does not raise the total difficulty above the canonical chain's, so inserting it has no real effect on the main chain. reorg stays false and the status ends up as SideStatTy, meaning the block went onto a side chain.
Case two: the new total difficulty is higher than our canonical chain's, but the block does not extend the current head. There is a fork and the canonical chain has to be adjusted, so reorg is true, the reorg method runs, and the status becomes CanonStatTy, meaning the canonical (main) chain changed.
Case three: the new total difficulty equals the canonical chain's, but the new block's height is lower than the current head's. This indicates a better chain, and since the block certainly cannot extend the current head, the handling is the same as case two: reorg is true, reorg runs, and the status is CanonStatTy.
Case four: the new total difficulty equals the canonical chain's and the height is also equal, i.e. a fork right at the tip. Which branch becomes canonical requires an extra decision, designed to discourage selfish mining, based on shouldPreserve, which essentially checks whether a block was mined locally. If the local head block was mined by us, reorg is false and we keep our branch. If the local head was not mined by us but the new block was, reorg is true and the canonical chain switches. If neither was mined by us, reorg is true with 50% probability. Note that in this case, whenever reorg is true the reorg method below is guaranteed to run and rebuild the canonical chain.
Inside the if reorg branch, if the new block's parent is not the current local head, the block cannot simply extend the canonical chain, so the chain has to be redefined and the reorg method is executed. For a block that does extend the canonical chain, only WriteTxLookupEntries and WritePreimages are called to record the lookup data.
After that, if the status is CanonStatTy, insert is called to update the canonical chain pointers. Finally the block is removed from futureBlocks.
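All of the total-difficulty lookups in this method go through GetTd, which delegates to the header chain and serves the value from a small cache or the database; roughly (simplified excerpt):
// go-ethereum\core\blockchain.go / core\headerchain.go (simplified excerpt)
// BlockChain.GetTd simply delegates to the header chain.
func (bc *BlockChain) GetTd(hash common.Hash, number uint64) *big.Int {
    return bc.hc.GetTd(hash, number)
}

// HeaderChain.GetTd serves the total difficulty from tdCache, falling back to rawdb.
func (hc *HeaderChain) GetTd(hash common.Hash, number uint64) *big.Int {
    if cached, ok := hc.tdCache.Get(hash); ok {
        return cached.(*big.Int)
    }
    td := rawdb.ReadTd(hc.chainDb, hash, number)
    if td == nil {
        return nil
    }
    hc.tdCache.Add(hash, td)
    return td
}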
reorg
func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
var (
newChain types.Blocks
oldChain types.Blocks
commonBlock *types.Block
deletedTxs types.Transactions
deletedLogs []*types.Log
collectLogs = func(hash common.Hash) {
// Coalesce logs and set 'Removed'.
number := bc.hc.GetBlockNumber(hash)
if number == nil {
return
}
receipts := rawdb.ReadReceipts(bc.db, hash, *number)
for _, receipt := range receipts {
for _, log := range receipt.Logs {
del := *log
del.Removed = true
deletedLogs = append(deletedLogs, &del)
}
}
}
)
if oldBlock.NumberU64() > newBlock.NumberU64() {
for ; oldBlock != nil && oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1) {
oldChain = append(oldChain, oldBlock)
deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
collectLogs(oldBlock.Hash())
}
} else {
for ; newBlock != nil && newBlock.NumberU64() != oldBlock.NumberU64(); newBlock = bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1) {
newChain = append(newChain, newBlock)
}
}
if oldBlock == nil {
return fmt.Errorf("Invalid old chain")
}
if newBlock == nil {
return fmt.Errorf("Invalid new chain")
}
for {
if oldBlock.Hash() == newBlock.Hash() {
commonBlock = oldBlock
break
}
oldChain = append(oldChain, oldBlock)
newChain = append(newChain, newBlock)
deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
collectLogs(oldBlock.Hash())
oldBlock, newBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1), bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1)
if oldBlock == nil {
return fmt.Errorf("Invalid old chain")
}
if newBlock == nil {
return fmt.Errorf("Invalid new chain")
}
}
if len(oldChain) > 0 && len(newChain) > 0 {
logFn := log.Debug
if len(oldChain) > 63 {
logFn = log.Warn
}
logFn("Chain split detected", "number", commonBlock.Number(), "hash", commonBlock.Hash(),
"drop", len(oldChain), "dropfrom", oldChain[0].Hash(), "add", len(newChain), "addfrom", newChain[0].Hash())
} else {
log.Error("Impossible reorg, please file an issue", "oldnum", oldBlock.Number(), "oldhash", oldBlock.Hash(), "newnum", newBlock.Number(), "newhash", newBlock.Hash())
}
var addedTxs types.Transactions
for i := len(newChain) - 1; i >= 0; i-- {
// insert the block in the canonical way, re-writing history
bc.insert(newChain[i])
rawdb.WriteTxLookupEntries(bc.db, newChain[i])
addedTxs = append(addedTxs, newChain[i].Transactions()...)
}
// compute the transactions present only in the old chain
diff := types.TxDifference(deletedTxs, addedTxs)
batch := bc.db.NewBatch()
for _, tx := range diff {
rawdb.DeleteTxLookupEntry(batch, tx.Hash())
}
batch.Write()
if len(deletedLogs) > 0 {
go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
}
if len(oldChain) > 0 {
go func() {
for _, block := range oldChain {
bc.chainSideFeed.Send(ChainSideEvent{Block: block})
}
}()
}
return nil
}
This method reorganizes the chain; the two blocks passed in represent the head of the old (current) chain and the head of the new chain. It is called from writeBlockWithState above.
First, if the old chain is higher than the new one, it has to be shortened: the code walks down from the old head until it reaches the height of the block being inserted, appending each visited block to oldChain and its transactions to deletedTxs, and calling collectLogs on each one to gather its logs and mark them as removed.
If the old chain is at the same height or lower, the new chain is walked down instead and the visited blocks are appended to newChain.
At this point the two chains are at the same height, but their hashes may still differ. So a loop walks both chains down in lockstep, looking for the point where the hashes match; blocks that do not match are appended to oldChain and newChain respectively, the old chain's transactions go into deletedTxs, and collectLogs processes the old chain's blocks. Once the hashes match, both height and hash are equal, which means a common ancestor, i.e. the fork point, has been found; it is recorded as commonBlock and the loop exits.
Next, newChain is iterated from the fork point upward and insert is called on every block, promoting the new chain to the canonical chain; WriteTxLookupEntries records the transaction lookup entries and each block's transactions are collected into addedTxs.
Then deletedTxs and addedTxs are compared, looking for transactions that exist in deletedTxs but not in addedTxs. addedTxs holds all transactions from the fork point to the tip of the new chain, while deletedTxs holds all transactions from the fork point to the tip of the old chain. The transactions found only in the old chain have their lookup entries deleted from the database.
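The difference is computed with types.TxDifference, which keeps every transaction of the first set whose hash does not appear in the second; roughly (excerpt from core/types/transaction.go):
// go-ethereum\core\types\transaction.go (excerpt)
// TxDifference returns a new set which is the difference between a and b.
func TxDifference(a, b Transactions) Transactions {
    keep := make(Transactions, 0, len(a))

    remove := make(map[common.Hash]struct{})
    for _, tx := range b {
        remove[tx.Hash()] = struct{}{}
    }

    for _, tx := range a {
        if _, ok := remove[tx.Hash()]; !ok {
            keep = append(keep, tx)
        }
    }
    return keep
}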
InsertHeaderChain
This method is called from the downloader's processHeaders and inserts a batch of block headers:
func (bc *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (int, error) {
start := time.Now()
if i, err := bc.hc.ValidateHeaderChain(chain, checkFreq); err != nil {
return i, err
}
bc.chainmu.Lock()
defer bc.chainmu.Unlock()
bc.wg.Add(1)
defer bc.wg.Done()
whFunc := func(header *types.Header) error {
_, err := bc.hc.WriteHeader(header)
return err
}
return bc.hc.InsertHeaderChain(chain, whFunc, start)
}
The given headers are validated first:
func (hc *HeaderChain) ValidateHeaderChain(chain []*types.Header, checkFreq int) (int, error) {
for i := 1; i < len(chain); i++ {
if chain[i].Number.Uint64() != chain[i-1].Number.Uint64()+1 || chain[i].ParentHash != chain[i-1].Hash() {
log.Error("Non contiguous header insert", "number", chain[i].Number, "hash", chain[i].Hash(),
"parent", chain[i].ParentHash, "prevnumber", chain[i-1].Number, "prevhash", chain[i-1].Hash())
return 0, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, chain[i-1].Number,
chain[i-1].Hash().Bytes()[:4], i, chain[i].Number, chain[i].Hash().Bytes()[:4], chain[i].ParentHash[:4])
}
}
seals := make([]bool, len(chain))
if checkFreq != 0 {
for i := 0; i < len(seals)/checkFreq; i++ {
index := i*checkFreq + hc.rand.Intn(checkFreq)
if index >= len(seals) {
index = len(seals) - 1
}
seals[index] = true
}
seals[len(seals)-1] = true
}
abort, results := hc.engine.VerifyHeaders(hc, chain, seals)
defer close(abort)
for i, header := range chain {
if hc.procInterrupt() {
log.Debug("Premature abort during headers verification")
return 0, errors.New("aborted")
}
if BadHashes[header.Hash()] {
return i, ErrBlacklistedHash
}
if err := <-results; err != nil {
return i, err
}
}
return 0, nil
}
The first for loop checks that the given headers form a properly linked sequence. The checkFreq value then determines which headers get their seals verified: rather than checking one header at a fixed interval, one header is picked at random out of every group of checkFreq headers, and the last header is always verified (for example, with checkFreq = 100 and 400 headers, one random header out of each group of 100 is fully verified). The final result depends on the verification outcomes and on whether any header is in BadHashes.
Back in BlockChain.InsertHeaderChain, if validation succeeds it calls the HeaderChain's InsertHeaderChain method:
func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, writeHeader WhCallback, start time.Time) (int, error) {
stats := struct{ processed, ignored int }{}
for i, header := range chain {
if hc.procInterrupt() {
log.Debug("Premature abort during headers import")
return i, errors.New("aborted")
}
if hc.HasHeader(header.Hash(), header.Number.Uint64()) {
stats.ignored++
continue
}
if err := writeHeader(header); err != nil {
return i, err
}
stats.processed++
}
last := chain[len(chain)-1]
context := []interface{}{
"count", stats.processed, "elapsed", common.PrettyDuration(time.Since(start)),
"number", last.Number, "hash", last.Hash(),
}
if timestamp := time.Unix(last.Time.Int64(), 0); time.Since(timestamp) > time.Minute {
context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...)
}
if stats.ignored > 0 {
context = append(context, []interface{}{"ignored", stats.ignored}...)
}
log.Info("Imported new block headers", context...)
return 0, nil
}
It iterates over all headers to be inserted, skips the ones that already exist, and calls writeHeader for the rest. writeHeader is the callback defined in BlockChain.InsertHeaderChain, which in turn calls the HeaderChain's WriteHeader method:
func (hc *HeaderChain) WriteHeader(header *types.Header) (status WriteStatus, err error) {
var (
hash = header.Hash()
number = header.Number.Uint64()
)
ptd := hc.GetTd(header.ParentHash, number-1)
if ptd == nil {
return NonStatTy, consensus.ErrUnknownAncestor
}
localTd := hc.GetTd(hc.currentHeaderHash, hc.CurrentHeader().Number.Uint64())
externTd := new(big.Int).Add(header.Difficulty, ptd)
if err := hc.WriteTd(hash, number, externTd); err != nil {
log.Crit("Failed to write header total difficulty", "err", err)
}
rawdb.WriteHeader(hc.chainDb, header)
if externTd.Cmp(localTd) > 0 || (externTd.Cmp(localTd) == 0 && mrand.Float64() < 0.5) {
batch := hc.chainDb.NewBatch()
for i := number + 1; ; i++ {
hash := rawdb.ReadCanonicalHash(hc.chainDb, i)
if hash == (common.Hash{}) {
break
}
rawdb.DeleteCanonicalHash(batch, i)
}
batch.Write()
var (
headHash = header.ParentHash
headNumber = header.Number.Uint64() - 1
headHeader = hc.GetHeader(headHash, headNumber)
)
for rawdb.ReadCanonicalHash(hc.chainDb, headNumber) != headHash {
rawdb.WriteCanonicalHash(hc.chainDb, headHash, headNumber)
headHash = headHeader.ParentHash
headNumber = headHeader.Number.Uint64() - 1
headHeader = hc.GetHeader(headHash, headNumber)
}
rawdb.WriteCanonicalHash(hc.chainDb, hash, number)
rawdb.WriteHeadHeaderHash(hc.chainDb, hash)
hc.currentHeaderHash = hash
hc.currentHeader.Store(types.CopyHeader(header))
status = CanonStatTy
} else {
status = SideStatTy
}
hc.headerCache.Add(hash, header)
hc.numberCache.Add(hash, number)
return
}
Similar to inserting a full block, localTd and externTd are computed, and then the total difficulty and the header are written. If the external difficulty is greater than the local one, or they are equal but a 50% coin flip succeeds, the canonical header chain is adjusted. The adjustment works as follows: starting at the inserted header's number plus one, the canonical hash at each height is read and deleted, moving upward until no hash is found, so that the canonical chain no longer extends above the new header. Then, starting at the inserted header's number minus one and walking down its ancestors, the canonical hash mapping at each height is overwritten with the ancestor's hash, until a height is reached where the stored canonical hash already equals that ancestor, i.e. the point where the new branch rejoins the old canonical chain. After these two steps the new header can correctly take its place on the canonical chain; its canonical hash and the head-header hash are written, and the status is set to CanonStatTy. If the difficulty comparison above is not satisfied, the status is SideStatTy.
Back in the HeaderChain's InsertHeaderChain, once all headers are inserted some statistics are logged and the method returns, which completes the whole InsertHeaderChain flow.
insert
func (bc *BlockChain) insert(block *types.Block) {
updateHeads := rawdb.ReadCanonicalHash(bc.db, block.NumberU64()) != block.Hash()
rawdb.WriteCanonicalHash(bc.db, block.Hash(), block.NumberU64())
rawdb.WriteHeadBlockHash(bc.db, block.Hash())
bc.currentBlock.Store(block)
if updateHeads {
bc.hc.SetCurrentHeader(block.Header())
rawdb.WriteHeadFastBlockHash(bc.db, block.Hash())
bc.currentFastBlock.Store(block)
}
}
This method adjusts the canonical chain. In reorg, after the common ancestor of the old and new chains has been found, the new chain's blocks are traversed and insert is executed for each one, which is how the canonical chain is switched over to the new branch. Concretely, it first reads the canonical hash stored at the block's height to decide whether the current header and currentFastBlock also need updating (updateHeads). It then overwrites the canonical hash at that height with the block's hash, updates the head block hash in the database, and stores the block as currentBlock; if updateHeads is set, the current header and currentFastBlock are updated as well.
Running this method on every block of the new chain updates the canonical mapping at every height in the database as well as the currentBlock, currentHeader and currentFastBlock variables, and thereby switches the canonical chain.
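The canonical mapping read and written here is a simple number-to-hash index in the database; the accessors look roughly like this (simplified excerpt from core/rawdb/accessors_chain.go, with headerHashKey assumed from schema.go):
// go-ethereum\core\rawdb\accessors_chain.go (simplified excerpt)
// ReadCanonicalHash retrieves the hash assigned to a canonical block number.
func ReadCanonicalHash(db DatabaseReader, number uint64) common.Hash {
    data, _ := db.Get(headerHashKey(number))
    if len(data) == 0 {
        return common.Hash{}
    }
    return common.BytesToHash(data)
}

// WriteCanonicalHash stores the hash assigned to a canonical block number.
func WriteCanonicalHash(db DatabaseWriter, hash common.Hash, number uint64) {
    if err := db.Put(headerHashKey(number), hash.Bytes()); err != nil {
        log.Crit("Failed to store number to hash mapping", "err", err)
    }
}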
Header image from Unsplash: https://unsplash.com/photos/HQMyV8a_4_4