go-ethereum中miner源码分析
入口
StartMining
一般来说有两个途径去启动挖矿:
第一个是在启动geth是使用–mine参数启动挖矿,回顾之前geth启动流程,在startNode最后,判断如果使用了–mine参数则调用Ethereim的StartMining方法启动挖矿。
第二个是在节点启动后,通过rpc去连接节点,然后使用mine.start()方法启动,该rpc服务对应的逻辑如下
// go-ethereum\eth\api.go
func (api *PrivateMinerAPI) Start(threads *int) error {
if threads == nil {
return api.e.StartMining(runtime.NumCPU())
}
return api.e.StartMining(*threads)
}
可见也是使用Ethereim的StartMining方法,该方法参数就是挖矿使用的线程数。
func (s *Ethereum) StartMining(threads int) error {
type threaded interface {
SetThreads(threads int)
}
if th, ok := s.engine.(threaded); ok {
log.Info("Updated mining threads", "threads", threads)
if threads == 0 {
threads = -1
}
th.SetThreads(threads)
}
if !s.IsMining() {
s.lock.RLock()
price := s.gasPrice
s.lock.RUnlock()
s.txPool.SetGasPrice(price)
eb, err := s.Etherbase()
if err != nil {
log.Error("Cannot start mining without etherbase", "err", err)
return fmt.Errorf("etherbase missing: %v", err)
}
if clique, ok := s.engine.(*clique.Clique); ok {
wallet, err := s.accountManager.Find(accounts.Account{Address: eb})
if wallet == nil || err != nil {
log.Error("Etherbase account unavailable locally", "err", err)
return fmt.Errorf("signer missing: %v", err)
}
clique.Authorize(eb, wallet.SignData)
}
atomic.StoreUint32(&s.protocolManager.acceptTxs, 1)
go s.miner.Start(eb)
}
return nil
}
第一步是共识引擎修改挖矿线程数,这里我们以Ethash为例,
func (ethash *Ethash) SetThreads(threads int) {
ethash.lock.Lock()
defer ethash.lock.Unlock()
if ethash.shared != nil {
ethash.shared.SetThreads(threads)
return
}
ethash.threads = threads
select {
case ethash.update <- struct{}{}:
default:
}
}
主要就是修改threads字段。接着判断是否正在挖矿,
func (s *Ethereum) IsMining() bool { return s.miner.Mining() }
如果没有在挖矿,则在一个单独的goroutine中运行miner的start方法。
miner
这是一个挖矿工作的帮助类
type Miner struct {
mux *event.TypeMux
worker *worker
coinbase common.Address
eth Backend
engine consensus.Engine
exitCh chan struct{}
canStart int32
shouldStart int32
}
func New(eth Backend, config *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine, recommit time.Duration, gasFloor, gasCeil uint64, isLocalBlock func(block *types.Block) bool) *Miner {
miner := &Miner{
eth: eth,
mux: mux,
engine: engine,
exitCh: make(chan struct{}),
worker: newWorker(config, engine, eth, mux, recommit, gasFloor, gasCeil, isLocalBlock),
canStart: 1,
}
go miner.update()
return miner
}
它的new方法子创建Ethereum时被调用来构建一个miner对象,new方法中在一个单独的goroutine中调用了updateff
update
func (self *Miner) update() {
events := self.mux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{})
defer events.Unsubscribe()
for {
select {
case ev := <-events.Chan():
if ev == nil {
return
}
switch ev.Data.(type) {
case downloader.StartEvent:
atomic.StoreInt32(&self.canStart, 0)
if self.Mining() {
self.Stop()
atomic.StoreInt32(&self.shouldStart, 1)
log.Info("Mining aborted due to sync")
}
case downloader.DoneEvent, downloader.FailedEvent:
shouldStart := atomic.LoadInt32(&self.shouldStart) == 1
atomic.StoreInt32(&self.canStart, 1)
atomic.StoreInt32(&self.shouldStart, 0)
if shouldStart {
self.Start(self.coinbase)
}
return
}
case <-self.exitCh:
return
}
}
}
这是一个事件订阅接受的逻辑,首先订阅了downloader的三种事件,根据每种事件都有对应逻辑。StartEvent是在节点同步时发出,此时节点正在同步数据,网卡要暂停。FailedEvent与DoneEvent是在同步完成后发出此时可以开始挖矿。还有一点是一旦接受到FailedEvent或DoneEvent事件时就要退出循环。
Start
func (self *Miner) Start(coinbase common.Address) {
atomic.StoreInt32(&self.shouldStart, 1)
self.SetEtherbase(coinbase)
if atomic.LoadInt32(&self.canStart) == 0 {
log.Info("Network syncing, will start miner afterwards")
return
}
self.worker.start()
}
指定一个coinbase开始挖矿,实际执行的是worker
Stop
func (self *Miner) Stop() {
self.worker.stop()
atomic.StoreInt32(&self.shouldStart, 0)
}
Mining
func (self *Miner) Mining() bool {
return self.worker.isRunning()
}
判断是否在挖矿
worker
这个才是真正工作的,miner只是在控制worker,在创建miner是也就创建了worker
func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, recommit time.Duration, gasFloor, gasCeil uint64, isLocalBlock func(*types.Block) bool) *worker {
worker := &worker{
config: config,
engine: engine,
eth: eth,
mux: mux,
chain: eth.BlockChain(),
gasFloor: gasFloor,
gasCeil: gasCeil,
isLocalBlock: isLocalBlock,
localUncles: make(map[common.Hash]*types.Block),
remoteUncles: make(map[common.Hash]*types.Block),
unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth),
pendingTasks: make(map[common.Hash]*task),
txsCh: make(chan core.NewTxsEvent, txChanSize),
chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize),
newWorkCh: make(chan *newWorkReq),
taskCh: make(chan *task),
resultCh: make(chan *types.Block, resultQueueSize),
exitCh: make(chan struct{}),
startCh: make(chan struct{}, 1),
resubmitIntervalCh: make(chan time.Duration),
resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize),
}
worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh)
worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh)
worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh)
if recommit < minRecommitInterval {
log.Warn("Sanitizing miner recommit interval", "provided", recommit, "updated", minRecommitInterval)
recommit = minRecommitInterval
}
go worker.mainLoop()
go worker.newWorkLoop(recommit)
go worker.resultLoop()
go worker.taskLoop()
worker.startCh <- struct{}{}
return worker
}
主要是构建了对象并订阅了几个事件。然后启动了4个goroutine取执行几个方法。每个方法都是事件循环机制。
start
func (w *worker) start() {
atomic.StoreInt32(&w.running, 1)
w.startCh <- struct{}{}
}
miner通过调用start开始挖矿。这里只是给running赋值1标记开始挖矿,另外给startCh赋值触发逻辑。startCh在newWorkLoop中
//worker::newWorkLoop
case <-w.startCh:
clearPending(w.chain.CurrentBlock().NumberU64())
timestamp = time.Now().Unix()
commit(false, commitInterruptNewHead)
clearPending := func(number uint64) {
w.pendingMu.Lock()
for h, t := range w.pendingTasks {
if t.block.NumberU64()+staleThreshold <= number {
delete(w.pendingTasks, h)
}
}
w.pendingMu.Unlock()
}
commit := func(noempty bool, s int32) {
if interrupt != nil {
atomic.StoreInt32(interrupt, s)
}
interrupt = new(int32)
w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty, timestamp: timestamp}
timer.Reset(recommit)
atomic.StoreInt32(&w.newTxs, 0)
}
首先调用clearPending方法清理一些过于旧的task,然后记录当前时间戳,调用commit方法,这里主要是给newWorkCh传递了事件,接着启动了一个定时器(默认3秒触发)。先看newWorkCh的逻辑
//worker::mainLoop
case req := <-w.newWorkCh:
w.commitNewWork(req.interrupt, req.noempty, req.timestamp)
直接调用commitNewWork方法
func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) {
w.mu.RLock()
defer w.mu.RUnlock()
tstart := time.Now()
parent := w.chain.CurrentBlock()
if parent.Time().Cmp(new(big.Int).SetInt64(timestamp)) >= 0 {
timestamp = parent.Time().Int64() + 1
}
if now := time.Now().Unix(); timestamp > now+1 {
wait := time.Duration(timestamp-now) * time.Second
log.Info("Mining too far in the future", "wait", common.PrettyDuration(wait))
time.Sleep(wait)
}
num := parent.Number()
header := &types.Header{
ParentHash: parent.Hash(),
Number: num.Add(num, common.Big1),
GasLimit: core.CalcGasLimit(parent, w.gasFloor, w.gasCeil),
Extra: w.extra,
Time: big.NewInt(timestamp),
}
if w.isRunning() {
if w.coinbase == (common.Address{}) {
log.Error("Refusing to mine without etherbase")
return
}
header.Coinbase = w.coinbase
}
if err := w.engine.Prepare(w.chain, header); err != nil {
log.Error("Failed to prepare header for mining", "err", err)
return
}
if daoBlock := w.config.DAOForkBlock; daoBlock != nil {
limit := new(big.Int).Add(daoBlock, params.DAOForkExtraRange)
if header.Number.Cmp(daoBlock) >= 0 && header.Number.Cmp(limit) < 0 {
if w.config.DAOForkSupport {
header.Extra = common.CopyBytes(params.DAOForkBlockExtra)
} else if bytes.Equal(header.Extra, params.DAOForkBlockExtra) {
header.Extra = []byte{}
}
}
}
err := w.makeCurrent(parent, header)
if err != nil {
log.Error("Failed to create mining context", "err", err)
return
}
env := w.current
if w.config.DAOForkSupport && w.config.DAOForkBlock != nil && w.config.DAOForkBlock.Cmp(header.Number) == 0 {
misc.ApplyDAOHardFork(env.state)
}
uncles := make([]*types.Header, 0, 2)
commitUncles := func(blocks map[common.Hash]*types.Block) {
for hash, uncle := range blocks {
if uncle.NumberU64()+staleThreshold <= header.Number.Uint64() {
delete(blocks, hash)
}
}
for hash, uncle := range blocks {
if len(uncles) == 2 {
break
}
if err := w.commitUncle(env, uncle.Header()); err != nil {
log.Trace("Possible uncle rejected", "hash", hash, "reason", err)
} else {
log.Debug("Committing new uncle to block", "hash", hash)
uncles = append(uncles, uncle.Header())
}
}
}
commitUncles(w.localUncles)
commitUncles(w.remoteUncles)
if !noempty {
w.commit(uncles, nil, false, tstart)
}
pending, err := w.eth.TxPool().Pending()
if err != nil {
log.Error("Failed to fetch pending transactions", "err", err)
return
}
if len(pending) == 0 {
w.updateSnapshot()
return
}
localTxs, remoteTxs := make(map[common.Address]types.Transactions), pending
for _, account := range w.eth.TxPool().Locals() {
if txs := remoteTxs[account]; len(txs) > 0 {
delete(remoteTxs, account)
localTxs[account] = txs
}
}
if len(localTxs) > 0 {
txs := types.NewTransactionsByPriceAndNonce(w.current.signer, localTxs)
if w.commitTransactions(txs, w.coinbase, interrupt) {
return
}
}
if len(remoteTxs) > 0 {
txs := types.NewTransactionsByPriceAndNonce(w.current.signer, remoteTxs)
if w.commitTransactions(txs, w.coinbase, interrupt) {
return
}
}
w.commit(uncles, w.fullTaskHook, true, tstart)
}
首先获取区块链当前区块记为父块,如果提交任务的时间戳比父块时间戳还小,要重设时间戳为父块时间戳加1。另外时间戳也不能比当前时间大,否则睡眠一段时间。
之后构建一个洗呢区块头,先填充父块hash,区块编号,gaslimit,额外数据(默认为geth版本等信息)以及时间戳。如果正在挖矿还要填入Coinbase。之后调用engine.Prepare方法,主要是计算区块应有的难度。再往下调用了makeCurrent方法
func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error {
state, err := w.chain.StateAt(parent.Root())
if err != nil {
return err
}
env := &environment{
signer: types.NewEIP155Signer(w.config.ChainID),
state: state,
ancestors: mapset.NewSet(),
family: mapset.NewSet(),
uncles: mapset.NewSet(),
header: header,
}
for _, ancestor := range w.chain.GetBlocksFromHash(parent.Hash(), 7) {
for _, uncle := range ancestor.Uncles() {
env.family.Add(uncle.Hash())
}
env.family.Add(ancestor.Hash())
env.ancestors.Add(ancestor.Hash())
}
env.tcount = 0
w.current = env
return nil
}
makeCurrent先获取了父区块的状态树,然后构建了一个environment对象。之后获取了父区块的7个祖先区块,记录每一个祖先hash一起对于的叔块hash,然后设置worker的current为刚才构建的environment。
回到commitNewWork中,接着调用了两次commitUncles分别处理localUncles和remoteUncles。这两个map是在收到ChainSideEvent事件时按照区块的性质添加的。具体处理逻辑是遍历map中的所有区块,对于编号在当前区块之上七代以内的,作为该区块的叔块,但最多只有两个叔块,选择叔块是要先用commitUncle检查,主要检查是否已被作为叔块添加过。接着如果noempty参数为false,commit方法其中update参数为false。否则先从txpool中取出那些可以执行的交易,也就是pending队列中。如果没有可执行交易调用updateSnapshot并返回
func (w *worker) updateSnapshot() {
w.snapshotMu.Lock()
defer w.snapshotMu.Unlock()
var uncles []*types.Header
w.current.uncles.Each(func(item interface{}) bool {
hash, ok := item.(common.Hash)
if !ok {
return false
}
uncle, exist := w.localUncles[hash]
if !exist {
uncle, exist = w.remoteUncles[hash]
}
if !exist {
return false
}
uncles = append(uncles, uncle.Header())
return false
})
w.snapshotBlock = types.NewBlock(
w.current.header,
w.current.txs,
uncles,
w.current.receipts,
)
w.snapshotState = w.current.state.Copy()
}
这里主要是手机有效叔块然后构建一个区块赋值给snapshotBlock,同时复制状态树。
回到commitNewWork中,如果有可执行交易则将交易分为本地交易和远端交易两部分。如果本地交易数量大于0,则执行NewTransactionsByPriceAndNonce,这是将交易按照gasprice排序(pending内部每个地址对应的交易按nonce排序,这里去每个地址的第一个交易按gasprice排序),然后调用commitTransactions方法
func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coinbase common.Address, interrupt *int32) bool {
if w.current == nil {
return true
}
if w.current.gasPool == nil {
w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit)
}
var coalescedLogs []*types.Log
for {
if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone {
if atomic.LoadInt32(interrupt) == commitInterruptResubmit {
ratio := float64(w.current.header.GasLimit-w.current.gasPool.Gas()) / float64(w.current.header.GasLimit)
if ratio < 0.1 {
ratio = 0.1
}
w.resubmitAdjustCh <- &intervalAdjust{
ratio: ratio,
inc: true,
}
}
return atomic.LoadInt32(interrupt) == commitInterruptNewHead
}
if w.current.gasPool.Gas() < params.TxGas {
log.Trace("Not enough gas for further transactions", "have", w.current.gasPool, "want", params.TxGas)
break
}
tx := txs.Peek()
if tx == nil {
break
}
if tx.Protected() && !w.config.IsEIP155(w.current.header.Number) {
log.Trace("Ignoring reply protected transaction", "hash", tx.Hash(), "eip155", w.config.EIP155Block)
txs.Pop()
continue
}
w.current.state.Prepare(tx.Hash(), common.Hash{}, w.current.tcount)
logs, err := w.commitTransaction(tx, coinbase)
switch err {
case core.ErrGasLimitReached:
log.Trace("Gas limit exceeded for current block", "sender", from)
txs.Pop()
case core.ErrNonceTooLow:
log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce())
txs.Shift()
case core.ErrNonceTooHigh: pool and miner, skip account =
log.Trace("Skipping account with hight nonce", "sender", from, "nonce", tx.Nonce())
txs.Pop()
case nil:
coalescedLogs = append(coalescedLogs, logs...)
w.current.tcount++
txs.Shift()
default:
log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err)
txs.Shift()
}
}
if !w.isRunning() && len(coalescedLogs) > 0 {
cpy := make([]*types.Log, len(coalescedLogs))
for i, l := range coalescedLogs {
cpy[i] = new(types.Log)
*cpy[i] = *l
}
go w.mux.Post(core.PendingLogsEvent{Logs: cpy})
}
if interrupt != nil {
w.resubmitAdjustCh <- &intervalAdjust{inc: false}
}
return false
}
首先创建一个gaspool来记录gas用量。只有有一个循环,每次从给定的交易集合中取一个,使用peek方法,取的是gasprice最大的那个,然后恢复出该交易的发送人,如果该交易被保护并且不是EIP155分叉则忽略该交易。接着去执行这个交易
func (w *worker) commitTransaction(tx *types.Transaction, coinbase common.Address) ([]*types.Log, error) {
snap := w.current.state.Snapshot()
receipt, _, err := core.ApplyTransaction(w.config, w.chain, &coinbase, w.current.gasPool, w.current.state, w.current.header, tx, &w.current.header.GasUsed, *w.chain.GetVMConfig())
if err != nil {
w.current.state.RevertToSnapshot(snap)
return nil, err
}
w.current.txs = append(w.current.txs, tx)
w.current.receipts = append(w.current.receipts, receipt)
return receipt.Logs, nil
}
先获得状态树快照,庵后使用ApplyTransaction执行交易这个我们之前分析过,如果执行出错回滚状态树,否则将交易及收据放到current对应字段,并返回收据logs。回到commitTransactions中,根据对应错误打印不同log并弹出该交易,若没有错收集log,然后调用Shift方法,这个方法是将对应地址的交易集合中第一个交易踢出并重排顺序。
就这样循环直到处理完所有给定的交易,回到commitNewWork中,刚才处理的是本地交易,然后处理远端交集。都处理完后调用commit方法
func (w *worker) commit(uncles []*types.Header, interval func(), update bool, start time.Time) error {
receipts := make([]*types.Receipt, len(w.current.receipts))
for i, l := range w.current.receipts {
receipts[i] = new(types.Receipt)
*receipts[i] = *l
}
s := w.current.state.Copy()
block, err := w.engine.Finalize(w.chain, w.current.header, s, w.current.txs, uncles, w.current.receipts)
if err != nil {
return err
}
if w.isRunning() {
if interval != nil {
interval()
}
select {
case w.taskCh <- &task{receipts: receipts, state: s, block: block, createdAt: time.Now()}:
w.unconfirmed.Shift(block.NumberU64() - 1)
feesWei := new(big.Int)
for i, tx := range block.Transactions() {
feesWei.Add(feesWei, new(big.Int).Mul(new(big.Int).SetUint64(receipts[i].GasUsed), tx.GasPrice()))
}
feesEth := new(big.Float).Quo(new(big.Float).SetInt(feesWei), new(big.Float).SetInt(big.NewInt(params.Ether)))
log.Info("Commit new mining work", "number", block.Number(), "sealhash", w.engine.SealHash(block.Header()),
"uncles", len(uncles), "txs", w.current.tcount, "gas", block.GasUsed(), "fees", feesEth, "elapsed", common.PrettyDuration(time.Since(start)))
case <-w.exitCh:
log.Info("Worker has exited")
}
}
if update {
w.updateSnapshot()
}
return nil
}
到这里该处理的交易都处理完了,首先复制所有收据及窗台书,然后调用engine.Finalize方法发放奖励并生成区块。接着如果正在挖矿且没有中断,则给taskCh赋值触发相应逻辑。
//worker::taskLoop
case task := <-w.taskCh:
if w.newTaskHook != nil {
w.newTaskHook(task)
}
sealHash := w.engine.SealHash(task.block.Header())
if sealHash == prev {
continue
}
interrupt()
stopCh, prev = make(chan struct{}), sealHash
if w.skipSealHook != nil && w.skipSealHook(task) {
continue
}
w.pendingMu.Lock()
w.pendingTasks[w.engine.SealHash(task.block.Header())] = task
w.pendingMu.Unlock()
if err := w.engine.Seal(w.chain, task.block, w.resultCh, stopCh); err != nil {
log.Warn("Block sealing failed", "err", err)
}
这里首先计算新区快头的hash值,如果等于前一次任务的,即prev,则退出。否则终止之前操作,然后记录该hash值为prev。然后在pendingTasks中记录这个task,并调用共识引擎的Seal方法,也就是实际的挖矿算法取寻找nonce(详见这里)。当找到后resultCh会收到信号触发对应逻辑
case block := <-w.resultCh:
if block == nil {
continue
}
if w.chain.HasBlock(block.Hash(), block.NumberU64()) {
continue
}
var (
sealhash = w.engine.SealHash(block.Header())
hash = block.Hash()
)
w.pendingMu.RLock()
task, exist := w.pendingTasks[sealhash]
w.pendingMu.RUnlock()
if !exist {
log.Error("Block found but no relative pending task", "number", block.Number(), "sealhash", sealhash, "hash", hash)
continue
}
var (
receipts = make([]*types.Receipt, len(task.receipts))
logs []*types.Log
)
for i, receipt := range task.receipts {
receipts[i] = new(types.Receipt)
*receipts[i] = *receipt
for _, log := range receipt.Logs {
log.BlockHash = hash
}
logs = append(logs, receipt.Logs...)
}
stat, err := w.chain.WriteBlockWithState(block, receipts, task.state)
if err != nil {
log.Error("Failed writing block to chain", "err", err)
continue
}
log.Info("Successfully sealed new block", "number", block.Number(), "sealhash", sealhash, "hash", hash,
"elapsed", common.PrettyDuration(time.Since(task.createdAt)))
w.mux.Post(core.NewMinedBlockEvent{Block: block})
var events []interface{}
switch stat {
case core.CanonStatTy:
events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
events = append(events, core.ChainHeadEvent{Block: block})
case core.SideStatTy:
events = append(events, core.ChainSideEvent{Block: block})
}
w.chain.PostChainEvents(events, logs)
w.unconfirmed.Insert(block.NumberU64(), block.Hash())
先判断这个找到的block是否为空或已经有了,一切正常的话,从pendingTasks中找到对应task,遍历它的所有收据然后补齐收据中log的BlockHash字段。之后调用WriteBlockWithState写入本地区块链,成功的话发送NewMinedBlockEvent事件,这个事件被ProtocolManager接受并广播新块。在worker中,根据插入结果广播事件。最后将新区快插入到unconfirmed中。
中断
刚才梳理了正常的挖矿流程,在最开始的时候,newWorkLoop收到startCh事件后,会设置一个定时器,他到时间后也会执行commit方法,只不过noempty参数为true,还是一样的流程,在taskLoop收到taskCh启动挖矿是会先执行一次interrupt方法
interrupt := func() {
if stopCh != nil {
close(stopCh)
stopCh = nil
}
}
这里关闭的stopCh就是控制挖矿的,他会停止挖矿,然后随后又重新开始一次
unconfirmedBlocks
这个表示那些本地挖到的但是还未确认的区块,创建worker是被创建
func newUnconfirmedBlocks(chain chainRetriever, depth uint) *unconfirmedBlocks {
return &unconfirmedBlocks{
chain: chain,
depth: depth,
}
}
type unconfirmedBlocks struct {
chain chainRetriever
depth uint
blocks *ring.Ring
lock sync.RWMutex
}
type unconfirmedBlock struct {
index uint64
hash common.Hash
}
Insert
每挖到一个区块就调用一次
func (set *unconfirmedBlocks) Insert(index uint64, hash common.Hash) {
set.Shift(index)
item := ring.New(1)
item.Value = &unconfirmedBlock{
index: index,
hash: hash,
}
set.lock.Lock()
defer set.lock.Unlock()
if set.blocks == nil {
set.blocks = item
} else {
set.blocks.Move(-1).Link(item)
}
log.Info("🔨 mined potential block", "number", index, "hash", hash)
}
先删除从新区快高度减去depth之外的所有区块,然后新建一个unconfirmedBlock对象,将其加入集合中。
Shift
func (set *unconfirmedBlocks) Shift(height uint64) {
set.lock.Lock()
defer set.lock.Unlock()
for set.blocks != nil {
next := set.blocks.Value.(*unconfirmedBlock)
if next.index+uint64(set.depth) > height {
break
}
header := set.chain.GetHeaderByNumber(next.index)
switch {
case header == nil:
log.Warn("Failed to retrieve header of mined block", "number", next.index, "hash", next.hash)
case header.Hash() == next.hash:
log.Info("🔗 block reached canonical chain", "number", next.index, "hash", next.hash)
default:
included := false
for number := next.index; !included && number < next.index+uint64(set.depth) && number <= height; number++ {
if block := set.chain.GetBlockByNumber(number); block != nil {
for _, uncle := range block.Uncles() {
if uncle.Hash() == next.hash {
included = true
break
}
}
}
}
if included {
log.Info("â‘‚ block became an uncle", "number", next.index, "hash", next.hash)
} else {
log.Info("😱 block lost", "number", next.index, "hash", next.hash)
}
}
if set.blocks.Value == set.blocks.Next().Value {
set.blocks = nil
} else {
set.blocks = set.blocks.Move(-1)
set.blocks.Unlink(1)
set.blocks = set.blocks.Move(1)
}
}
}
会删除那些过于早的区块。遍历其中给的每一个区块,如果其高度加上depth(默认是7)高于给定的高度则跳过,否则按照高度从区块链中取出区块头,如果该区块头hash等于正在遍历的区块头hash,则表示我们挖的区块进入主链。如果不是,则检查是否称为叔块。这一部分检查主要是为了打印log,主要逻辑在最后,此时这个区块在给定区块高度减depth以外,需要从中删除。
unconfirmedBlocks使用了环形链表ring作为主要的数据结构。
题图来自unsplash:https://unsplash.com/photos/-oWyJoSqBRM