go-ethereum中miner源码分析

入口

StartMining

一般来说有两个途径去启动挖矿:

第一个是在启动geth是使用–mine参数启动挖矿,回顾之前geth启动流程,在startNode最后,判断如果使用了–mine参数则调用Ethereim的StartMining方法启动挖矿。

第二个是在节点启动后,通过rpc去连接节点,然后使用mine.start()方法启动,该rpc服务对应的逻辑如下

// go-ethereum\eth\api.go
func (api *PrivateMinerAPI) Start(threads *int) error {
	if threads == nil {
		return api.e.StartMining(runtime.NumCPU())
	}
	return api.e.StartMining(*threads)
}

可见也是使用Ethereim的StartMining方法,该方法参数就是挖矿使用的线程数。

func (s *Ethereum) StartMining(threads int) error {
	type threaded interface {
		SetThreads(threads int)
	}
	if th, ok := s.engine.(threaded); ok {
		log.Info("Updated mining threads", "threads", threads)
		if threads == 0 {
			threads = -1 
		}
		th.SetThreads(threads)
	}

	if !s.IsMining() {
		s.lock.RLock()
		price := s.gasPrice
		s.lock.RUnlock()
		s.txPool.SetGasPrice(price)

		eb, err := s.Etherbase()
		if err != nil {
			log.Error("Cannot start mining without etherbase", "err", err)
			return fmt.Errorf("etherbase missing: %v", err)
		}
		if clique, ok := s.engine.(*clique.Clique); ok {
			wallet, err := s.accountManager.Find(accounts.Account{Address: eb})
			if wallet == nil || err != nil {
				log.Error("Etherbase account unavailable locally", "err", err)
				return fmt.Errorf("signer missing: %v", err)
			}
			clique.Authorize(eb, wallet.SignData)
		}
		atomic.StoreUint32(&s.protocolManager.acceptTxs, 1)

		go s.miner.Start(eb)
	}
	return nil
}

第一步是共识引擎修改挖矿线程数,这里我们以Ethash为例,

func (ethash *Ethash) SetThreads(threads int) {
	ethash.lock.Lock()
	defer ethash.lock.Unlock()

	if ethash.shared != nil {
		ethash.shared.SetThreads(threads)
		return
	}
	ethash.threads = threads
	select {
	case ethash.update <- struct{}{}:
	default:
	}
}

主要就是修改threads字段。接着判断是否正在挖矿,

func (s *Ethereum) IsMining() bool      { return s.miner.Mining() }

如果没有在挖矿,则在一个单独的goroutine中运行miner的start方法。

miner

这是一个挖矿工作的帮助类

type Miner struct {
	mux      *event.TypeMux
	worker   *worker
	coinbase common.Address
	eth      Backend
	engine   consensus.Engine
	exitCh   chan struct{}

	canStart    int32 
	shouldStart int32 
}

func New(eth Backend, config *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine, recommit time.Duration, gasFloor, gasCeil uint64, isLocalBlock func(block *types.Block) bool) *Miner {
	miner := &Miner{
		eth:      eth,
		mux:      mux,
		engine:   engine,
		exitCh:   make(chan struct{}),
		worker:   newWorker(config, engine, eth, mux, recommit, gasFloor, gasCeil, isLocalBlock),
		canStart: 1,
	}
	go miner.update()

	return miner
}

它的new方法子创建Ethereum时被调用来构建一个miner对象,new方法中在一个单独的goroutine中调用了updateff

update

func (self *Miner) update() {
	events := self.mux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{})
	defer events.Unsubscribe()

	for {
		select {
		case ev := <-events.Chan():
			if ev == nil {
				return
			}
			switch ev.Data.(type) {
			case downloader.StartEvent:
				atomic.StoreInt32(&self.canStart, 0)
				if self.Mining() {
					self.Stop()
					atomic.StoreInt32(&self.shouldStart, 1)
					log.Info("Mining aborted due to sync")
				}
			case downloader.DoneEvent, downloader.FailedEvent:
				shouldStart := atomic.LoadInt32(&self.shouldStart) == 1

				atomic.StoreInt32(&self.canStart, 1)
				atomic.StoreInt32(&self.shouldStart, 0)
				if shouldStart {
					self.Start(self.coinbase)
				}
				return
			}
		case <-self.exitCh:
			return
		}
	}
}

