Learning the go-ethereum eth/downloader source code (Part 2)
This is the second part of the downloader source-code analysis, covering the three supporting pieces: peer, queue and statesync.
peer.go
A peer represents a remote node and wraps a set of useful methods around it:
type Peer interface {
LightPeer
RequestBodies([]common.Hash) error
RequestReceipts([]common.Hash) error
RequestNodeData([]common.Hash) error
}
type LightPeer interface {
Head() (common.Hash, *big.Int)
RequestHeadersByHash(common.Hash, int, int, bool) error
RequestHeadersByNumber(uint64, int, int, bool) error
}
Register & Unregister
In practice, once a connection is established and the protocol manager's handle method starts serving a peer, the Downloader's RegisterPeer method is used to register that peer. RegisterPeer looks like this:
func (d *Downloader) RegisterPeer(id string, version int, peer Peer) error {
logger := log.New("peer", id)
logger.Trace("Registering sync peer")
if err := d.peers.Register(newPeerConnection(id, version, peer, logger)); err != nil {
logger.Error("Failed to register sync peer", "err", err)
return err
}
d.qosReduceConfidence()
return nil
}
So registration really happens against peers, which is a peerSet; what gets registered are peerConnection objects:
type peerSet struct {
peers map[string]*peerConnection
newPeerFeed event.Feed
peerDropFeed event.Feed
lock sync.RWMutex
}
type peerConnection struct {
id string // Unique identifier of the peer
headerIdle int32 // Current header activity state of the peer (idle = 0, active = 1)
blockIdle int32 // Current block activity state of the peer (idle = 0, active = 1)
receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1)
stateIdle int32 // Current node data activity state of the peer (idle = 0, active = 1)
headerThroughput float64 // Number of headers measured to be retrievable per second
blockThroughput float64 // Number of blocks (bodies) measured to be retrievable per second
receiptThroughput float64 // Number of receipts measured to be retrievable per second
stateThroughput float64 // Number of node data pieces measured to be retrievable per second
rtt time.Duration // Request round trip time to track responsiveness (QoS)
headerStarted time.Time // Time instance when the last header fetch was started
blockStarted time.Time // Time instance when the last block (body) fetch was started
receiptStarted time.Time // Time instance when the last receipt fetch was started
stateStarted time.Time // Time instance when the last node data fetch was started
lacking map[common.Hash]struct{} // Set of hashes not to request (didn't have previously)
peer Peer
version int // Eth protocol version number to switch strategies
log log.Logger // Contextual logger to add extra infos to peer logs
lock sync.RWMutex
}
peerSet's Register method is as follows:
func (ps *peerSet) Register(p *peerConnection) error {
p.rtt = ps.medianRTT()
ps.lock.Lock()
if _, ok := ps.peers[p.id]; ok {
ps.lock.Unlock()
return errAlreadyRegistered
}
if len(ps.peers) > 0 {
p.headerThroughput, p.blockThroughput, p.receiptThroughput, p.stateThroughput = 0, 0, 0, 0
for _, peer := range ps.peers {
peer.lock.RLock()
p.headerThroughput += peer.headerThroughput
p.blockThroughput += peer.blockThroughput
p.receiptThroughput += peer.receiptThroughput
p.stateThroughput += peer.stateThroughput
peer.lock.RUnlock()
}
p.headerThroughput /= float64(len(ps.peers))
p.blockThroughput /= float64(len(ps.peers))
p.receiptThroughput /= float64(len(ps.peers))
p.stateThroughput /= float64(len(ps.peers))
}
ps.peers[p.id] = p
ps.lock.Unlock()
ps.newPeerFeed.Send(p)
return nil
}
medianRTT returns the median RTT of all peers stored in ps. RTT (round-trip time) measures how responsive a peer is and thus the quality of the connection. medianRTT is implemented as follows:
func (ps *peerSet) medianRTT() time.Duration {
ps.lock.RLock()
defer ps.lock.RUnlock()
rtts := make([]float64, 0, len(ps.peers))
for _, p := range ps.peers {
p.lock.RLock()
rtts = append(rtts, float64(p.rtt))
p.lock.RUnlock()
}
sort.Float64s(rtts)
median := rttMaxEstimate
if qosTuningPeers <= len(rtts) {
median = time.Duration(rtts[qosTuningPeers/2])
} else if len(rtts) > 0 {
median = time.Duration(rtts[len(rtts)/2]) // Median of our connected peers (maintain even like this some baseline qos)
}
if median < rttMinEstimate {
median = rttMinEstimate
}
if median > rttMaxEstimate {
median = rttMaxEstimate
}
return median
}
The logic is simple: collect every peer's RTT, sort the list, and if there are at least five samples (qosTuningPeers) take the median of the lowest five; otherwise take the median of whatever peers are connected. The result is finally clamped between rttMinEstimate and rttMaxEstimate, i.e. 2 and 20 seconds.
Back in Register: the peer is rejected if it is already registered. If other peers are already present, the newcomer's headerThroughput, blockThroughput, receiptThroughput and stateThroughput are seeded with the average of the existing peers' values; these four figures estimate how many headers, block bodies, receipts and node-data items the peer can serve per second. Finally the peer is stored in the peers map under its id and the new-peer event is broadcast.
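A minimal standalone sketch (simplified types, not the actual geth structs) of how a newcomer's throughput estimates are seeded from the existing set:
package main

import "fmt"

// peerStats is a simplified stand-in for peerConnection, kept only to show the averaging.
type peerStats struct {
	headerThroughput float64
	blockThroughput  float64
}

// seedThroughput mirrors the averaging loop inside peerSet.Register: a new peer
// starts out with the mean estimates of the peers we already know about.
func seedThroughput(existing []peerStats) peerStats {
	var p peerStats
	if len(existing) == 0 {
		return p // first peer: no data yet, start from zero
	}
	for _, e := range existing {
		p.headerThroughput += e.headerThroughput
		p.blockThroughput += e.blockThroughput
	}
	n := float64(len(existing))
	p.headerThroughput /= n
	p.blockThroughput /= n
	return p
}

func main() {
	peers := []peerStats{{headerThroughput: 100, blockThroughput: 40}, {headerThroughput: 300, blockThroughput: 80}}
	fmt.Printf("%+v\n", seedThroughput(peers)) // {headerThroughput:200 blockThroughput:60}
}
This way a fresh peer is neither starved of work nor flooded with requests before any real measurement exists.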
Where there is registration there is also deregistration; Unregister is implemented as follows:
func (ps *peerSet) Unregister(id string) error {
ps.lock.Lock()
p, ok := ps.peers[id]
if !ok {
defer ps.lock.Unlock()
return errNotRegistered
}
delete(ps.peers, id)
ps.lock.Unlock()
ps.peerDropFeed.Send(p)
return nil
}
Very simple: check that the peer exists, then delete it (and send the drop event).
FetchXxx
Besides wrapping a Peer, a peerConnection carries a bunch of per-peer bookkeeping. In addition to the headerThroughput, blockThroughput, receiptThroughput and stateThroughput fields mentioned above, there are headerIdle, blockIdle, receiptIdle and stateIdle flags recording the peer's activity for each data type: 0 means idle, 1 means busy. There are also headerStarted, blockStarted, receiptStarted and stateStarted timestamps recording when the corresponding fetch was started.
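A minimal sketch of that flag handshake using only the standard library; the names are illustrative, not the geth code:
package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	// headerIdle plays the role of peerConnection.headerIdle: 0 = idle, 1 = busy.
	var headerIdle int32

	// FetchHeaders only proceeds if it wins the 0 -> 1 compare-and-swap.
	fmt.Println(atomic.CompareAndSwapInt32(&headerIdle, 0, 1)) // true: fetch started
	fmt.Println(atomic.CompareAndSwapInt32(&headerIdle, 0, 1)) // false: would return errAlreadyFetching

	// SetHeadersIdle eventually stores 0 again, releasing the slot.
	atomic.StoreInt32(&headerIdle, 0)
	fmt.Println(atomic.CompareAndSwapInt32(&headerIdle, 0, 1)) // true: a new fetch may start
}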
Taking headers as an example of how these variables change: peerConnection has a FetchHeaders method for requesting block headers, implemented as follows:
func (p *peerConnection) FetchHeaders(from uint64, count int) error {
if p.version < 62 {
panic(fmt.Sprintf("header fetch [eth/62+] requested on eth/%d", p.version))
}
if !atomic.CompareAndSwapInt32(&p.headerIdle, 0, 1) {
return errAlreadyFetching
}
p.headerStarted = time.Now()
go p.peer.RequestHeadersByNumber(from, count, 0, false)
return nil
}
It first checks the eth protocol version, then flips headerIdle to 1 and records the start time in headerStarted. The actual request logic lives in the peer's RequestHeadersByNumber method; this peer is the one created by ProtocolManager's newPeer method (go-ethereum\eth\peer.go).
There are analogous FetchBodies, FetchReceipts and FetchNodeData methods.
SetXxxIdle
While the FetchXxx methods mark the corresponding state as busy, the SetXxxIdle methods do the opposite; all of them go through setIdle.
func (p *peerConnection) setIdle(started time.Time, delivered int, throughput *float64, idle *int32) {
defer atomic.StoreInt32(idle, 0)
p.lock.Lock()
defer p.lock.Unlock()
if delivered == 0 {
*throughput = 0
return
}
elapsed := time.Since(started) + 1
measured := float64(delivered) / (float64(elapsed) / float64(time.Second))
*throughput = (1-measurementImpact)*(*throughput) + measurementImpact*measured
p.rtt = time.Duration((1-measurementImpact)*float64(p.rtt) + measurementImpact*float64(elapsed))
p.log.Trace("Peer throughput measurements updated",
"hps", p.headerThroughput, "bps", p.blockThroughput,
"rps", p.receiptThroughput, "sps", p.stateThroughput,
"miss", len(p.lacking), "rtt", p.rtt)
}
The deferred atomic store resets the flag to 0 (idle) when the function returns. If delivered is 0, nothing was delivered, so the peer's throughput for this type is reset to 0; otherwise both the throughput and the RTT are updated as exponential moving averages weighted by measurementImpact.
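The moving average is easiest to see with concrete numbers. The sketch below assumes measurementImpact is 0.1, the value of the downloader constant in this version, so each sample moves the estimate 10% of the way toward the measurement:
package main

import (
	"fmt"
	"time"
)

// measurementImpact mirrors the downloader constant (assumed 0.1 here).
const measurementImpact = 0.1

// updateThroughput applies the same exponential moving average that setIdle uses.
func updateThroughput(current float64, delivered int, elapsed time.Duration) float64 {
	if delivered == 0 {
		return 0 // nothing arrived: assume the peer cannot serve this data type
	}
	measured := float64(delivered) / (float64(elapsed) / float64(time.Second))
	return (1-measurementImpact)*current + measurementImpact*measured
}

func main() {
	// A peer previously estimated at 100 headers/s delivers 192 headers in 1.5s:
	// measured = 128, new estimate = 0.9*100 + 0.1*128 = 102.8.
	fmt.Println(updateThroughput(100, 192, 1500*time.Millisecond))
}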
XxxCapacity
These return how many items of the corresponding type can be requested from the peer within the target round-trip time, derived from the measured throughput and capped by the per-request maximum. Taking headers as an example (a worked example follows the code):
func (p *peerConnection) HeaderCapacity(targetRTT time.Duration) int {
p.lock.RLock()
defer p.lock.RUnlock()
return int(math.Min(1+math.Max(1, p.headerThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxHeaderFetch)))
}
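A worked example as a standalone sketch, with MaxHeaderFetch hard-coded to the downloader's value of 192:
package main

import (
	"fmt"
	"math"
	"time"
)

// MaxHeaderFetch mirrors the downloader constant: at most 192 headers per request.
const MaxHeaderFetch = 192

// headerCapacity reproduces the HeaderCapacity formula for a given throughput estimate.
func headerCapacity(throughput float64, targetRTT time.Duration) int {
	return int(math.Min(1+math.Max(1, throughput*float64(targetRTT)/float64(time.Second)), float64(MaxHeaderFetch)))
}

func main() {
	// 100 headers/s with a 500ms target round trip: ask for 51 headers.
	fmt.Println(headerCapacity(100, 500*time.Millisecond))
	// A very fast peer is still capped at MaxHeaderFetch.
	fmt.Println(headerCapacity(10000, 500*time.Millisecond))
}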
XxxIdlePeers
These retrieve the peers that are idle for the corresponding data type:
func (ps *peerSet) HeaderIdlePeers() ([]*peerConnection, int) {
idle := func(p *peerConnection) bool {
return atomic.LoadInt32(&p.headerIdle) == 0
}
throughput := func(p *peerConnection) float64 {
p.lock.RLock()
defer p.lock.RUnlock()
return p.headerThroughput
}
return ps.idlePeers(62, 64, idle, throughput)
}
It delegates to idlePeers:
func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peerConnection) bool, throughput func(*peerConnection) float64) ([]*peerConnection, int) {
ps.lock.RLock()
defer ps.lock.RUnlock()
idle, total := make([]*peerConnection, 0, len(ps.peers)), 0
for _, p := range ps.peers {
if p.version >= minProtocol && p.version <= maxProtocol {
if idleCheck(p) {
idle = append(idle, p)
}
total++
}
}
for i := 0; i < len(idle); i++ {
for j := i + 1; j < len(idle); j++ {
if throughput(idle[i]) < throughput(idle[j]) {
idle[i], idle[j] = idle[j], idle[i]
}
}
}
return idle, total
}
It simply walks all peers matching the protocol-version range, collects the idle ones, and sorts them by throughput in descending order.
Reset
Reset restores all of the peer's state variables to their defaults:
func (p *peerConnection) Reset() {
p.lock.Lock()
defer p.lock.Unlock()
atomic.StoreInt32(&p.headerIdle, 0)
atomic.StoreInt32(&p.blockIdle, 0)
atomic.StoreInt32(&p.receiptIdle, 0)
atomic.StoreInt32(&p.stateIdle, 0)
p.headerThroughput = 0
p.blockThroughput = 0
p.receiptThroughput = 0
p.stateThroughput = 0
p.lacking = make(map[common.Hash]struct{})
}
queue.go
queue.go essentially acts as a scheduler: working together with the downloader's fetchParts method, it tells the downloader what can be processed next. Start with the constructor: when the downloader is created, newQueue is called to build a queue.
func newQueue() *queue {
lock := new(sync.Mutex)
return &queue{
headerPendPool: make(map[string]*fetchRequest),
headerContCh: make(chan bool),
blockTaskPool: make(map[common.Hash]*types.Header),
blockTaskQueue: prque.New(nil),
blockPendPool: make(map[string]*fetchRequest),
blockDonePool: make(map[common.Hash]struct{}),
receiptTaskPool: make(map[common.Hash]*types.Header),
receiptTaskQueue: prque.New(nil),
receiptPendPool: make(map[string]*fetchRequest),
receiptDonePool: make(map[common.Hash]struct{}),
resultCache: make([]*fetchResult, blockCacheItems),
active: sync.NewCond(lock),
lock: lock,
}
}
The active field is a sync.Cond condition variable built on the lock L: besides normal locking and unlocking, Wait blocks the caller while Signal and Broadcast wake it up again. receiptTaskQueue and blockTaskQueue are priority queues.
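A minimal standalone sketch of the Wait/Signal pattern behind active (not the queue code itself): a consumer blocks on the condition variable until a producer signals that results exist.
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	lock := new(sync.Mutex)
	active := sync.NewCond(lock) // same construction as in newQueue
	results := 0

	go func() {
		time.Sleep(100 * time.Millisecond)
		lock.Lock()
		results = 3 // e.g. some fetchResults just completed
		lock.Unlock()
		active.Signal() // wake the goroutine blocked in Wait
	}()

	lock.Lock()
	for results == 0 {
		active.Wait() // releases the lock while blocked, re-acquires it on wake-up
	}
	fmt.Println("results ready:", results)
	lock.Unlock()
}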
Schedule
func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
q.lock.Lock()
defer q.lock.Unlock()
inserts := make([]*types.Header, 0, len(headers))
for _, header := range headers {
hash := header.Hash()
if header.Number == nil || header.Number.Uint64() != from {
log.Warn("Header broke chain ordering", "number", header.Number, "hash", hash, "expected", from)
break
}
if q.headerHead != (common.Hash{}) && q.headerHead != header.ParentHash {
log.Warn("Header broke chain ancestry", "number", header.Number, "hash", hash)
break
}
if _, ok := q.blockTaskPool[hash]; ok {
log.Warn("Header already scheduled for block fetch", "number", header.Number, "hash", hash)
continue
}
if _, ok := q.receiptTaskPool[hash]; ok {
log.Warn("Header already scheduled for receipt fetch", "number", header.Number, "hash", hash)
continue
}
q.blockTaskPool[hash] = header
q.blockTaskQueue.Push(header, -int64(header.Number.Uint64()))
if q.mode == FastSync {
q.receiptTaskPool[hash] = header
q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64()))
}
inserts = append(inserts, header)
q.headerHead = hash
from++
}
return inserts
}
This method is called from the downloader's processHeaders method. The headers argument is a batch of headers and from is the starting block number. Its job is to schedule a set of headers for body (and receipt) download. It first creates a header slice, then walks all of the supplied headers and runs a series of checks on each one.
First it validates the header itself: the number must not be nil and must equal the expected value. Then it checks chain ancestry, i.e. that the parent hash links to the previous header. Finally it checks whether the header is already present in blockTaskPool or receiptTaskPool; if so, it is simply skipped.
Once the checks pass, the header is put into blockTaskPool, the map holding pending body-retrieval tasks, and pushed onto the blockTaskQueue priority queue with the negated block number as priority, so lower block numbers come out first. In FastSync mode the same is done for receiptTaskPool and receiptTaskQueue, which is exactly what distinguishes fast sync. The header is then appended to inserts, and headerHead and from are advanced to validate the next header. The return value is the set of headers accepted for scheduling.
The two task pools mainly prevent a header from being scheduled twice; entries are removed in the DeliverXxx methods once the request completes. The two task queues are the heart of the scheduling: they hold the pending work, ordered by negated block number, that the later ReserveXxx methods pop off to build requests. The ordering trick is sketched below.
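A small sketch of the priority trick, assuming the same common/prque package the queue imports: pushing the negated block number means the lowest numbers pop out first.
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common/prque"
)

func main() {
	q := prque.New(nil)
	// Schedule-style insertion: priority is the negated block number.
	for _, number := range []uint64{200, 100, 150} {
		q.Push(number, -int64(number))
	}
	// Reserve-style draining: the highest priority (-100) comes out first,
	// so blocks are handed out in ascending order: 100, 150, 200.
	for !q.Empty() {
		item, _ := q.Pop()
		fmt.Println(item.(uint64))
	}
}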
ScheduleSkeleton
This method appears in the downloader's fillHeaderSkeleton method, i.e. where fetchHeaders fills in a header skeleton:
func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
q.lock.Lock()
defer q.lock.Unlock()
if q.headerResults != nil {
panic("skeleton assembly already in progress")
}
q.headerTaskPool = make(map[uint64]*types.Header)
q.headerTaskQueue = prque.New(nil)
q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains
q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch)
q.headerProced = 0
q.headerOffset = from
q.headerContCh = make(chan bool, 1)
for i, header := range skeleton {
index := from + uint64(i*MaxHeaderFetch)
q.headerTaskPool[index] = header
q.headerTaskQueue.Push(index, -int64(index))
}
}
It first guards against a skeleton assembly already being in progress by checking whether headerResults is non-nil. It then iterates over skeleton, a sparse set of headers starting at from+191 and spaced 192 blocks apart, with up to 128 requested in total. During the loop, headerTaskPool maps each batch's starting index to its skeleton header, and the index is pushed onto headerTaskQueue with its negation as the priority. headerTaskPool and headerTaskQueue play the same roles as the corresponding pair of variables in Schedule above.
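A tiny sketch of the index arithmetic, with MaxHeaderFetch hard-coded to 192: each skeleton slot keys the batch that starts 192 blocks after the previous one.
package main

import "fmt"

// MaxHeaderFetch mirrors the downloader constant (192 headers per batch).
const MaxHeaderFetch = 192

func main() {
	from := uint64(1000)
	// ScheduleSkeleton keys batch i by from + i*MaxHeaderFetch and uses the
	// negated index as the queue priority, so earlier batches are filled first.
	for i := 0; i < 3; i++ {
		index := from + uint64(i*MaxHeaderFetch)
		fmt.Printf("batch %d starts at header %d (priority %d)\n", i, index, -int64(index))
	}
}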
ReserveXxx
This is a family of methods. At the end of the downloader's synchronise flow a set of fetch functions is defined, all of which end up calling fetchParts; the reserve parameter passed there is one of these ReserveXxx methods. Their job is to claim tasks from a task queue and build a request for later execution.
There are three of them: ReserveReceipts, ReserveBodies and ReserveHeaders, with ReserveReceipts and ReserveBodies both delegating to reserveHeaders:
func (q *queue) ReserveBodies(p *peerConnection, count int) (*fetchRequest, bool, error) {
isNoop := func(header *types.Header) bool {
return header.TxHash == types.EmptyRootHash && header.UncleHash == types.EmptyUncleHash
}
q.lock.Lock()
defer q.lock.Unlock()
return q.reserveHeaders(p, count, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, isNoop)
}
func (q *queue) ReserveReceipts(p *peerConnection, count int) (*fetchRequest, bool, error) {
isNoop := func(header *types.Header) bool {
return header.ReceiptHash == types.EmptyRootHash
}
q.lock.Lock()
defer q.lock.Unlock()
return q.reserveHeaders(p, count, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, isNoop)
}
reserveHeaders
Let's take a look at this method:
func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, isNoop func(*types.Header) bool) (*fetchRequest, bool, error) {
if taskQueue.Empty() {
return nil, false, nil
}
if _, ok := pendPool[p.id]; ok {
return nil, false, nil
}
space := q.resultSlots(pendPool, donePool)
send := make([]*types.Header, 0, count)
skip := make([]*types.Header, 0)
progress := false
for proc := 0; proc < space && len(send) < count && !taskQueue.Empty(); proc++ {
header := taskQueue.PopItem().(*types.Header)
hash := header.Hash()
index := int(header.Number.Int64() - int64(q.resultOffset))
if index >= len(q.resultCache) || index < 0 {
common.Report("index allocation went beyond available resultCache space")
return nil, false, errInvalidChain
}
if q.resultCache[index] == nil {
components := 1
if q.mode == FastSync {
components = 2
}
q.resultCache[index] = &fetchResult{
Pending: components,
Hash: hash,
Header: header,
}
}
if isNoop(header) {
donePool[hash] = struct{}{}
delete(taskPool, hash)
space, proc = space-1, proc-1
q.resultCache[index].Pending--
progress = true
continue
}
if p.Lacks(hash) {
skip = append(skip, header)
} else {
send = append(send, header)
}
}
for _, header := range skip {
taskQueue.Push(header, -int64(header.Number.Uint64()))
}
if progress {
q.active.Signal()
}
if len(send) == 0 {
return nil, progress, nil
}
request := &fetchRequest{
Peer: p,
Headers: send,
Time: time.Now(),
}
pendPool[p.id] = request
return request, progress, nil
}
It first checks whether taskQueue is empty; taskQueue is passed in as a parameter and is blockTaskQueue or receiptTaskQueue for ReserveBodies and ReserveReceipts respectively, both of which were filled in Schedule above. It also bails out if the peer already has a pending request. It then computes the number of available result slots and creates two slices, send and skip.
Then the loop starts. Each iteration pops the highest-priority header from taskQueue and computes index, the header's slot in resultCache, from its block number; resultCache is an array of fetchResult holding downloads in progress or completed. If the slot is empty, a new fetchResult is stored there (with Pending set to 2 in fast sync because receipts are needed too). Next comes isNoop, a function parameter that checks whether the block body is empty, i.e. contains no transactions and no uncles. If it is, there is nothing to download: the hash is marked in donePool, removed from taskPool, the space and proc counters are decremented to compensate, the result's Pending count is decremented and progress is set. Otherwise, if the hash is in the peer's lacking set (the peer previously failed to provide this data), the header goes into skip; if not, it goes into send.
After the loop, i.e. after a batch of headers has been classified, the headers in skip are pushed back onto taskQueue with their negated heights. If progress is true, meaning some empty blocks were completed on the spot, active.Signal is called to wake the goroutine blocked in Wait inside the Results method. Finally, if send is empty the method returns; otherwise a fetchRequest is built and stored in pendPool under the peer's id.
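As a sketch, the isNoop predicate that ReserveBodies passes in boils down to comparing the header's roots against the well-known empty hashes exported by core/types (the helper name here is hypothetical):
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/core/types"
)

// isEmptyBody mirrors the isNoop closure in ReserveBodies: a header whose
// transaction and uncle roots are the canonical empty hashes has no body
// worth downloading, so the task can be marked done on the spot.
func isEmptyBody(header *types.Header) bool {
	return header.TxHash == types.EmptyRootHash && header.UncleHash == types.EmptyUncleHash
}

func main() {
	empty := &types.Header{TxHash: types.EmptyRootHash, UncleHash: types.EmptyUncleHash}
	fmt.Println(isEmptyBody(empty)) // true: nothing to fetch for this block
}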
ReserveHeaders
Like ScheduleSkeleton above, this method is only used in skeleton mode. It is implemented as follows:
func (q *queue) ReserveHeaders(p *peerConnection, count int) *fetchRequest {
q.lock.Lock()
defer q.lock.Unlock()
if _, ok := q.headerPendPool[p.id]; ok {
return nil
}
send, skip := uint64(0), []uint64{}
for send == 0 && !q.headerTaskQueue.Empty() {
from, _ := q.headerTaskQueue.Pop()
if q.headerPeerMiss[p.id] != nil {
if _, ok := q.headerPeerMiss[p.id][from.(uint64)]; ok {
skip = append(skip, from.(uint64))
continue
}
}
send = from.(uint64)
}
for _, from := range skip {
q.headerTaskQueue.Push(from, -int64(from))
}
if send == 0 {
return nil
}
request := &fetchRequest{
Peer: p,
From: send,
Time: time.Now(),
}
q.headerPendPool[p.id] = request
return request
}
The logic resembles reserveHeaders. It first checks whether the peer already has an entry in headerPendPool. It then pops the highest-priority entry from headerTaskQueue and checks whether that batch is recorded in headerPeerMiss for this peer; headerPeerMiss records the batches each peer has demonstrably failed to deliver, and such entries are put into skip. The loop continues until a usable batch is found, whose starting number is stored in send. The rest mirrors reserveHeaders: the skipped entries are pushed back onto headerTaskQueue, and a fetchRequest is built and stored in headerPendPool. Note that this fetchRequest differs from the one built at the end of reserveHeaders: instead of a Headers field it carries a From field, the first block number of the batch; since skeleton batches are regularly spaced, recording the starting position is enough.
DeliverXxx
This is another family of methods which, like ReserveXxx, are passed into fetchParts; they are invoked once the requested data has been downloaded. There are three: DeliverBodies, DeliverReceipts and DeliverHeaders, the first two of which delegate to deliver:
func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()
reconstruct := func(header *types.Header, index int, result *fetchResult) error {
if types.DeriveSha(types.Transactions(txLists[index])) != header.TxHash || types.CalcUncleHash(uncleLists[index]) != header.UncleHash {
return errInvalidBody
}
result.Transactions = txLists[index]
result.Uncles = uncleLists[index]
return nil
}
return q.deliver(id, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, bodyReqTimer, len(txLists), reconstruct)
}
func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()
reconstruct := func(header *types.Header, index int, result *fetchResult) error {
if types.DeriveSha(types.Receipts(receiptList[index])) != header.ReceiptHash {
return errInvalidReceipt
}
result.Receipts = receiptList[index]
return nil
}
return q.deliver(id, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, receiptReqTimer, len(receiptList), reconstruct)
}
deliver
func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, reqTimer metrics.Timer,
results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) (int, error) {
request := pendPool[id]
if request == nil {
return 0, errNoFetchesPending
}
reqTimer.UpdateSince(request.Time)
delete(pendPool, id)
if results == 0 {
for _, header := range request.Headers {
request.Peer.MarkLacking(header.Hash())
}
}
var (
accepted int
failure error
useful bool
)
for i, header := range request.Headers {
if i >= results {
break
}
index := int(header.Number.Int64() - int64(q.resultOffset))
if index >= len(q.resultCache) || index < 0 || q.resultCache[index] == nil {
failure = errInvalidChain
break
}
if err := reconstruct(header, i, q.resultCache[index]); err != nil {
failure = err
break
}
hash := header.Hash()
donePool[hash] = struct{}{}
q.resultCache[index].Pending--
useful = true
accepted++
request.Headers[i] = nil
delete(taskPool, hash)
}
for _, header := range request.Headers {
if header != nil {
taskQueue.Push(header, -int64(header.Number.Uint64()))
}
}
if accepted > 0 {
q.active.Signal()
}
switch {
case failure == nil || failure == errInvalidChain:
return accepted, failure
case useful:
return accepted, fmt.Errorf("partial failure: %v", failure)
default:
return accepted, errStaleDelivery
}
}
It first checks whether the peer id has an entry in pendPool; pendPool is a parameter and is blockPendPool or receiptPendPool for DeliverBodies and DeliverReceipts respectively, filled by the corresponding ReserveXxx. If there is no entry an error is returned; otherwise the entry is removed from pendPool, marking the request as finished. It then looks at results, the length of the list passed to DeliverXxx, i.e. the amount of data actually retrieved; if it is 0 the peer could not serve any of the requested headers, so MarkLacking adds each of them to the peer's lacking set.
Next comes a loop over the request.Headers taken from pendPool. For each header the index into resultCache is computed, and reconstruct, another function parameter, fills in the fetchResult after validating the data against the header: in DeliverReceipts it sets the Receipts field, in DeliverBodies the Transactions and Uncles fields. The hash is then marked as done in donePool, the result's Pending count is decremented, useful is set to true, accepted is incremented, the slot in request.Headers is cleared, and the entry is deleted from taskPool.
After the loop, since only the first results headers were walked, there may be leftovers; any remaining headers are pushed back onto the corresponding taskQueue. If accepted is greater than zero, some work really completed, so active.Signal is called to wake the blocked goroutine, just as in reserveHeaders. Finally the method returns the accepted count together with an error describing any failure. The validation that reconstruct performs is sketched below.
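As a sketch of what reconstruct validates for bodies (assuming the geth version quoted above, where types.DeriveSha takes a single argument): the delivered lists are re-hashed and must match the roots committed in the already-verified header.
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/core/types"
)

// bodyMatchesHeader is a hypothetical helper mirroring the reconstruct closure
// in DeliverBodies: recompute the transaction and uncle roots and compare them
// against the header's commitments.
func bodyMatchesHeader(header *types.Header, txs []*types.Transaction, uncles []*types.Header) bool {
	return types.DeriveSha(types.Transactions(txs)) == header.TxHash &&
		types.CalcUncleHash(uncles) == header.UncleHash
}

func main() {
	// An empty body only matches a header that commits to the empty roots.
	header := &types.Header{TxHash: types.EmptyRootHash, UncleHash: types.EmptyUncleHash}
	fmt.Println(bodyMatchesHeader(header, nil, nil)) // true
}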
DeliverHeaders
func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh chan []*types.Header) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()
request := q.headerPendPool[id]
if request == nil {
return 0, errNoFetchesPending
}
headerReqTimer.UpdateSince(request.Time)
delete(q.headerPendPool, id)
target := q.headerTaskPool[request.From].Hash()
accepted := len(headers) == MaxHeaderFetch
if accepted {
if headers[0].Number.Uint64() != request.From {
log.Trace("First header broke chain ordering", "peer", id, "number", headers[0].Number, "hash", headers[0].Hash(), request.From)
accepted = false
} else if headers[len(headers)-1].Hash() != target {
log.Trace("Last header broke skeleton structure ", "peer", id, "number", headers[len(headers)-1].Number, "hash", headers[len(headers)-1].Hash(), "expected", target)
accepted = false
}
}
if accepted {
for i, header := range headers[1:] {
hash := header.Hash()
if want := request.From + 1 + uint64(i); header.Number.Uint64() != want {
log.Warn("Header broke chain ordering", "peer", id, "number", header.Number, "hash", hash, "expected", want)
accepted = false
break
}
if headers[i].Hash() != header.ParentHash {
log.Warn("Header broke chain ancestry", "peer", id, "number", header.Number, "hash", hash)
accepted = false
break
}
}
}
if !accepted {
log.Trace("Skeleton filling not accepted", "peer", id, "from", request.From)
miss := q.headerPeerMiss[id]
if miss == nil {
q.headerPeerMiss[id] = make(map[uint64]struct{})
miss = q.headerPeerMiss[id]
}
miss[request.From] = struct{}{}
q.headerTaskQueue.Push(request.From, -int64(request.From))
return 0, errors.New("delivery not accepted")
}
copy(q.headerResults[request.From-q.headerOffset:], headers)
delete(q.headerTaskPool, request.From)
ready := 0
for q.headerProced+ready < len(q.headerResults) && q.headerResults[q.headerProced+ready] != nil {
ready += MaxHeaderFetch
}
if ready > 0 {
process := make([]*types.Header, ready)
copy(process, q.headerResults[q.headerProced:q.headerProced+ready])
select {
case headerProcCh <- process:
log.Trace("Pre-scheduled new headers", "peer", id, "count", len(process), "from", process[0].Number)
q.headerProced += len(process)
default:
}
}
if len(q.headerTaskPool) == 0 {
q.headerContCh <- false
}
return len(headers), nil
}
The overall logic resembles deliver; the headers argument is the received data. The method first verifies that the response matches what was requested: it takes the request out of headerPendPool, then checks whether headers has exactly MaxHeaderFetch (192) entries, and if so verifies that the first header sits at the requested starting number and that the last one matches the skeleton header for this batch; failing either check sets accepted to false. It then verifies that the batch is internally consistent, both in numbering and in parent-hash linkage. If accepted ends up false, the batch is recorded in headerPeerMiss as unavailable from this peer, the starting number is pushed back onto headerTaskQueue, and an error is returned. If the result is valid, the headers are copied into headerResults and the entry is removed from headerTaskPool. The method then counts how many headers starting at headerProced are ready (contiguously filled), sends them to headerProcCh to trigger the processHeaders logic in the downloader, and advances headerProced, the count of headers already handed off.
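A standalone sketch of the two continuity checks (the helper is hypothetical, not part of queue.go): consecutive numbering and parent-hash linkage.
package main

import (
	"fmt"
	"math/big"

	"github.com/ethereum/go-ethereum/core/types"
)

// contiguous mirrors DeliverHeaders' batch validation: header i must carry
// number from+i and reference header i-1 as its parent.
func contiguous(from uint64, headers []*types.Header) bool {
	for i := 1; i < len(headers); i++ {
		if headers[i].Number.Uint64() != from+uint64(i) {
			return false // broke chain ordering
		}
		if headers[i].ParentHash != headers[i-1].Hash() {
			return false // broke chain ancestry
		}
	}
	return true
}

func main() {
	parent := &types.Header{Number: big.NewInt(100)}
	child := &types.Header{Number: big.NewInt(101), ParentHash: parent.Hash()}
	fmt.Println(contiguous(100, []*types.Header{parent, child})) // true
}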
ExpireXxx
Another family of methods used by the downloader's fetchParts:
func (q *queue) ExpireHeaders(timeout time.Duration) map[string]int {
q.lock.Lock()
defer q.lock.Unlock()
return q.expire(timeout, q.headerPendPool, q.headerTaskQueue, headerTimeoutMeter)
}
func (q *queue) ExpireBodies(timeout time.Duration) map[string]int {
q.lock.Lock()
defer q.lock.Unlock()
return q.expire(timeout, q.blockPendPool, q.blockTaskQueue, bodyTimeoutMeter)
}
func (q *queue) ExpireReceipts(timeout time.Duration) map[string]int {
q.lock.Lock()
defer q.lock.Unlock()
return q.expire(timeout, q.receiptPendPool, q.receiptTaskQueue, receiptTimeoutMeter)
}
All of them call the expire method:
func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) map[string]int {
expiries := make(map[string]int)
for id, request := range pendPool {
if time.Since(request.Time) > timeout {
timeoutMeter.Mark(1)
if request.From > 0 {
taskQueue.Push(request.From, -int64(request.From))
}
for _, header := range request.Headers {
taskQueue.Push(header, -int64(header.Number.Uint64()))
}
expiries[id] = len(request.Headers)
delete(pendPool, id)
}
}
return expiries
}
It walks pendPool (headerPendPool, blockPendPool or receiptPendPool respectively), which holds the requests currently in flight. For every request that has exceeded the timeout, the requested items are pushed back onto taskQueue for a later retry, the number of requested headers is recorded in expiries under the peer id, and the request is removed from pendPool. Finally expiries is returned.
CancelXxx
Another family of methods used by the downloader's fetchParts:
func (q *queue) CancelHeaders(request *fetchRequest) {
q.cancel(request, q.headerTaskQueue, q.headerPendPool)
}
func (q *queue) CancelBodies(request *fetchRequest) {
q.cancel(request, q.blockTaskQueue, q.blockPendPool)
}
func (q *queue) CancelReceipts(request *fetchRequest) {
q.cancel(request, q.receiptTaskQueue, q.receiptPendPool)
}
All of them call the cancel method:
func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool map[string]*fetchRequest) {
q.lock.Lock()
defer q.lock.Unlock()
if request.From > 0 {
taskQueue.Push(request.From, -int64(request.From))
}
for _, header := range request.Headers {
taskQueue.Push(header, -int64(header.Number.Uint64()))
}
delete(pendPool, request.Peer.id)
}
Its purpose is to cancel a request: the requested headers are pushed back onto taskQueue to be handled in a later round, and the request is removed from pendPool.
RetrieveHeaders
This method appears at the end of fillHeaderSkeleton; it returns the assembled results and resets the state in preparation for the next ScheduleSkeleton:
func (q *queue) RetrieveHeaders() ([]*types.Header, int) {
q.lock.Lock()
defer q.lock.Unlock()
headers, proced := q.headerResults, q.headerProced
q.headerResults, q.headerProced = nil, 0
return headers, proced
}
statesync.go
statesync.go is mainly responsible for synchronising state data. A stateSync is first created in the downloader's processFastSyncContent method, which calls syncState:
func (d *Downloader) syncState(root common.Hash) *stateSync {
s := newStateSync(d, root)
select {
case d.stateSyncStart <- s:
case <-d.quitCh:
s.err = errCancelStateFetch
close(s.done)
}
return s
}
func newStateSync(d *Downloader, root common.Hash) *stateSync {
return &stateSync{
d: d,
sched: state.NewStateSync(root, d.stateDB),
keccak: sha3.NewLegacyKeccak256(),
tasks: make(map[common.Hash]*stateTask),
deliver: make(chan *stateReq),
cancel: make(chan struct{}),
done: make(chan struct{}),
}
}
The constructor sets up a sched member, a trie.Sync object, built as follows:
// go-ethereum\core\state\sync.go
func NewStateSync(root common.Hash, database trie.DatabaseReader) *trie.Sync {
var syncer *trie.Sync
callback := func(leaf []byte, parent common.Hash) error {
var obj Account
if err := rlp.Decode(bytes.NewReader(leaf), &obj); err != nil {
return err
}
syncer.AddSubTrie(obj.Root, 64, parent, nil)
syncer.AddRawEntry(common.BytesToHash(obj.CodeHash), 64, parent)
return nil
}
syncer = trie.NewSync(root, database, callback)
return syncer
}
// go-ethereum\trie\sync.go
func NewSync(root common.Hash, database DatabaseReader, callback LeafCallback) *Sync {
ts := &Sync{
database: database,
membatch: newSyncMemBatch(),
requests: make(map[common.Hash]*request),
queue: prque.New(nil),
}
ts.AddSubTrie(root, 0, common.Hash{}, callback)
return ts
}
Ultimately trie.NewSync constructs a Sync, the object used to synchronise the state trie. The trailing AddSubTrie call looks like this:
func (s *Sync) AddSubTrie(root common.Hash, depth int, parent common.Hash, callback LeafCallback) {
if root == emptyRoot {
return
}
if _, ok := s.membatch.batch[root]; ok {
return
}
key := root.Bytes()
blob, _ := s.database.Get(key)
if local, err := decodeNode(key, blob, 0); local != nil && err == nil {
return
}
req := &request{
hash: root,
depth: depth,
callback: callback,
}
if parent != (common.Hash{}) {
ancestor := s.requests[parent]
if ancestor == nil {
panic(fmt.Sprintf("sub-trie ancestor not found: %x", parent))
}
ancestor.deps++
req.parents = append(req.parents, ancestor)
}
s.schedule(req)
}
In the initial state a request is built for the root and schedule is called:
func (s *Sync) schedule(req *request) {
if old, ok := s.requests[req.hash]; ok {
old.parents = append(old.parents, req.parents...)
return
}
s.queue.Push(req.hash, int64(req.depth))
s.requests[req.hash] = req
}
Scheduling again works by pushing the request onto a queue.
Back in syncState: once the stateSync (with its sched) has been created, it is sent on the stateSyncStart channel.
stateFetcher
At the end of the downloader's New method two goroutines are started; the second one runs the stateFetcher method.
func (d *Downloader) stateFetcher() {
for {
select {
case s := <-d.stateSyncStart:
for next := s; next != nil; {
next = d.runStateSync(next)
}
case <-d.stateCh:
// Ignore state responses while no sync is running.
case <-d.quitCh:
return
}
}
}
A for-select loop: the send from syncState above triggers the first case here, which calls runStateSync.
runStateSync
func (d *Downloader) runStateSync(s *stateSync) *stateSync {
var (
active = make(map[string]*stateReq) // Currently in-flight requests
finished []*stateReq // Completed or failed requests
timeout = make(chan *stateReq) // Timed out active requests
)
defer func() {
for _, req := range active {
req.timer.Stop()
req.peer.SetNodeDataIdle(len(req.items))
}
}()
go s.run()
defer s.Cancel()
peerDrop := make(chan *peerConnection, 1024)
peerSub := s.d.peers.SubscribePeerDrops(peerDrop)
defer peerSub.Unsubscribe()
for {
var (
deliverReq *stateReq
deliverReqCh chan *stateReq
)
if len(finished) > 0 {
deliverReq = finished[0]
deliverReqCh = s.deliver
}
select {
case next := <-d.stateSyncStart:
return next
case <-s.done:
return nil
case deliverReqCh <- deliverReq:
copy(finished, finished[1:])
finished[len(finished)-1] = nil
finished = finished[:len(finished)-1]
case pack := <-d.stateCh:
req := active[pack.PeerId()]
if req == nil {
log.Debug("Unrequested node data", "peer", pack.PeerId(), "len", pack.Items())
continue
}
req.timer.Stop()
req.response = pack.(*statePack).states
finished = append(finished, req)
delete(active, pack.PeerId())
case p := <-peerDrop:
req := active[p.id]
if req == nil {
continue
}
req.timer.Stop()
req.dropped = true
finished = append(finished, req)
delete(active, p.id)
case req := <-timeout:
if active[req.peer.id] != req {
continue
}
finished = append(finished, req)
delete(active, req.peer.id)
case req := <-d.trackStateReq:
if old := active[req.peer.id]; old != nil {
log.Warn("Busy peer assigned new state fetch", "peer", old.peer.id)
// Make sure the previous one doesn't get siletly lost
old.timer.Stop()
old.dropped = true
finished = append(finished, old)
}
req.timer = time.AfterFunc(req.timeout, func() {
select {
case timeout <- req:
case <-s.done:
}
})
active[req.peer.id] = req
}
}
}
runStateSync starts run in its own goroutine right away, then enters a for-select loop waiting for events to fire. run looks like this:
func (s *stateSync) run() {
s.err = s.loop()
close(s.done)
}
func (s *stateSync) loop() (err error) {
newPeer := make(chan *peerConnection, 1024)
peerSub := s.d.peers.SubscribeNewPeers(newPeer)
defer peerSub.Unsubscribe()
defer func() {
cerr := s.commit(true)
if err == nil {
err = cerr
}
}()
for s.sched.Pending() > 0 {
if err = s.commit(false); err != nil {
return err
}
s.assignTasks()
select {
case <-newPeer:
case <-s.cancel:
return errCancelStateFetch
case <-s.d.cancelCh:
return errCancelStateFetch
case req := <-s.deliver:
log.Trace("Received node data response", "peer", req.peer.id, "count", len(req.response), "dropped", req.dropped, "timeout", !req.dropped && req.timedOut())
if len(req.items) <= 2 && !req.dropped && req.timedOut() {
log.Warn("Stalling state sync, dropping peer", "peer", req.peer.id)
s.d.dropPeer(req.peer.id)
}
delivered, err := s.process(req)
if err != nil {
log.Warn("Node data write error", "err", err)
return err
}
req.peer.SetNodeDataIdle(delivered)
}
}
return nil
}
loop first calls sched's Pending method, which returns the number of outstanding requests (requests is filled during schedule); a value greater than zero means there is work waiting to be scheduled. The loop then starts: commit is called first, and since there is nothing to commit initially it simply returns nil; then assignTasks is called.
func (s *stateSync) assignTasks() {
peers, _ := s.d.peers.NodeDataIdlePeers()
for _, p := range peers {
cap := p.NodeDataCapacity(s.d.requestRTT())
req := &stateReq{peer: p, timeout: s.d.requestTTL()}
s.fillTasks(cap, req)
if len(req.items) > 0 {
req.peer.log.Trace("Requesting new batch of data", "type", "state", "count", len(req.items))
select {
case s.d.trackStateReq <- req:
req.peer.FetchNodeData(req.items)
case <-s.cancel:
case <-s.d.cancelCh:
}
}
}
}
func (s *stateSync) fillTasks(n int, req *stateReq) {
if len(s.tasks) < n {
new := s.sched.Missing(n - len(s.tasks))
for _, hash := range new {
s.tasks[hash] = &stateTask{make(map[string]struct{})}
}
}
req.items = make([]common.Hash, 0, n)
req.tasks = make(map[common.Hash]*stateTask, n)
for hash, t := range s.tasks {
if len(req.items) == n {
break
}
if _, ok := t.attempts[req.peer.id]; ok {
continue
}
t.attempts[req.peer.id] = struct{}{}
req.items = append(req.items, hash)
req.tasks[hash] = t
delete(s.tasks, hash)
}
}
NodeDataIdlePeers returns the peers that are idle for node-data retrieval. For each idle peer a request is built: fillTasks first calls sched's Missing, which pops the hashes that schedule previously queued, stores them in tasks, and then fills the request's items and tasks fields. Once a request is built, the select in assignTasks sends it to trackStateReq, triggering the logic in runStateSync, and at the same time the peer's FetchNodeData is called, which invokes the peer's RequestNodeData method and sends a GetNodeDataMsg message.
After a GetNodeDataMsg is sent, the remote peer replies with a NodeDataMsg, at which point the downloader's DeliverNodeData method is called. It merely forwards the received data to destCh, i.e. d.stateCh, which again triggers logic in runStateSync.
Since run and loop execute in their own goroutines, runStateSync itself blocks in its select as usual.
The trackStateReq case fires first; it tracks requests. If the peer already has an active request, the old one is stopped, marked dropped and moved to finished; then a timer is armed with the request's timeout (feeding the timeout channel when it fires) and the request is stored in active under the peer id.
Next the d.stateCh case fires: it checks that the response matches an active request, stops the timer, fills in the request's response field, appends the request to finished and removes it from active. On the next loop iteration the first element of finished is picked up and the deliverReqCh case fires; since deliverReqCh is s.deliver, this in turn triggers the logic in loop, which first calls process to handle the response.
func (s *stateSync) process(req *stateReq) (int, error) {
duplicate, unexpected, successful := 0, 0, 0
defer func(start time.Time) {
if duplicate > 0 || unexpected > 0 {
s.updateStats(0, duplicate, unexpected, time.Since(start))
}
}(time.Now())
for _, blob := range req.response {
_, hash, err := s.processNodeData(blob)
switch err {
case nil:
s.numUncommitted++
s.bytesUncommitted += len(blob)
successful++
case trie.ErrNotRequested:
unexpected++
case trie.ErrAlreadyProcessed:
duplicate++
default:
return successful, fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err)
}
if _, ok := req.tasks[hash]; ok {
delete(req.tasks, hash)
}
}
npeers := s.d.peers.Len()
for hash, task := range req.tasks {
if len(req.response) > 0 || req.timedOut() {
delete(task.attempts, req.peer.id)
}
if len(task.attempts) >= npeers {
return successful, fmt.Errorf("state node %s failed with all peers (%d tries, %d peers)", hash.TerminalString(), len(task.attempts), npeers)
}
s.tasks[hash] = task
}
return successful, nil
}
func (s *stateSync) processNodeData(blob []byte) (bool, common.Hash, error) {
res := trie.SyncResult{Data: blob}
s.keccak.Reset()
s.keccak.Write(blob)
s.keccak.Sum(res.Hash[:0])
committed, _, err := s.sched.Process([]trie.SyncResult{res})
return committed, res.Hash, err
}
Since a response may contain multiple blobs, each one is passed to processNodeData, which mainly wraps the blob in a SyncResult (keyed by its Keccak256 hash) so that sched can process it. sched's Process parses the data into trie nodes and then recursively requests each node's children until the whole trie is built; those child requests are again placed into requests to be scheduled, which we will not detail here. In short, processNodeData handles one blob of the response; on success, numUncommitted and bytesUncommitted are increased and the completed hash is removed from the request's tasks, while unfinished tasks are put back to be requested again. Back in loop, once processing succeeds without error, the peer's idle state is updated via SetNodeDataIdle.
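A minimal sketch of the hashing step inside processNodeData, using the same golang.org/x/crypto/sha3 package: every delivered blob is keyed by its Keccak256 hash, which is exactly the hash the trie sync scheduler asked for.
package main

import (
	"fmt"

	"golang.org/x/crypto/sha3"
)

// nodeDataHash reproduces the keccak usage in processNodeData: reset the hasher,
// write the blob, and sum into a fixed 32-byte hash identifying the trie node.
func nodeDataHash(blob []byte) [32]byte {
	var hash [32]byte
	keccak := sha3.NewLegacyKeccak256()
	keccak.Reset()
	keccak.Write(blob)
	keccak.Sum(hash[:0])
	return hash
}

func main() {
	fmt.Printf("%x\n", nodeDataHash([]byte("some node data blob")))
}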
Header image from Unsplash: https://unsplash.com/photos/3Xj6BntfsIU