go-ethereum eth/downloader Source Code Study (Part 2)

This is the second part of the downloader source-code analysis; it covers the three supporting components: peer, queue, and statesync.

peer.go

A Peer represents a remote node and wraps a set of useful methods:

type Peer interface {
	LightPeer
	RequestBodies([]common.Hash) error
	RequestReceipts([]common.Hash) error
	RequestNodeData([]common.Hash) error
}

type LightPeer interface {
	Head() (common.Hash, *big.Int)
	RequestHeadersByHash(common.Hash, int, int, bool) error
	RequestHeadersByNumber(uint64, int, int, bool) error
}

Register & Unregister

In practice, once a connection is established and the ProtocolManager's handle method starts serving a peer, it calls the downloader's RegisterPeer method to register that peer. RegisterPeer looks like this:

func (d *Downloader) RegisterPeer(id string, version int, peer Peer) error {
	logger := log.New("peer", id)
	logger.Trace("Registering sync peer")
	if err := d.peers.Register(newPeerConnection(id, version, peer, logger)); err != nil {
		logger.Error("Failed to register sync peer", "err", err)
		return err
	}
	d.qosReduceConfidence()

	return nil
}

As you can see, registration is really delegated to d.peers, which is a peerSet object; what actually gets registered is a peerConnection:

type peerSet struct {
	peers        map[string]*peerConnection
	newPeerFeed  event.Feed
	peerDropFeed event.Feed
	lock         sync.RWMutex
}
type peerConnection struct {
	id string // Unique identifier of the peer

	headerIdle  int32 // Current header activity state of the peer (idle = 0, active = 1)
	blockIdle   int32 // Current block activity state of the peer (idle = 0, active = 1)
	receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1)
	stateIdle   int32 // Current node data activity state of the peer (idle = 0, active = 1)

	headerThroughput  float64 // Number of headers measured to be retrievable per second
	blockThroughput   float64 // Number of blocks (bodies) measured to be retrievable per second
	receiptThroughput float64 // Number of receipts measured to be retrievable per second
	stateThroughput   float64 // Number of node data pieces measured to be retrievable per second

	rtt time.Duration // Request round trip time to track responsiveness (QoS)

	headerStarted  time.Time // Time instance when the last header fetch was started
	blockStarted   time.Time // Time instance when the last block (body) fetch was started
	receiptStarted time.Time // Time instance when the last receipt fetch was started
	stateStarted   time.Time // Time instance when the last node data fetch was started

	lacking map[common.Hash]struct{} // Set of hashes not to request (didn't have previously)

	peer Peer

	version int        // Eth protocol version number to switch strategies
	log     log.Logger // Contextual logger to add extra infos to peer logs
	lock    sync.RWMutex
}

peerSet's Register method is as follows:

func (ps *peerSet) Register(p *peerConnection) error {
	p.rtt = ps.medianRTT()

	ps.lock.Lock()
	if _, ok := ps.peers[p.id]; ok {
		ps.lock.Unlock()
		return errAlreadyRegistered
	}
	if len(ps.peers) > 0 {
		p.headerThroughput, p.blockThroughput, p.receiptThroughput, p.stateThroughput = 0, 0, 0, 0

		for _, peer := range ps.peers {
			peer.lock.RLock()
			p.headerThroughput += peer.headerThroughput
			p.blockThroughput += peer.blockThroughput
			p.receiptThroughput += peer.receiptThroughput
			p.stateThroughput += peer.stateThroughput
			peer.lock.RUnlock()
		}
		p.headerThroughput /= float64(len(ps.peers))
		p.blockThroughput /= float64(len(ps.peers))
		p.receiptThroughput /= float64(len(ps.peers))
		p.stateThroughput /= float64(len(ps.peers))
	}
	ps.peers[p.id] = p
	ps.lock.Unlock()

	ps.newPeerFeed.Send(p)
	return nil
}

medianRTT returns the median of the RTTs of all peers held by the peerSet. RTT (round-trip time) is used to gauge the communication quality with a peer. medianRTT is implemented as follows:

func (ps *peerSet) medianRTT() time.Duration {
	ps.lock.RLock()
	defer ps.lock.RUnlock()

	rtts := make([]float64, 0, len(ps.peers))
	for _, p := range ps.peers {
		p.lock.RLock()
		rtts = append(rtts, float64(p.rtt))
		p.lock.RUnlock()
	}
	sort.Float64s(rtts)

	median := rttMaxEstimate
	if qosTuningPeers <= len(rtts) {
		median = time.Duration(rtts[qosTuningPeers/2]) 
	} else if len(rtts) > 0 {
		median = time.Duration(rtts[len(rtts)/2]) // Median of our connected peers (maintain even like this some baseline qos)
	}
	
	if median < rttMinEstimate {
		median = rttMinEstimate
	}
	if median > rttMaxEstimate {
		median = rttMaxEstimate
	}
	return median
}

The logic is straightforward: collect every peer's rtt, sort the values, and if there are at least qosTuningPeers (5) samples take the element at index 2, i.e. the median of the five fastest; otherwise take the median of the whole list. The result is finally clamped between rttMinEstimate and rttMaxEstimate (2 and 20 seconds).
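
As a quick worked example of the code above: with measured RTTs of [3s, 5s, 8s, 11s, 14s, 18s] there are six samples, so the chosen value is the one at index 2, namely 8s; it already lies inside the [2s, 20s] window, so it is returned unchanged.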

Back in Register: it first checks whether the peer is already registered. If not, and other peers are already present, the newcomer's headerThroughput, blockThroughput, receiptThroughput and stateThroughput are seeded with the averages over the existing peers. These four values estimate, respectively, how many headers, block bodies, receipts and node-data items the peer can deliver per second. Finally the peer is stored in the peers map under its id.

Registration has its counterpart; Unregister is implemented as follows:

func (ps *peerSet) Unregister(id string) error {
	ps.lock.Lock()
	p, ok := ps.peers[id]
	if !ok {
		defer ps.lock.Unlock()
		return errNotRegistered
	}
	delete(ps.peers, id)
	ps.lock.Unlock()

	ps.peerDropFeed.Send(p)
	return nil
}

Very simple: check that the peer exists, then delete it.

FetchXxx

Besides wrapping a peer, peerConnection keeps a set of per-peer state. In addition to the headerThroughput, blockThroughput, receiptThroughput and stateThroughput fields mentioned above, the headerIdle, blockIdle, receiptIdle and stateIdle flags record the peer's activity for each kind of work: 0 means idle, 1 means busy. The headerStarted, blockStarted, receiptStarted and stateStarted timestamps record when the corresponding fetch was started.

Taking headers as an example of how these variables change: peerConnection has a FetchHeaders method for requesting block headers, implemented as follows:

func (p *peerConnection) FetchHeaders(from uint64, count int) error {
	if p.version < 62 {
		panic(fmt.Sprintf("header fetch [eth/62+] requested on eth/%d", p.version))
	}

	if !atomic.CompareAndSwapInt32(&p.headerIdle, 0, 1) {
		return errAlreadyFetching
	}
	p.headerStarted = time.Now()

	go p.peer.RequestHeadersByNumber(from, count, 0, false)

	return nil
}

It first checks the eth protocol version, then atomically flips headerIdle from 0 to 1 and records the start time in headerStarted. The actual request is issued by the peer's RequestHeadersByNumber method; this peer is the one created by ProtocolManager's newPeer (go-ethereum\eth\peer.go).

There are analogous FetchBodies, FetchReceipts and FetchNodeData methods; a sketch of FetchBodies follows.
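
The following sketch shows FetchBodies following the same pattern as FetchHeaders above. It is illustrative only: the field names come from the peerConnection struct shown earlier, and the way the header hashes are gathered from the fetchRequest is paraphrased rather than quoted from the source.

func (p *peerConnection) FetchBodies(request *fetchRequest) error {
	if p.version < 62 {
		panic(fmt.Sprintf("body fetch [eth/62+] requested on eth/%d", p.version))
	}
	// Mark the peer busy for block-body retrieval.
	if !atomic.CompareAndSwapInt32(&p.blockIdle, 0, 1) {
		return errAlreadyFetching
	}
	p.blockStarted = time.Now()

	// Collect the hashes of the headers whose bodies we want, then fire the
	// request asynchronously, exactly as FetchHeaders does.
	hashes := make([]common.Hash, 0, len(request.Headers))
	for _, header := range request.Headers {
		hashes = append(hashes, header.Hash())
	}
	go p.peer.RequestBodies(hashes)

	return nil
}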

SetXxxIdle

Whereas FetchXxx marks the corresponding state as busy, SetXxxIdle does the opposite; all of them go through the setIdle method.

func (p *peerConnection) setIdle(started time.Time, delivered int, throughput *float64, idle *int32) {
	defer atomic.StoreInt32(idle, 0)

	p.lock.Lock()
	defer p.lock.Unlock()

	if delivered == 0 {
		*throughput = 0
		return
	}

	elapsed := time.Since(started) + 1 
	measured := float64(delivered) / (float64(elapsed) / float64(time.Second))

	*throughput = (1-measurementImpact)*(*throughput) + measurementImpact*measured
	p.rtt = time.Duration((1-measurementImpact)*float64(p.rtt) + measurementImpact*float64(elapsed))

	p.log.Trace("Peer throughput measurements updated",
		"hps", p.headerThroughput, "bps", p.blockThroughput,
		"rps", p.receiptThroughput, "sps", p.stateThroughput,
		"miss", len(p.lacking), "rtt", p.rtt)
}

The idle flag is reset to 0 (via the deferred store) to mark the peer as idle. Then, if delivered is 0, meaning nothing was delivered, the peer's corresponding throughput is reset to 0; otherwise both the throughput and the rtt are updated as exponential moving averages.
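
A sketch of how the SetXxxIdle wrappers plug into setIdle, using the field names of peerConnection above (the real wrappers are essentially this thin):

// Mark the peer idle for header/body retrieval and fold the delivery count
// into the corresponding throughput estimate.
func (p *peerConnection) SetHeadersIdle(delivered int) {
	p.setIdle(p.headerStarted, delivered, &p.headerThroughput, &p.headerIdle)
}

func (p *peerConnection) SetBodiesIdle(delivered int) {
	p.setIdle(p.blockStarted, delivered, &p.blockThroughput, &p.blockIdle)
}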

XxxCapacity

These return, for the corresponding data type, how many items should be requested from the peer within a target round-trip time, derived from the measured throughput. Taking headers as an example:

func (p *peerConnection) HeaderCapacity(targetRTT time.Duration) int {
	p.lock.RLock()
	defer p.lock.RUnlock()

	return int(math.Min(1+math.Max(1, p.headerThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxHeaderFetch)))
}
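
As a worked example of the formula above: if headerThroughput is 100 headers per second and targetRTT is 500ms, the capacity is min(1 + max(1, 100 × 0.5), MaxHeaderFetch) = min(51, 192) = 51, i.e. the next request will ask this peer for at most 51 headers.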

XxxIdlePeers

These retrieve the peers that are currently idle for the corresponding kind of work:

func (ps *peerSet) HeaderIdlePeers() ([]*peerConnection, int) {
	idle := func(p *peerConnection) bool {
		return atomic.LoadInt32(&p.headerIdle) == 0
	}
	throughput := func(p *peerConnection) float64 {
		p.lock.RLock()
		defer p.lock.RUnlock()
		return p.headerThroughput
	}
	return ps.idlePeers(62, 64, idle, throughput)
}

It delegates to idlePeers:

func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peerConnection) bool, throughput func(*peerConnection) float64) ([]*peerConnection, int) {
	ps.lock.RLock()
	defer ps.lock.RUnlock()

	idle, total := make([]*peerConnection, 0, len(ps.peers)), 0
	for _, p := range ps.peers {
		if p.version >= minProtocol && p.version <= maxProtocol {
			if idleCheck(p) {
				idle = append(idle, p)
			}
			total++
		}
	}
	for i := 0; i < len(idle); i++ {
		for j := i + 1; j < len(idle); j++ {
			if throughput(idle[i]) < throughput(idle[j]) {
				idle[i], idle[j] = idle[j], idle[i]
			}
		}
	}
	return idle, total
}

A simple traversal collects the idle peers, which are then sorted in descending order of throughput (a naive pairwise-swap sort), and the total count of eligible peers is returned alongside.

Reset

Reset clears all the bookkeeping:

func (p *peerConnection) Reset() {
	p.lock.Lock()
	defer p.lock.Unlock()

	atomic.StoreInt32(&p.headerIdle, 0)
	atomic.StoreInt32(&p.blockIdle, 0)
	atomic.StoreInt32(&p.receiptIdle, 0)
	atomic.StoreInt32(&p.stateIdle, 0)

	p.headerThroughput = 0
	p.blockThroughput = 0
	p.receiptThroughput = 0
	p.stateThroughput = 0

	p.lacking = make(map[common.Hash]struct{})
}

queue.go

The queue effectively acts as a scheduler: in concert with the downloader's fetchParts method, it tells the downloader what can be processed next. Start with the constructor; when the downloader is created it calls newQueue to build a queue:

func newQueue() *queue {
	lock := new(sync.Mutex)
	return &queue{
		headerPendPool:   make(map[string]*fetchRequest),
		headerContCh:     make(chan bool),
		blockTaskPool:    make(map[common.Hash]*types.Header),
		blockTaskQueue:   prque.New(nil),
		blockPendPool:    make(map[string]*fetchRequest),
		blockDonePool:    make(map[common.Hash]struct{}),
		receiptTaskPool:  make(map[common.Hash]*types.Header),
		receiptTaskQueue: prque.New(nil),
		receiptPendPool:  make(map[string]*fetchRequest),
		receiptDonePool:  make(map[common.Hash]struct{}),
		resultCache:      make([]*fetchResult, blockCacheItems),
		active:           sync.NewCond(lock),
		lock:             lock,
	}
}

The active field is a sync.Cond bound to the queue's lock: besides normal locking and unlocking via L, it offers Wait to block and Signal/Broadcast to wake waiters. receiptTaskQueue and blockTaskQueue are priority queues.
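
As a reminder of the sync.Cond pattern the queue relies on, here is a minimal standalone sketch (not the queue's actual Results code): a consumer waits on the condition until a producer signals that results are available.

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	lock := new(sync.Mutex)
	active := sync.NewCond(lock)
	results := 0

	// Consumer: blocks on the condition until there is something to take.
	go func() {
		lock.Lock()
		for results == 0 {
			active.Wait() // releases the lock while sleeping, reacquires it on wake-up
		}
		fmt.Println("got", results, "results")
		lock.Unlock()
	}()

	// Producer: publishes a result and wakes one waiter, just like
	// reserveHeaders/deliver call q.active.Signal().
	time.Sleep(100 * time.Millisecond)
	lock.Lock()
	results = 3
	active.Signal()
	lock.Unlock()

	time.Sleep(100 * time.Millisecond)
}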

Schedule

func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
	q.lock.Lock()
	defer q.lock.Unlock()

	inserts := make([]*types.Header, 0, len(headers))
	for _, header := range headers {
		hash := header.Hash()
		if header.Number == nil || header.Number.Uint64() != from {
			log.Warn("Header broke chain ordering", "number", header.Number, "hash", hash, "expected", from)
			break
		}
		if q.headerHead != (common.Hash{}) && q.headerHead != header.ParentHash {
			log.Warn("Header broke chain ancestry", "number", header.Number, "hash", hash)
			break
		}
		if _, ok := q.blockTaskPool[hash]; ok {
			log.Warn("Header already scheduled for block fetch", "number", header.Number, "hash", hash)
			continue
		}
		if _, ok := q.receiptTaskPool[hash]; ok {
			log.Warn("Header already scheduled for receipt fetch", "number", header.Number, "hash", hash)
			continue
		}
		q.blockTaskPool[hash] = header
		q.blockTaskQueue.Push(header, -int64(header.Number.Uint64()))

		if q.mode == FastSync {
			q.receiptTaskPool[hash] = header
			q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64()))
		}
		inserts = append(inserts, header)
		q.headerHead = hash
		from++
	}
	return inserts
}

This method is called from the downloader's processHeaders. The headers parameter is a batch of headers and from is the starting block number. Its job is to schedule these headers for download. It first creates a header slice, then walks over all the incoming headers and runs a series of checks on each one.

First it verifies the header itself: the number must not be nil and must match the expected position. Next it checks chain ancestry, i.e. that the parent hash links up with the previous header. Then it checks whether the header is already in blockTaskPool or receiptTaskPool; if so, it is skipped.

After the checks, the header is put into blockTaskPool, the map of tasks waiting for block-body retrieval, and pushed onto the blockTaskQueue priority queue with the negated block number as priority, so lower-numbered blocks have higher priority. In FastSync mode the header is additionally added to receiptTaskPool and receiptTaskQueue, which is exactly where fast sync differs. The header is then appended to inserts, and headerHead and from are updated for validating the next header. The return value is the set of headers accepted for scheduling.

The two task pools mainly guard against scheduling the same work twice; entries are removed in the DeliverXxx methods once a request completes. The two task queues are the heart of the scheduling: they hold the pending work that the ReserveXxx methods later pop to build requests. The priority trick is illustrated below.
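
A minimal sketch of the priority convention, assuming the go-ethereum common/prque package that queue.go uses (prque.New(nil) above): pushing with the negated block number makes the lowest-numbered block pop first.

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common/prque"
)

func main() {
	// Same convention as blockTaskQueue: priority = -blockNumber.
	q := prque.New(nil)
	for _, number := range []uint64{105, 101, 103} {
		q.Push(number, -int64(number))
	}
	// Pops in ascending block-number order: 101, 103, 105.
	for !q.Empty() {
		item, prio := q.Pop()
		fmt.Printf("block %d (priority %d)\n", item.(uint64), prio)
	}
}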

ScheduleSkeleton

This method is called from the downloader's fillHeaderSkeleton, i.e. when fetchHeaders runs in skeleton mode and the gaps between skeleton headers need to be filled:

func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
	q.lock.Lock()
	defer q.lock.Unlock()

	if q.headerResults != nil {
		panic("skeleton assembly already in progress")
	}

	q.headerTaskPool = make(map[uint64]*types.Header)
	q.headerTaskQueue = prque.New(nil)
	q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains
	q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch)
	q.headerProced = 0
	q.headerOffset = from
	q.headerContCh = make(chan bool, 1)

	for i, header := range skeleton {
		index := from + uint64(i*MaxHeaderFetch)

		q.headerTaskPool[index] = header
		q.headerTaskQueue.Push(index, -int64(index))
	}
}

It first guards against concurrent skeleton assembly by checking whether headerResults is already set. It then iterates over skeleton, a batch of sparse headers: starting at from+191, one header is requested every 192 blocks, for a total of up to 128. During the iteration, headerTaskPool maps each batch's starting index to its skeleton header, and the index is pushed onto headerTaskQueue with its negation as priority. headerTaskPool and headerTaskQueue play the same roles as the pool/queue pairs in Schedule above.
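
A concrete example of the indexing: with from = 1000, skeleton[0] is header number 1191 and is stored under key 1000, skeleton[1] is header 1383 and is stored under key 1192, and so on. Each key marks the start of a 192-header batch, and the stored skeleton header is what that batch's last header must hash to, which is exactly what DeliverHeaders verifies later.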

ReserveXxx

This is a family of methods. At the end of the downloader's synchronise flow a set of fetch functions is defined, all of which end up calling fetchParts; the reserve parameter passed there is one of these ReserveXxx methods. Their job is to take tasks from the corresponding task queue and build a request for later execution.

There are three of them: ReserveReceipts, ReserveBodies and ReserveHeaders; ReserveReceipts and ReserveBodies both delegate to reserveHeaders:

func (q *queue) ReserveBodies(p *peerConnection, count int) (*fetchRequest, bool, error) {
	isNoop := func(header *types.Header) bool {
		return header.TxHash == types.EmptyRootHash && header.UncleHash == types.EmptyUncleHash
	}
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.reserveHeaders(p, count, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, isNoop)
}

func (q *queue) ReserveReceipts(p *peerConnection, count int) (*fetchRequest, bool, error) {
	isNoop := func(header *types.Header) bool {
		return header.ReceiptHash == types.EmptyRootHash
	}
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.reserveHeaders(p, count, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, isNoop)
}

reserveHeaders

Let's take a look at this method:

func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
	pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, isNoop func(*types.Header) bool) (*fetchRequest, bool, error) {
	if taskQueue.Empty() {
		return nil, false, nil
	}
	if _, ok := pendPool[p.id]; ok {
		return nil, false, nil
	}
	space := q.resultSlots(pendPool, donePool)

	send := make([]*types.Header, 0, count)
	skip := make([]*types.Header, 0)

	progress := false
	for proc := 0; proc < space && len(send) < count && !taskQueue.Empty(); proc++ {
		header := taskQueue.PopItem().(*types.Header)
		hash := header.Hash()

		index := int(header.Number.Int64() - int64(q.resultOffset))
		if index >= len(q.resultCache) || index < 0 {
			common.Report("index allocation went beyond available resultCache space")
			return nil, false, errInvalidChain
		}
		if q.resultCache[index] == nil {
			components := 1
			if q.mode == FastSync {
				components = 2
			}
			q.resultCache[index] = &fetchResult{
				Pending: components,
				Hash:    hash,
				Header:  header,
			}
		}

		if isNoop(header) {
			donePool[hash] = struct{}{}
			delete(taskPool, hash)

			space, proc = space-1, proc-1
			q.resultCache[index].Pending--
			progress = true
			continue
		}

		if p.Lacks(hash) {
			skip = append(skip, header)
		} else {
			send = append(send, header)
		}
	}

	for _, header := range skip {
		taskQueue.Push(header, -int64(header.Number.Uint64()))
	}
	if progress {
		q.active.Signal()
	}
	if len(send) == 0 {
		return nil, progress, nil
	}
	request := &fetchRequest{
		Peer:    p,
		Headers: send,
		Time:    time.Now(),
	}
	pendPool[p.id] = request

	return request, progress, nil
}

First it checks whether taskQueue is empty. taskQueue is passed in as a parameter: for ReserveBodies it is blockTaskQueue and for ReserveReceipts it is receiptTaskQueue, both of which were filled by Schedule earlier. It also bails out if the peer already has a pending request. Then it computes the available result slots (the fetch limit) and creates two slices, send and skip.

The loop then pops the highest-priority item from taskQueue each iteration and computes index from the header's number; index is the position in resultCache, an array of fetchResult entries holding completed downloads. If that slot is empty, a fetchResult is constructed and stored there. Next the isNoop callback, passed in as a parameter, checks whether the block is empty (no transactions and no uncles, or no receipts). If so there is nothing to fetch: the hash is marked in donePool, removed from taskPool, the space and proc counters are decremented accordingly, and the result's Pending count is reduced. Otherwise, if the hash is in the peer's lacking set, meaning the peer previously could not provide that data, the header goes into skip; otherwise it goes into send.
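
For reference, the fetchResult type can be reconstructed roughly from the fields used in this file (Pending, Hash and Header here; Transactions and Uncles in DeliverBodies; Receipts in DeliverReceipts):

type fetchResult struct {
	Pending int         // Number of data pieces still being fetched (body and, in fast sync, receipts)
	Hash    common.Hash // Hash of the header, to avoid recomputing it

	Header       *types.Header
	Uncles       []*types.Header
	Transactions []*types.Transaction
	Receipts     []*types.Receipt
}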

After the loop, i.e. once a batch of headers has been classified, the skip list is pushed back onto the taskQueue with the negated height as priority. If progress is true, meaning some empty blocks were encountered, active.Signal is called to wake up the Wait that blocks inside the queue's Results method. Finally, if send is empty the method returns; otherwise a fetchRequest is built and stored in pendPool under the peer's id.

ReserveHeaders

Like ScheduleSkeleton above, this method is only called in skeleton mode. It is implemented as follows:

func (q *queue) ReserveHeaders(p *peerConnection, count int) *fetchRequest {
	q.lock.Lock()
	defer q.lock.Unlock()

	if _, ok := q.headerPendPool[p.id]; ok {
		return nil
	}

	send, skip := uint64(0), []uint64{}
	for send == 0 && !q.headerTaskQueue.Empty() {
		from, _ := q.headerTaskQueue.Pop()
		if q.headerPeerMiss[p.id] != nil {
			if _, ok := q.headerPeerMiss[p.id][from.(uint64)]; ok {
				skip = append(skip, from.(uint64))
				continue
			}
		}
		send = from.(uint64)
	}

	for _, from := range skip {
		q.headerTaskQueue.Push(from, -int64(from))
	}

	if send == 0 {
		return nil
	}
	request := &fetchRequest{
		Peer: p,
		From: send,
		Time: time.Now(),
	}
	q.headerPendPool[p.id] = request
	return request
}

The logic resembles reserveHeaders. It first checks whether the peer already has an entry in headerPendPool. Then it pops the highest-priority index from headerTaskQueue each time and checks whether that batch start is recorded in headerPeerMiss for this peer, which lists batches the peer has already failed to serve; such entries go into skip. The loop stops once a usable starting index is found and stored in send. The rest mirrors reserveHeaders: skipped entries are pushed back onto headerTaskQueue, and a fetchRequest is built and stored in headerPendPool. Note that this fetchRequest differs from the one built at the end of reserveHeaders: instead of a Headers slice it carries a From field, the starting block number of the batch; since the skeleton is regular, recording the start is enough.
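 
Putting the two construction sites together, the fetchRequest type can be reconstructed roughly as:

type fetchRequest struct {
	Peer    *peerConnection // Peer to which the request was sent
	From    uint64          // Starting block number of the batch (skeleton header fills)
	Headers []*types.Header // Requested headers (body/receipt fetches)
	Time    time.Time       // Time when the request was made
}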

DeliverXxx

This is another family of methods, passed to fetchParts much like ReserveXxx; they are invoked once the requested data has been downloaded. There are three: DeliverBodies, DeliverReceipts and DeliverHeaders. The first two simply delegate to deliver:

func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) (int, error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	reconstruct := func(header *types.Header, index int, result *fetchResult) error {
		if types.DeriveSha(types.Transactions(txLists[index])) != header.TxHash || types.CalcUncleHash(uncleLists[index]) != header.UncleHash {
			return errInvalidBody
		}
		result.Transactions = txLists[index]
		result.Uncles = uncleLists[index]
		return nil
	}
	return q.deliver(id, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, bodyReqTimer, len(txLists), reconstruct)
}

func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int, error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	reconstruct := func(header *types.Header, index int, result *fetchResult) error {
		if types.DeriveSha(types.Receipts(receiptList[index])) != header.ReceiptHash {
			return errInvalidReceipt
		}
		result.Receipts = receiptList[index]
		return nil
	}
	return q.deliver(id, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, receiptReqTimer, len(receiptList), reconstruct)
}

deliver

func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
	pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, reqTimer metrics.Timer,
	results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) (int, error) {

	request := pendPool[id]
	if request == nil {
		return 0, errNoFetchesPending
	}
	reqTimer.UpdateSince(request.Time)
	delete(pendPool, id)

	if results == 0 {
		for _, header := range request.Headers {
			request.Peer.MarkLacking(header.Hash())
		}
	}

	var (
		accepted int
		failure  error
		useful   bool
	)
	for i, header := range request.Headers {
		if i >= results {
			break
		}
		index := int(header.Number.Int64() - int64(q.resultOffset))
		if index >= len(q.resultCache) || index < 0 || q.resultCache[index] == nil {
			failure = errInvalidChain
			break
		}
		if err := reconstruct(header, i, q.resultCache[index]); err != nil {
			failure = err
			break
		}
		hash := header.Hash()

		donePool[hash] = struct{}{}
		q.resultCache[index].Pending--
		useful = true
		accepted++

		request.Headers[i] = nil
		delete(taskPool, hash)
	}
	for _, header := range request.Headers {
		if header != nil {
			taskQueue.Push(header, -int64(header.Number.Uint64()))
		}
	}
	if accepted > 0 {
		q.active.Signal()
	}
	switch {
	case failure == nil || failure == errInvalidChain:
		return accepted, failure
	case useful:
		return accepted, fmt.Errorf("partial failure: %v", failure)
	default:
		return accepted, errStaleDelivery
	}
}

First it checks whether the id has an entry in pendPool. pendPool is a parameter: blockPendPool for DeliverBodies and receiptPendPool for DeliverReceipts, each populated by the corresponding ReserveXxx. If there is no entry an error is returned; if there is, it is removed from pendPool to mark the request finished. Next results is examined: it is the length of the list passed to DeliverXxx, i.e. the retrieved data. If it is 0, the peer could not serve any of the requested headers, so MarkLacking adds each of them to the peer's lacking set.

Then a loop walks the Headers of the request taken from pendPool, computes the index into resultCache, and calls reconstruct, a callback parameter that fills in the fetchResult: Receipts in DeliverReceipts, Transactions and Uncles in DeliverBodies. On success the hash is marked done in donePool, the result's Pending count is decremented, useful is set to true, accepted is incremented, the corresponding slot of request.Headers is cleared, and the entry is removed from taskPool.

After the loop, since only the first results headers were consumed, any leftover headers are pushed back onto the taskQueue. If accepted is greater than zero, some work genuinely completed, so active.Signal is called to wake waiters, just as in reserveHeaders. Finally the method returns the accepted count together with an error report.

DeliverHeaders

func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh chan []*types.Header) (int, error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	request := q.headerPendPool[id]
	if request == nil {
		return 0, errNoFetchesPending
	}
	headerReqTimer.UpdateSince(request.Time)
	delete(q.headerPendPool, id)

	target := q.headerTaskPool[request.From].Hash()

	accepted := len(headers) == MaxHeaderFetch
	if accepted {
		if headers[0].Number.Uint64() != request.From {
			log.Trace("First header broke chain ordering", "peer", id, "number", headers[0].Number, "hash", headers[0].Hash(), request.From)
			accepted = false
		} else if headers[len(headers)-1].Hash() != target {
			log.Trace("Last header broke skeleton structure ", "peer", id, "number", headers[len(headers)-1].Number, "hash", headers[len(headers)-1].Hash(), "expected", target)
			accepted = false
		}
	}
	if accepted {
		for i, header := range headers[1:] {
			hash := header.Hash()
			if want := request.From + 1 + uint64(i); header.Number.Uint64() != want {
				log.Warn("Header broke chain ordering", "peer", id, "number", header.Number, "hash", hash, "expected", want)
				accepted = false
				break
			}
			if headers[i].Hash() != header.ParentHash {
				log.Warn("Header broke chain ancestry", "peer", id, "number", header.Number, "hash", hash)
				accepted = false
				break
			}
		}
	}
	if !accepted {
		log.Trace("Skeleton filling not accepted", "peer", id, "from", request.From)

		miss := q.headerPeerMiss[id]
		if miss == nil {
			q.headerPeerMiss[id] = make(map[uint64]struct{})
			miss = q.headerPeerMiss[id]
		}
		miss[request.From] = struct{}{}

		q.headerTaskQueue.Push(request.From, -int64(request.From))
		return 0, errors.New("delivery not accepted")
	}
	copy(q.headerResults[request.From-q.headerOffset:], headers)
	delete(q.headerTaskPool, request.From)

	ready := 0
	for q.headerProced+ready < len(q.headerResults) && q.headerResults[q.headerProced+ready] != nil {
		ready += MaxHeaderFetch
	}
	if ready > 0 {
		process := make([]*types.Header, ready)
		copy(process, q.headerResults[q.headerProced:q.headerProced+ready])

		select {
		case headerProcCh <- process:
			log.Trace("Pre-scheduled new headers", "peer", id, "count", len(process), "from", process[0].Number)
			q.headerProced += len(process)
		default:
		}
	}
	if len(q.headerTaskPool) == 0 {
		q.headerContCh <- false
	}
	return len(headers), nil
}

The overall logic resembles deliver; the headers parameter is the received data, and the first task is to check that it matches what was requested. The request is taken from headerPendPool, then the length of headers is compared against MaxHeaderFetch (192). If it matches, the first header must sit at the requested starting number and the last header must hash to the skeleton target; failing either check sets accepted to false. Next, the batch is checked for internal consistency: consecutive numbers and matching parent hashes. If accepted ends up false, the batch start is recorded in headerPeerMiss for this peer, pushed back onto headerTaskQueue, and an error is returned. If the batch is valid, the headers are copied into headerResults and the entry is removed from headerTaskPool. Then ready counts how many contiguous headers have been assembled beyond headerProced, and that many are sent to headerProcCh, which drives processHeaders in the downloader; headerProced, the count of headers already handed off, is advanced accordingly.
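
A concrete example of the ready/headerProced bookkeeping: with headerOffset = 1000, once the batches starting at 1000 and 1192 have both been copied into headerResults, ready advances in steps of 192 to 384, and (if headerProcCh is free) those 384 headers are shipped in one go so headerProced becomes 384. If the batch starting at 1192 arrives before the one starting at 1000, ready stays at 0 until the gap is filled.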

ExpireXxx

Another family of methods used by the downloader's fetchParts:

func (q *queue) ExpireHeaders(timeout time.Duration) map[string]int {
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.expire(timeout, q.headerPendPool, q.headerTaskQueue, headerTimeoutMeter)
}

func (q *queue) ExpireBodies(timeout time.Duration) map[string]int {
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.expire(timeout, q.blockPendPool, q.blockTaskQueue, bodyTimeoutMeter)
}

func (q *queue) ExpireReceipts(timeout time.Duration) map[string]int {
	q.lock.Lock()
	defer q.lock.Unlock()

	return q.expire(timeout, q.receiptPendPool, q.receiptTaskQueue, receiptTimeoutMeter)
}

All of them call expire:

func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) map[string]int {
	expiries := make(map[string]int)
	for id, request := range pendPool {
		if time.Since(request.Time) > timeout {
			timeoutMeter.Mark(1)
			if request.From > 0 {
				taskQueue.Push(request.From, -int64(request.From))
			}
			for _, header := range request.Headers {
				taskQueue.Push(header, -int64(header.Number.Uint64()))
			}
			expiries[id] = len(request.Headers)

			delete(pendPool, id)
		}
	}
	return expiries
}

expire walks pendPool (headerPendPool, blockPendPool or receiptPendPool, all of which hold in-flight requests) and checks each request for a timeout. For an expired request, the requested headers (or the From index) are pushed back onto the taskQueue for later rescheduling, the number of requested headers is recorded in expiries under the peer's id, and the request is removed from pendPool. The expiries map is returned.

CancelXxx

Another family of methods used by the downloader's fetchParts:

func (q *queue) CancelHeaders(request *fetchRequest) {
	q.cancel(request, q.headerTaskQueue, q.headerPendPool)
}

func (q *queue) CancelBodies(request *fetchRequest) {
	q.cancel(request, q.blockTaskQueue, q.blockPendPool)
}

func (q *queue) CancelReceipts(request *fetchRequest) {
	q.cancel(request, q.receiptTaskQueue, q.receiptPendPool)
}

All of them call cancel:

func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool map[string]*fetchRequest) {
	q.lock.Lock()
	defer q.lock.Unlock()

	if request.From > 0 {
		taskQueue.Push(request.From, -int64(request.From))
	}
	for _, header := range request.Headers {
		taskQueue.Push(header, -int64(header.Number.Uint64()))
	}
	delete(pendPool, request.Peer.id)
}

cancel drops a request, but the headers it covered are pushed back onto the taskQueue to be processed next time, and the request is removed from pendPool.

RetrieveHeaders

This method appears at the end of fillHeaderSkeleton; it returns the assembled results and resets the state in preparation for the next ScheduleSkeleton:

func (q *queue) RetrieveHeaders() ([]*types.Header, int) {
	q.lock.Lock()
	defer q.lock.Unlock()

	headers, proced := q.headerResults, q.headerProced
	q.headerResults, q.headerProced = nil, 0

	return headers, proced
}

statesync.go

statesync is responsible for synchronising state data. A stateSync is first created in the downloader's processFastSyncContent method, which calls syncState:

func (d *Downloader) syncState(root common.Hash) *stateSync {
	s := newStateSync(d, root)
	select {
	case d.stateSyncStart <- s:
	case <-d.quitCh:
		s.err = errCancelStateFetch
		close(s.done)
	}
	return s
}

func newStateSync(d *Downloader, root common.Hash) *stateSync {
	return &stateSync{
		d:       d,
		sched:   state.NewStateSync(root, d.stateDB),
		keccak:  sha3.NewLegacyKeccak256(),
		tasks:   make(map[common.Hash]*stateTask),
		deliver: make(chan *stateReq),
		cancel:  make(chan struct{}),
		done:    make(chan struct{}),
	}
}

The sched member created here is a trie.Sync object, constructed as follows:

// go-ethereum\core\state\sync.go
func NewStateSync(root common.Hash, database trie.DatabaseReader) *trie.Sync {
	var syncer *trie.Sync
	callback := func(leaf []byte, parent common.Hash) error {
		var obj Account
		if err := rlp.Decode(bytes.NewReader(leaf), &obj); err != nil {
			return err
		}
		syncer.AddSubTrie(obj.Root, 64, parent, nil)
		syncer.AddRawEntry(common.BytesToHash(obj.CodeHash), 64, parent)
		return nil
	}
	syncer = trie.NewSync(root, database, callback)
	return syncer
}

// go-ethereum\trie\sync.go
func NewSync(root common.Hash, database DatabaseReader, callback LeafCallback) *Sync {
	ts := &Sync{
		database: database,
		membatch: newSyncMemBatch(),
		requests: make(map[common.Hash]*request),
		queue:    prque.New(nil),
	}
	ts.AddSubTrie(root, 0, common.Hash{}, callback)
	return ts
}

Ultimately trie's NewSync constructs a Sync, which is responsible for synchronising the state trie. The final AddSubTrie call looks like this:

func (s *Sync) AddSubTrie(root common.Hash, depth int, parent common.Hash, callback LeafCallback) {
	if root == emptyRoot {
		return
	}
	if _, ok := s.membatch.batch[root]; ok {
		return
	}
	key := root.Bytes()
	blob, _ := s.database.Get(key)
	if local, err := decodeNode(key, blob, 0); local != nil && err == nil {
		return
	}
	req := &request{
		hash:     root,
		depth:    depth,
		callback: callback,
	}
	if parent != (common.Hash{}) {
		ancestor := s.requests[parent]
		if ancestor == nil {
			panic(fmt.Sprintf("sub-trie ancestor not found: %x", parent))
		}
		ancestor.deps++
		req.parents = append(req.parents, ancestor)
	}
	s.schedule(req)
}

In the initial state it builds a request and then calls schedule:

func (s *Sync) schedule(req *request) {
	if old, ok := s.requests[req.hash]; ok {
		old.parents = append(old.parents, req.parents...)
		return
	}
	s.queue.Push(req.hash, int64(req.depth))
	s.requests[req.hash] = req
}

Scheduling again works by pushing the request onto a queue.

Back in syncState: once sched is created, the stateSync object itself is sent on the stateSyncStart channel.

stateFetcher

At the end of the downloader's New function two goroutines are launched; the second one runs stateFetcher.

func (d *Downloader) stateFetcher() {
	for {
		select {
		case s := <-d.stateSyncStart:
			for next := s; next != nil; {
				next = d.runStateSync(next)
			}
		case <-d.stateCh:
			// Ignore state responses while no sync is running.
		case <-d.quitCh:
			return
		}
	}
}

A for-select loop: the send performed in syncState above triggers the first case, which calls runStateSync:

runStateSync

func (d *Downloader) runStateSync(s *stateSync) *stateSync {
	var (
		active   = make(map[string]*stateReq) // Currently in-flight requests
		finished []*stateReq                  // Completed or failed requests
		timeout  = make(chan *stateReq)       // Timed out active requests
	)
	defer func() {
		for _, req := range active {
			req.timer.Stop()
			req.peer.SetNodeDataIdle(len(req.items))
		}
	}()
	go s.run()
	defer s.Cancel()

	peerDrop := make(chan *peerConnection, 1024)
	peerSub := s.d.peers.SubscribePeerDrops(peerDrop)
	defer peerSub.Unsubscribe()

	for {
		var (
			deliverReq   *stateReq
			deliverReqCh chan *stateReq
		)
		if len(finished) > 0 {
			deliverReq = finished[0]
			deliverReqCh = s.deliver
		}

		select {
		case next := <-d.stateSyncStart:
			return next

		case <-s.done:
			return nil

		case deliverReqCh <- deliverReq:
			copy(finished, finished[1:])
			finished[len(finished)-1] = nil
			finished = finished[:len(finished)-1]

		case pack := <-d.stateCh:
			req := active[pack.PeerId()]
			if req == nil {
				log.Debug("Unrequested node data", "peer", pack.PeerId(), "len", pack.Items())
				continue
			}
			req.timer.Stop()
			req.response = pack.(*statePack).states

			finished = append(finished, req)
			delete(active, pack.PeerId())

		case p := <-peerDrop:
			req := active[p.id]
			if req == nil {
				continue
			}
			req.timer.Stop()
			req.dropped = true

			finished = append(finished, req)
			delete(active, p.id)

		case req := <-timeout:
			if active[req.peer.id] != req {
				continue
			}
			finished = append(finished, req)
			delete(active, req.peer.id)

		case req := <-d.trackStateReq:
			if old := active[req.peer.id]; old != nil {
				log.Warn("Busy peer assigned new state fetch", "peer", old.peer.id)

				// Make sure the previous one doesn't get silently lost
				old.timer.Stop()
				old.dropped = true

				finished = append(finished, old)
			}
			req.timer = time.AfterFunc(req.timeout, func() {
				select {
				case timeout <- req:
				case <-s.done:
				}
			})
			active[req.peer.id] = req
		}
	}
}

run is launched here in a separate goroutine, after which a for-select loop waits for events. run looks like this:

func (s *stateSync) run() {
	s.err = s.loop()
	close(s.done)
}

func (s *stateSync) loop() (err error) {
	newPeer := make(chan *peerConnection, 1024)
	peerSub := s.d.peers.SubscribeNewPeers(newPeer)
	defer peerSub.Unsubscribe()
	defer func() {
		cerr := s.commit(true)
		if err == nil {
			err = cerr
		}
	}()

	for s.sched.Pending() > 0 {
		if err = s.commit(false); err != nil {
			return err
		}
		s.assignTasks()
		select {
		case <-newPeer:

		case <-s.cancel:
			return errCancelStateFetch

		case <-s.d.cancelCh:
			return errCancelStateFetch

		case req := <-s.deliver:
			log.Trace("Received node data response", "peer", req.peer.id, "count", len(req.response), "dropped", req.dropped, "timeout", !req.dropped && req.timedOut())
			if len(req.items) <= 2 && !req.dropped && req.timedOut() {
				log.Warn("Stalling state sync, dropping peer", "peer", req.peer.id)
				s.d.dropPeer(req.peer.id)
			}
			delivered, err := s.process(req)
			if err != nil {
				log.Warn("Node data write error", "err", err)
				return err
			}
			req.peer.SetNodeDataIdle(delivered)
		}
	}
	return nil
}

loop first calls sched.Pending, which returns the number of outstanding requests (requests is populated by schedule); a non-zero value means there is still work to schedule. Inside the loop it calls commit, which at first has nothing to flush and simply returns nil, and then assignTasks:

func (s *stateSync) assignTasks() {
	peers, _ := s.d.peers.NodeDataIdlePeers()
	for _, p := range peers {
		cap := p.NodeDataCapacity(s.d.requestRTT())
		req := &stateReq{peer: p, timeout: s.d.requestTTL()}
		s.fillTasks(cap, req)

		if len(req.items) > 0 {
			req.peer.log.Trace("Requesting new batch of data", "type", "state", "count", len(req.items))
			select {
			case s.d.trackStateReq <- req:
				req.peer.FetchNodeData(req.items)
			case <-s.cancel:
			case <-s.d.cancelCh:
			}
		}
	}
}

func (s *stateSync) fillTasks(n int, req *stateReq) {
	if len(s.tasks) < n {
		new := s.sched.Missing(n - len(s.tasks))
		for _, hash := range new {
			s.tasks[hash] = &stateTask{make(map[string]struct{})}
		}
	}

	req.items = make([]common.Hash, 0, n)
	req.tasks = make(map[common.Hash]*stateTask, n)
	for hash, t := range s.tasks {
		if len(req.items) == n {
			break
		}
		if _, ok := t.attempts[req.peer.id]; ok {
			continue
		}
		t.attempts[req.peer.id] = struct{}{}
		req.items = append(req.items, hash)
		req.tasks[hash] = t
		delete(s.tasks, hash)
	}
}

NodeDataIdlePeers returns the peers that are idle for node-data retrieval. For each idle peer a request is built; fillTasks first calls sched.Missing, which pops the entries that schedule previously pushed onto the queue, records their hashes in tasks, and then fills the request's items and tasks. Once a request is built, the select in assignTasks sends it to trackStateReq, triggering the logic in runStateSync, and at the same time calls the peer's FetchNodeData, which in turn calls the peer's RequestNodeData and sends a GetNodeDataMsg.
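
For reference, sched.Missing is essentially the counterpart of schedule: it drains pending node hashes from the same priority queue. A sketch based on the trie.Sync fields shown in NewSync above (not quoted verbatim from the source):

func (s *Sync) Missing(max int) []common.Hash {
	requests := []common.Hash{}
	// Pop the hashes that schedule pushed onto the queue, up to the cap.
	for !s.queue.Empty() && len(requests) < max {
		requests = append(requests, s.queue.PopItem().(common.Hash))
	}
	return requests
}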

After GetNodeDataMsg is sent, the remote peer replies with NodeDataMsg, which ends up in the downloader's DeliverNodeData method. That method simply forwards the received data to destCh, i.e. d.stateCh, which again triggers logic inside runStateSync.

Since run and loop execute in their own goroutine, runStateSync itself blocks in its select as usual.

The first branch to fire is trackStateReq, which tracks requests: it checks whether the peer already has an in-flight request (and if so marks the old one dropped), then arms a timer that feeds the timeout channel and stores the request in active under the peer's id.

Next d.stateCh fires: it also verifies that the data corresponds to an earlier request, stops the timer, fills the request's response field, appends the request to finished and removes it from active. On the next loop iteration the first element of finished is taken and the deliverReqCh case fires; since deliverReqCh is s.deliver, this in turn triggers the s.deliver case in loop, which calls process to handle the response.

func (s *stateSync) process(req *stateReq) (int, error) {
	duplicate, unexpected, successful := 0, 0, 0

	defer func(start time.Time) {
		if duplicate > 0 || unexpected > 0 {
			s.updateStats(0, duplicate, unexpected, time.Since(start))
		}
	}(time.Now())

	for _, blob := range req.response {
		_, hash, err := s.processNodeData(blob)
		switch err {
		case nil:
			s.numUncommitted++
			s.bytesUncommitted += len(blob)
			successful++
		case trie.ErrNotRequested:
			unexpected++
		case trie.ErrAlreadyProcessed:
			duplicate++
		default:
			return successful, fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err)
		}
		if _, ok := req.tasks[hash]; ok {
			delete(req.tasks, hash)
		}
	}
	npeers := s.d.peers.Len()
	for hash, task := range req.tasks {
		if len(req.response) > 0 || req.timedOut() {
			delete(task.attempts, req.peer.id)
		}
		if len(task.attempts) >= npeers {
			return successful, fmt.Errorf("state node %s failed with all peers (%d tries, %d peers)", hash.TerminalString(), len(task.attempts), npeers)
		}
		s.tasks[hash] = task
	}
	return successful, nil
}

func (s *stateSync) processNodeData(blob []byte) (bool, common.Hash, error) {
	res := trie.SyncResult{Data: blob}
	s.keccak.Reset()
	s.keccak.Write(blob)
	s.keccak.Sum(res.Hash[:0])
	committed, _, err := s.sched.Process([]trie.SyncResult{res})
	return committed, res.Hash, err
}

Since a response may carry several blobs, each one is handled by processNodeData, which wraps the blob in a trie.SyncResult so that sched can process it. sched's Process parses the data into trie nodes and then recursively requests their children until the whole trie is built; the child requests go back into requests to await scheduling, so we won't go into further detail here. In short, after processNodeData the response has been consumed; if there were no errors, numUncommitted and bytesUncommitted are increased, completed tasks are removed from req.tasks, and unfinished ones are put back into s.tasks for a retry. Back in loop, once processing succeeds the peer's state is marked idle again via SetNodeDataIdle.

Cover image from Unsplash: https://unsplash.com/photos/3Xj6BntfsIU