这是一个事件订阅接受的逻辑,首先订阅了downloader的三种事件,根据每种事件都有对应逻辑。StartEvent是在节点同步时发出,此时节点正在同步数据,网卡要暂停。FailedEvent与DoneEvent是在同步完成后发出此时可以开始挖矿。还有一点是一旦接受到FailedEvent或DoneEvent事件时就要退出循环。

Start

func (self *Miner) Start(coinbase common.Address) {
	atomic.StoreInt32(&self.shouldStart, 1)
	self.SetEtherbase(coinbase)

	if atomic.LoadInt32(&self.canStart) == 0 {
		log.Info("Network syncing, will start miner afterwards")
		return
	}
	self.worker.start()
}

指定一个coinbase开始挖矿,实际执行的是worker

Stop

func (self *Miner) Stop() {
	self.worker.stop()
	atomic.StoreInt32(&self.shouldStart, 0)
}

Mining

func (self *Miner) Mining() bool {
	return self.worker.isRunning()
}

判断是否在挖矿

worker

这个才是真正工作的,miner只是在控制worker,在创建miner是也就创建了worker

func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, recommit time.Duration, gasFloor, gasCeil uint64, isLocalBlock func(*types.Block) bool) *worker {
	worker := &worker{
		config:             config,
		engine:             engine,
		eth:                eth,
		mux:                mux,
		chain:              eth.BlockChain(),
		gasFloor:           gasFloor,
		gasCeil:            gasCeil,
		isLocalBlock:       isLocalBlock,
		localUncles:        make(map[common.Hash]*types.Block),
		remoteUncles:       make(map[common.Hash]*types.Block),
		unconfirmed:        newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth),
		pendingTasks:       make(map[common.Hash]*task),
		txsCh:              make(chan core.NewTxsEvent, txChanSize),
		chainHeadCh:        make(chan core.ChainHeadEvent, chainHeadChanSize),
		chainSideCh:        make(chan core.ChainSideEvent, chainSideChanSize),
		newWorkCh:          make(chan *newWorkReq),
		taskCh:             make(chan *task),
		resultCh:           make(chan *types.Block, resultQueueSize),
		exitCh:             make(chan struct{}),
		startCh:            make(chan struct{}, 1),
		resubmitIntervalCh: make(chan time.Duration),
		resubmitAdjustCh:   make(chan *intervalAdjust, resubmitAdjustChanSize),
	}
	worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh)
	worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh)
	worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh)

	if recommit < minRecommitInterval {
		log.Warn("Sanitizing miner recommit interval", "provided", recommit, "updated", minRecommitInterval)
		recommit = minRecommitInterval
	}

	go worker.mainLoop()
	go worker.newWorkLoop(recommit)
	go worker.resultLoop()
	go worker.taskLoop()

	worker.startCh <- struct{}{}

	return worker
}

主要是构建了对象并订阅了几个事件。然后启动了4个goroutine取执行几个方法。每个方法都是事件循环机制。

start

func (w *worker) start() {
	atomic.StoreInt32(&w.running, 1)
	w.startCh <- struct{}{}
}

miner通过调用start开始挖矿。这里只是给running赋值1标记开始挖矿,另外给startCh赋值触发逻辑。startCh在newWorkLoop中

//worker::newWorkLoop
case <-w.startCh:
	clearPending(w.chain.CurrentBlock().NumberU64())
	timestamp = time.Now().Unix()
	commit(false, commitInterruptNewHead)
	
clearPending := func(number uint64) {
	w.pendingMu.Lock()
	for h, t := range w.pendingTasks {
		if t.block.NumberU64()+staleThreshold <= number {
			delete(w.pendingTasks, h)
		}
	}
	w.pendingMu.Unlock()
}

commit := func(noempty bool, s int32) {
	if interrupt != nil {
		atomic.StoreInt32(interrupt, s)
	}
	interrupt = new(int32)
	w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty, timestamp: timestamp}
	timer.Reset(recommit)
	atomic.StoreInt32(&w.newTxs, 0)
}

首先调用clearPending方法清理一些过于旧的task,然后记录当前时间戳,调用commit方法,这里主要是给newWorkCh传递了事件,接着启动了一个定时器(默认3秒触发)。先看newWorkCh的逻辑

//worker::mainLoop
case req := <-w.newWorkCh:
	w.commitNewWork(req.interrupt, req.noempty, req.timestamp)

直接调用commitNewWork方法

func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) {
	w.mu.RLock()
	defer w.mu.RUnlock()

	tstart := time.Now()
	parent := w.chain.CurrentBlock()

	if parent.Time().Cmp(new(big.Int).SetInt64(timestamp)) >= 0 {
		timestamp = parent.Time().Int64() + 1
	}

	if now := time.Now().Unix(); timestamp > now+1 {
		wait := time.Duration(timestamp-now) * time.Second
		log.Info("Mining too far in the future", "wait", common.PrettyDuration(wait))
		time.Sleep(wait)
	}

	num := parent.Number()
	header := &types.Header{
		ParentHash: parent.Hash(),
		Number:     num.Add(num, common.Big1),
		GasLimit:   core.CalcGasLimit(parent, w.gasFloor, w.gasCeil),
		Extra:      w.extra,
		Time:       big.NewInt(timestamp),
	}

	if w.isRunning() {
		if w.coinbase == (common.Address{}) {
			log.Error("Refusing to mine without etherbase")
			return
		}
		header.Coinbase = w.coinbase
	}
	if err := w.engine.Prepare(w.chain, header); err != nil {
		log.Error("Failed to prepare header for mining", "err", err)
		return
	}

	if daoBlock := w.config.DAOForkBlock; daoBlock != nil {
		limit := new(big.Int).Add(daoBlock, params.DAOForkExtraRange)
		if header.Number.Cmp(daoBlock) >= 0 && header.Number.Cmp(limit) < 0 {
			if w.config.DAOForkSupport {
				header.Extra = common.CopyBytes(params.DAOForkBlockExtra)
			} else if bytes.Equal(header.Extra, params.DAOForkBlockExtra) {
				header.Extra = []byte{} 
			}
		}
	}
	err := w.makeCurrent(parent, header)
	if err != nil {
		log.Error("Failed to create mining context", "err", err)
		return
	}
	env := w.current
	if w.config.DAOForkSupport && w.config.DAOForkBlock != nil && w.config.DAOForkBlock.Cmp(header.Number) == 0 {
		misc.ApplyDAOHardFork(env.state)
	}

	uncles := make([]*types.Header, 0, 2)
	commitUncles := func(blocks map[common.Hash]*types.Block) {

		for hash, uncle := range blocks {
			if uncle.NumberU64()+staleThreshold <= header.Number.Uint64() {
				delete(blocks, hash)
			}
		}
		for hash, uncle := range blocks {
			if len(uncles) == 2 {
				break
			}
			if err := w.commitUncle(env, uncle.Header()); err != nil {
				log.Trace("Possible uncle rejected", "hash", hash, "reason", err)
			} else {
				log.Debug("Committing new uncle to block", "hash", hash)
				uncles = append(uncles, uncle.Header())
			}
		}
	}

	commitUncles(w.localUncles)
	commitUncles(w.remoteUncles)

	if !noempty {
		w.commit(uncles, nil, false, tstart)
	}

	pending, err := w.eth.TxPool().Pending()
	if err != nil {
		log.Error("Failed to fetch pending transactions", "err", err)
		return
	}

	if len(pending) == 0 {
		w.updateSnapshot()
		return
	}

	localTxs, remoteTxs := make(map[common.Address]types.Transactions), pending
	for _, account := range w.eth.TxPool().Locals() {
		if txs := remoteTxs[account]; len(txs) > 0 {
			delete(remoteTxs, account)
			localTxs[account] = txs
		}
	}
	if len(localTxs) > 0 {
		txs := types.NewTransactionsByPriceAndNonce(w.current.signer, localTxs)
		if w.commitTransactions(txs, w.coinbase, interrupt) {
			return
		}
	}
	if len(remoteTxs) > 0 {
		txs := types.NewTransactionsByPriceAndNonce(w.current.signer, remoteTxs)
		if w.commitTransactions(txs, w.coinbase, interrupt) {
			return
		}
	}
	w.commit(uncles, w.fullTaskHook, true, tstart)
}

首先获取区块链当前区块记为父块,如果提交任务的时间戳比父块时间戳还小,要重设时间戳为父块时间戳加1。另外时间戳也不能比当前时间大,否则睡眠一段时间。

之后构建一个洗呢区块头,先填充父块hash,区块编号,gaslimit,额外数据(默认为geth版本等信息)以及时间戳。如果正在挖矿还要填入Coinbase。之后调用engine.Prepare方法,主要是计算区块应有的难度。再往下调用了makeCurrent方法

func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error {
	state, err := w.chain.StateAt(parent.Root())
	if err != nil {
		return err
	}
	env := &environment{
		signer:    types.NewEIP155Signer(w.config.ChainID),
		state:     state,
		ancestors: mapset.NewSet(),
		family:    mapset.NewSet(),
		uncles:    mapset.NewSet(),
		header:    header,
	}

	for _, ancestor := range w.chain.GetBlocksFromHash(parent.Hash(), 7) {
		for _, uncle := range ancestor.Uncles() {
			env.family.Add(uncle.Hash())
		}
		env.family.Add(ancestor.Hash())
		env.ancestors.Add(ancestor.Hash())
	}

	env.tcount = 0
	w.current = env
	return nil
}

makeCurrent先获取了父区块的状态树,然后构建了一个environment对象。之后获取了父区块的7个祖先区块,记录每一个祖先hash一起对于的叔块hash,然后设置worker的current为刚才构建的environment。

回到commitNewWork中,接着调用了两次commitUncles分别处理localUncles和remoteUncles。这两个map是在收到ChainSideEvent事件时按照区块的性质添加的。具体处理逻辑是遍历map中的所有区块,对于编号在当前区块之上七代以内的,作为该区块的叔块,但最多只有两个叔块,选择叔块是要先用commitUncle检查,主要检查是否已被作为叔块添加过。接着如果noempty参数为false,commit方法其中update参数为false。否则先从txpool中取出那些可以执行的交易,也就是pending队列中。如果没有可执行交易调用updateSnapshot并返回

func (w *worker) updateSnapshot() {
	w.snapshotMu.Lock()
	defer w.snapshotMu.Unlock()

	var uncles []*types.Header
	w.current.uncles.Each(func(item interface{}) bool {
		hash, ok := item.(common.Hash)
		if !ok {
			return false
		}
		uncle, exist := w.localUncles[hash]
		if !exist {
			uncle, exist = w.remoteUncles[hash]
		}
		if !exist {
			return false
		}
		uncles = append(uncles, uncle.Header())
		return false
	})

	w.snapshotBlock = types.NewBlock(
		w.current.header,
		w.current.txs,
		uncles,
		w.current.receipts,
	)

	w.snapshotState = w.current.state.Copy()
}

这里主要是手机有效叔块然后构建一个区块赋值给snapshotBlock,同时复制状态树。

回到commitNewWork中,如果有可执行交易则将交易分为本地交易和远端交易两部分。如果本地交易数量大于0,则执行NewTransactionsByPriceAndNonce,这是将交易按照gasprice排序(pending内部每个地址对应的交易按nonce排序,这里去每个地址的第一个交易按gasprice排序),然后调用commitTransactions方法

func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coinbase common.Address, interrupt *int32) bool {
	if w.current == nil {
		return true
	}

	if w.current.gasPool == nil {
		w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit)
	}

	var coalescedLogs []*types.Log

	for {
		if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone {
			if atomic.LoadInt32(interrupt) == commitInterruptResubmit {
				ratio := float64(w.current.header.GasLimit-w.current.gasPool.Gas()) / float64(w.current.header.GasLimit)
				if ratio < 0.1 {
					ratio = 0.1
				}
				w.resubmitAdjustCh <- &intervalAdjust{
					ratio: ratio,
					inc:   true,
				}
			}
			return atomic.LoadInt32(interrupt) == commitInterruptNewHead
		}
		if w.current.gasPool.Gas() < params.TxGas {
			log.Trace("Not enough gas for further transactions", "have", w.current.gasPool, "want", params.TxGas)
			break
		}
		tx := txs.Peek()
		if tx == nil {
			break
		}
		if tx.Protected() && !w.config.IsEIP155(w.current.header.Number) {
			log.Trace("Ignoring reply protected transaction", "hash", tx.Hash(), "eip155", w.config.EIP155Block)

			txs.Pop()
			continue
		}
		w.current.state.Prepare(tx.Hash(), common.Hash{}, w.current.tcount)

		logs, err := w.commitTransaction(tx, coinbase)
		switch err {
		case core.ErrGasLimitReached:
			log.Trace("Gas limit exceeded for current block", "sender", from)
			txs.Pop()

		case core.ErrNonceTooLow:
			log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce())
			txs.Shift()

		case core.ErrNonceTooHigh: pool and miner, skip account =
			log.Trace("Skipping account with hight nonce", "sender", from, "nonce", tx.Nonce())
			txs.Pop()

		case nil:
			coalescedLogs = append(coalescedLogs, logs...)
			w.current.tcount++
			txs.Shift()

		default:
			log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err)
			txs.Shift()
		}
	}

	if !w.isRunning() && len(coalescedLogs) > 0 {
		cpy := make([]*types.Log, len(coalescedLogs))
		for i, l := range coalescedLogs {
			cpy[i] = new(types.Log)
			*cpy[i] = *l
		}
		go w.mux.Post(core.PendingLogsEvent{Logs: cpy})
	}
	if interrupt != nil {
		w.resubmitAdjustCh <- &intervalAdjust{inc: false}
	}
	return false
}

首先创建一个gaspool来记录gas用量。只有有一个循环,每次从给定的交易集合中取一个,使用peek方法,取的是gasprice最大的那个,然后恢复出该交易的发送人,如果该交易被保护并且不是EIP155分叉则忽略该交易。接着去执行这个交易

func (w *worker) commitTransaction(tx *types.Transaction, coinbase common.Address) ([]*types.Log, error) {
	snap := w.current.state.Snapshot()

	receipt, _, err := core.ApplyTransaction(w.config, w.chain, &coinbase, w.current.gasPool, w.current.state, w.current.header, tx, &w.current.header.GasUsed, *w.chain.GetVMConfig())
	if err != nil {
		w.current.state.RevertToSnapshot(snap)
		return nil, err
	}
	w.current.txs = append(w.current.txs, tx)
	w.current.receipts = append(w.current.receipts, receipt)

	return receipt.Logs, nil
}

先获得状态树快照,庵后使用ApplyTransaction执行交易这个我们之前分析过,如果执行出错回滚状态树,否则将交易及收据放到current对应字段,并返回收据logs。回到commitTransactions中,根据对应错误打印不同log并弹出该交易,若没有错收集log,然后调用Shift方法,这个方法是将对应地址的交易集合中第一个交易踢出并重排顺序。

就这样循环直到处理完所有给定的交易,回到commitNewWork中,刚才处理的是本地交易,然后处理远端交集。都处理完后调用commit方法

func (w *worker) commit(uncles []*types.Header, interval func(), update bool, start time.Time) error {
	receipts := make([]*types.Receipt, len(w.current.receipts))
	for i, l := range w.current.receipts {
		receipts[i] = new(types.Receipt)
		*receipts[i] = *l
	}
	s := w.current.state.Copy()
	block, err := w.engine.Finalize(w.chain, w.current.header, s, w.current.txs, uncles, w.current.receipts)
	if err != nil {
		return err
	}
	if w.isRunning() {
		if interval != nil {
			interval()
		}
		select {
		case w.taskCh <- &task{receipts: receipts, state: s, block: block, createdAt: time.Now()}:
			w.unconfirmed.Shift(block.NumberU64() - 1)

			feesWei := new(big.Int)
			for i, tx := range block.Transactions() {
				feesWei.Add(feesWei, new(big.Int).Mul(new(big.Int).SetUint64(receipts[i].GasUsed), tx.GasPrice()))
			}
			feesEth := new(big.Float).Quo(new(big.Float).SetInt(feesWei), new(big.Float).SetInt(big.NewInt(params.Ether)))

			log.Info("Commit new mining work", "number", block.Number(), "sealhash", w.engine.SealHash(block.Header()),
				"uncles", len(uncles), "txs", w.current.tcount, "gas", block.GasUsed(), "fees", feesEth, "elapsed", common.PrettyDuration(time.Since(start)))

		case <-w.exitCh:
			log.Info("Worker has exited")
		}
	}
	if update {
		w.updateSnapshot()
	}
	return nil
}

到这里该处理的交易都处理完了,首先复制所有收据及窗台书,然后调用engine.Finalize方法发放奖励并生成区块。接着如果正在挖矿且没有中断,则给taskCh赋值触发相应逻辑。

//worker::taskLoop
case task := <-w.taskCh:
	if w.newTaskHook != nil {
		w.newTaskHook(task)
	}
	sealHash := w.engine.SealHash(task.block.Header())
	if sealHash == prev {
		continue
	}
	interrupt()
	stopCh, prev = make(chan struct{}), sealHash

	if w.skipSealHook != nil && w.skipSealHook(task) {
		continue
	}
	w.pendingMu.Lock()
	w.pendingTasks[w.engine.SealHash(task.block.Header())] = task
	w.pendingMu.Unlock()

	if err := w.engine.Seal(w.chain, task.block, w.resultCh, stopCh); err != nil {
		log.Warn("Block sealing failed", "err", err)
	}

这里首先计算新区快头的hash值,如果等于前一次任务的,即prev,则退出。否则终止之前操作,然后记录该hash值为prev。然后在pendingTasks中记录这个task,并调用共识引擎的Seal方法,也就是实际的挖矿算法取寻找nonce(详见这里)。当找到后resultCh会收到信号触发对应逻辑

case block := <-w.resultCh:
	if block == nil {
		continue
	}
	if w.chain.HasBlock(block.Hash(), block.NumberU64()) {
		continue
	}
	var (
		sealhash = w.engine.SealHash(block.Header())
		hash     = block.Hash()
	)
	w.pendingMu.RLock()
	task, exist := w.pendingTasks[sealhash]
	w.pendingMu.RUnlock()
	if !exist {
		log.Error("Block found but no relative pending task", "number", block.Number(), "sealhash", sealhash, "hash", hash)
		continue
	}
	var (
		receipts = make([]*types.Receipt, len(task.receipts))
		logs     []*types.Log
	)
	for i, receipt := range task.receipts {
		receipts[i] = new(types.Receipt)
		*receipts[i] = *receipt
		for _, log := range receipt.Logs {
			log.BlockHash = hash
		}
		logs = append(logs, receipt.Logs...)
	}
	stat, err := w.chain.WriteBlockWithState(block, receipts, task.state)
	if err != nil {
		log.Error("Failed writing block to chain", "err", err)
		continue
	}
	log.Info("Successfully sealed new block", "number", block.Number(), "sealhash", sealhash, "hash", hash,
		"elapsed", common.PrettyDuration(time.Since(task.createdAt)))

	w.mux.Post(core.NewMinedBlockEvent{Block: block})

	var events []interface{}
	switch stat {
	case core.CanonStatTy:
		events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
		events = append(events, core.ChainHeadEvent{Block: block})
	case core.SideStatTy:
		events = append(events, core.ChainSideEvent{Block: block})
	}
	w.chain.PostChainEvents(events, logs)
	w.unconfirmed.Insert(block.NumberU64(), block.Hash())

先判断这个找到的block是否为空或已经有了,一切正常的话,从pendingTasks中找到对应task,遍历它的所有收据然后补齐收据中log的BlockHash字段。之后调用WriteBlockWithState写入本地区块链,成功的话发送NewMinedBlockEvent事件,这个事件被ProtocolManager接受并广播新块。在worker中,根据插入结果广播事件。最后将新区快插入到unconfirmed中。

中断

刚才梳理了正常的挖矿流程,在最开始的时候,newWorkLoop收到startCh事件后,会设置一个定时器,他到时间后也会执行commit方法,只不过noempty参数为true,还是一样的流程,在taskLoop收到taskCh启动挖矿是会先执行一次interrupt方法

interrupt := func() {
	if stopCh != nil {
		close(stopCh)
		stopCh = nil
	}
}

这里关闭的stopCh就是控制挖矿的,他会停止挖矿,然后随后又重新开始一次

unconfirmedBlocks

这个表示那些本地挖到的但是还未确认的区块,创建worker是被创建

func newUnconfirmedBlocks(chain chainRetriever, depth uint) *unconfirmedBlocks {
	return &unconfirmedBlocks{
		chain: chain,
		depth: depth,
	}
}

type unconfirmedBlocks struct {
	chain  chainRetriever 
	depth  uint          
	blocks *ring.Ring    
	lock   sync.RWMutex   
}

type unconfirmedBlock struct {
	index uint64
	hash  common.Hash
}

Insert

每挖到一个区块就调用一次

func (set *unconfirmedBlocks) Insert(index uint64, hash common.Hash) {
	set.Shift(index)

	item := ring.New(1)
	item.Value = &unconfirmedBlock{
		index: index,
		hash:  hash,
	}

	set.lock.Lock()
	defer set.lock.Unlock()

	if set.blocks == nil {
		set.blocks = item
	} else {
		set.blocks.Move(-1).Link(item)
	}
	
	log.Info("🔨 mined potential block", "number", index, "hash", hash)
}

先删除从新区快高度减去depth之外的所有区块,然后新建一个unconfirmedBlock对象,将其加入集合中。

Shift

func (set *unconfirmedBlocks) Shift(height uint64) {
	set.lock.Lock()
	defer set.lock.Unlock()

	for set.blocks != nil {
		next := set.blocks.Value.(*unconfirmedBlock)
		if next.index+uint64(set.depth) > height {
			break
		}
		header := set.chain.GetHeaderByNumber(next.index)
		switch {
		case header == nil:
			log.Warn("Failed to retrieve header of mined block", "number", next.index, "hash", next.hash)
		case header.Hash() == next.hash:
			log.Info("🔗 block reached canonical chain", "number", next.index, "hash", next.hash)
		default:
			included := false
			for number := next.index; !included && number < next.index+uint64(set.depth) && number <= height; number++ {
				if block := set.chain.GetBlockByNumber(number); block != nil {
					for _, uncle := range block.Uncles() {
						if uncle.Hash() == next.hash {
							included = true
							break
						}
					}
				}
			}
			if included {
				log.Info("â‘‚ block became an uncle", "number", next.index, "hash", next.hash)
			} else {
				log.Info("😱 block lost", "number", next.index, "hash", next.hash)
			}
		}
		if set.blocks.Value == set.blocks.Next().Value {
			set.blocks = nil
		} else {
			set.blocks = set.blocks.Move(-1)
			set.blocks.Unlink(1)
			set.blocks = set.blocks.Move(1)
		}
	}
}

会删除那些过于早的区块。遍历其中给的每一个区块,如果其高度加上depth(默认是7)高于给定的高度则跳过,否则按照高度从区块链中取出区块头,如果该区块头hash等于正在遍历的区块头hash,则表示我们挖的区块进入主链。如果不是,则检查是否称为叔块。这一部分检查主要是为了打印log,主要逻辑在最后,此时这个区块在给定区块高度减depth以外,需要从中删除。

unconfirmedBlocks使用了环形链表ring作为主要的数据结构。

题图来自unsplash:https://unsplash.com/photos/-oWyJoSqBRM