go-ethereum中eth-Fetcher源码学习

数据结构

fetcher负责收集来自各个对等点的块通知并进行处理。前面在介绍ProtocolManager时就出现过,我们先看数据结构

type announce struct {
	hash   common.Hash   // Hash of the block being announced
	number uint64        // Number of the block being announced (0 = unknown | old protocol)
	header *types.Header // Header of the block partially reassembled (new protocol)
	time   time.Time     // Timestamp of the announcement

	origin string // Identifier of the peer originating the notification

	fetchHeader headerRequesterFn // Fetcher function to retrieve the header of an announced block
	fetchBodies bodyRequesterFn   // Fetcher function to retrieve the body of an announced block
}

announce是一个通知,表示网络中有新块出现。

type inject struct {
	origin string
	block  *types.Block
}

inject表示要插入一个区块。

type Fetcher struct {
	// Various event channels
	notify chan *announce
	inject chan *inject

	headerFilter chan chan *headerFilterTask
	bodyFilter   chan chan *bodyFilterTask

	done chan common.Hash
	quit chan struct{}

	// Announce states
	announces  map[string]int              // Per peer announce counts to prevent memory exhaustion
	announced  map[common.Hash][]*announce // Announced blocks, scheduled for fetching
	fetching   map[common.Hash]*announce   // Announced blocks, currently fetching
	fetched    map[common.Hash][]*announce // Blocks with headers fetched, scheduled for body retrieval
	completing map[common.Hash]*announce   // Blocks with headers, currently body-completing

	// Block cache
	queue  *prque.Prque            // Queue containing the import operations (block number sorted)
	queues map[string]int          // Per peer block counts to prevent memory exhaustion
	queued map[common.Hash]*inject // Set of already queued blocks (to dedupe imports)

	// Callbacks
	getBlock       blockRetrievalFn   // Retrieves a block from the local chain
	verifyHeader   headerVerifierFn   // Checks if a block's headers have a valid proof of work
	broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers
	chainHeight    chainHeightFn      // Retrieves the current chain's height
	insertChain    chainInsertFn      // Injects a batch of blocks into the chain
	dropPeer       peerDropFn         // Drops a peer for misbehaving

	// Testing hooks
	announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the announce list
	queueChangeHook    func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue
	fetchingHook       func([]common.Hash)     // Method to call upon starting a block (eth/61) or header (eth/62) fetch
	completingHook     func([]common.Hash)     // Method to call upon starting a block body fetch (eth/62)
	importedHook       func(*types.Block)      // Method to call upon successful block import (both eth/61 and eth/62)
}

New

在创建ProtocolManager时使用了NewProtocolManager方法,在最后使用fetcher的new方法创建了一个fetcher:

manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)

func New(getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertChain chainInsertFn, dropPeer peerDropFn) *Fetcher {
	return &Fetcher{
		notify:         make(chan *announce),
		inject:         make(chan *inject),
		headerFilter:   make(chan chan *headerFilterTask),
		bodyFilter:     make(chan chan *bodyFilterTask),
		done:           make(chan common.Hash),
		quit:           make(chan struct{}),
		announces:      make(map[string]int),
		announced:      make(map[common.Hash][]*announce),
		fetching:       make(map[common.Hash]*announce),
		fetched:        make(map[common.Hash][]*announce),
		completing:     make(map[common.Hash]*announce),
		queue:          prque.New(nil),
		queues:         make(map[string]int),
		queued:         make(map[common.Hash]*inject),
		getBlock:       getBlock,
		verifyHeader:   verifyHeader,
		broadcastBlock: broadcastBlock,
		chainHeight:    chainHeight,
		insertChain:    insertChain,
		dropPeer:       dropPeer,
	}
}

new方法并没有什么实际逻辑,只是创建了一个对象,只不过传入了几个方法:getBlock表示根据hash值获取对应区块,verifyHeader用来验证区块头,broadcastBlock表示给peer广播区块,chainHeight用来获取区块链高度,insertChain用来插入区块,dropPeer用来删除一个peer。

上面几个方法中getBlock、chainHeight和insertChain都是BlockChain的方法,verifyHeader是共识引擎的方法,broadcastBlock和dropPeer都是ProtocolManager的方法。

Start & loop

在ProtocolManager调用Start方法启动后,会调用syncer方法,此时启动fetcher,使用的是Start方法:

func (f *Fetcher) Start() {
	go f.loop()
}

可见只是启动了一个goroutine去执行loop

func (f *Fetcher) loop() {
	fetchTimer := time.NewTimer(0)
	completeTimer := time.NewTimer(0)

	for {
		for hash, announce := range f.fetching {
			if time.Since(announce.time) > fetchTimeout {
				f.forgetHash(hash)
			}
		}

		height := f.chainHeight()
		for !f.queue.Empty() {
			op := f.queue.PopItem().(*inject)
			hash := op.block.Hash()
			if f.queueChangeHook != nil {
				f.queueChangeHook(hash, false)
			}

			number := op.block.NumberU64()
			if number > height+1 {
				f.queue.Push(op, -int64(number))
				if f.queueChangeHook != nil {
					f.queueChangeHook(hash, true)
				}
				break
			}

			if number+maxUncleDist < height || f.getBlock(hash) != nil {
				f.forgetBlock(hash)
				continue
			}
			f.insert(op.origin, op.block)
		}

		select {
		    ....
		}
	}
}

既然是loop就还是一样的套路,里面有一个死循环,再嵌套一个select结构用来触发相应事件。在loop首先定义了两个定时器,不过此时定时时间都是0,也就是立即会触发。

然后进入for循环,先遍历所有正在fetcher的announce,对于那些fetcher超过5秒的进行抛弃。然后获取了区块链高度,接着遍历了queue队列,这是一个优先级队列,优先级是其区块编号的负数,这样编号越小的区块优先级越高。每次从队列顶端取一个,这个是inject对象,如果他的编号大于当前区块高度加一,就先放回队列然后退出循环,否则如果区块过于旧,即比当前区块高度减7还小就抛弃,最后如果合适就插入。

遍历完queue后进入select结构。其中一些case我们后面会陆续介绍。在loop中会通过四个map记录announce状态:announced表示等待fetch,fetching表示正在fetch;fetched表示fetch完头部等待fetch区块体;completing表示完全结束。

FilterHeaders

在pm中如果收到code为BlockHeadersMsg的消息时,这个消息是在请求方发送GetBlockHeadersMsg消息后收到响应的消息,表示接收到了区块头,这时会调用fetcher的FilterHeaders方法去处理

func (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
	log.Trace("Filtering headers", "peer", peer, "headers", len(headers))

	filter := make(chan *headerFilterTask)

	select {
	case f.headerFilter <- filter:
	case <-f.quit:
		return nil
	}

	select {
	case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}:
	case <-f.quit:
		return nil
	}

	select {
	case task := <-filter:
		return task.headers
	case <-f.quit:
		return nil
	}
}

首先给headerFilter赋值,触发loop中对应逻辑

case filter := <-f.headerFilter:
	var task *headerFilterTask
	select {
	case task = <-filter:
	case <-f.quit:
		return
	}
	headerFilterInMeter.Mark(int64(len(task.headers)))

	unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{}
	for _, header := range task.headers {
		hash := header.Hash()

		if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
			if header.Number.Uint64() != announce.number {
				log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number)
				f.dropPeer(announce.origin)
				f.forgetHash(hash)
				continue
			}

			if f.getBlock(hash) == nil {
				announce.header = header
				announce.time = task.time

				if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {
					log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash())

					block := types.NewBlockWithHeader(header)
					block.ReceivedAt = task.time

					complete = append(complete, block)
					f.completing[hash] = announce
					continue
				}
				incomplete = append(incomplete, announce)
			} else {
				log.Trace("Block already imported, discarding header", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
				f.forgetHash(hash)
			}
		} else {
			unknown = append(unknown, header)
		}
	}
	headerFilterOutMeter.Mark(int64(len(unknown)))
	select {
	case filter <- &headerFilterTask{headers: unknown, time: task.time}:
	case <-f.quit:
		return
	}
	for _, announce := range incomplete {
		hash := announce.header.Hash()
		if _, ok := f.completing[hash]; ok {
			continue
		}
		f.fetched[hash] = append(f.fetched[hash], announce)
		if len(f.fetched) == 1 {
			f.rescheduleComplete(completeTimer)
		}
	}
	for _, block := range complete {
		if announce := f.completing[block.Hash()]; announce != nil {
			f.enqueue(announce.origin, block)
		}
	}

在对应case中,首先就是一个select结构,这里代码优点特殊,首先headerFilter这个channel里面存储的也是一个channel,回到FilterHeaders的第二个select中,给filter赋值headerFilterTask对象,其中包装了peerID,区块头和当前时间,再回到loop的case中,这里select阻塞得到释放,task得到赋值,是一个headerFilterTask对象。

接着定义了三个数组:unknown, incomplete, complete,然后遍历刚才传过来的一些区块头。先进行了许多判断,主要是确定这个头正在在fetch,接着如果返回的高度和我们请求的区块高度不一样,则删除这个peer并删除这个hash,然后判断下一个head。接着如果一样的话,尝试从本地取区块,如果取不到,在接下来的一个判断中判断这个块是否为空,也就是没有交易且没有叔块,这时创建一个空的区块放入complete中,并将对应的announce放入completing表示fetch完成,若能从本地取到对应区块,则表示已经fetch过了,则抛弃这个hash。再往下,如果不满足最开始的判断,则将对于头放入unknown中。

在遍历完所有head后,再构造一个headerFilterTask将unknown放入其中,然后将这个headerFilterTask赋值给filter,这时触发FilterHeaders的第三个select,在那里面将task的heads返回,也就是刚才放入unknown中的,随后交给Downloader处理。

在loop的对应case中还有操作,首先遍历所有incomplete,这个里面的表示那些在fetch的,而且没有fetch完的,而且对应区块也不是空的announce,首先看其是否在completing中,也就是fetch完的,这时进行忽略,否则将announce放入fetched中对于hash的数组中。接着如果fetched只有一个等待fetch区块体的任务,则调用rescheduleComplete重置completeTimer定时器。

接着遍历complete数组,如果对应区块的announce在completing中也就是以及fetch完成的,则调用enqueue方法插入区块,这个方法稍后介绍。

FilterBodies

和上一个类似,这个方法是在请求一个区块体收到响应后进行了调用:

func (f *Fetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
	log.Trace("Filtering bodies", "peer", peer, "txs", len(transactions), "uncles", len(uncles))

	filter := make(chan *bodyFilterTask)

	select {
	case f.bodyFilter <- filter:
	case <-f.quit:
		return nil, nil
	}

	select {
	case filter <- &bodyFilterTask{peer: peer, transactions: transactions, uncles: uncles, time: time}:
	case <-f.quit:
		return nil, nil
	}

	select {
	case task := <-filter:
		return task.transactions, task.uncles
	case <-f.quit:
		return nil, nil
	}
}

		case filter := <-f.bodyFilter:
			var task *bodyFilterTask
			select {
			case task = <-filter:
			case <-f.quit:
				return
			}
			bodyFilterInMeter.Mark(int64(len(task.transactions)))

			blocks := []*types.Block{}
			for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ {
				matched := false

				for hash, announce := range f.completing {
					if f.queued[hash] == nil {
						txnHash := types.DeriveSha(types.Transactions(task.transactions[i]))
						uncleHash := types.CalcUncleHash(task.uncles[i])

						if txnHash == announce.header.TxHash && uncleHash == announce.header.UncleHash && announce.origin == task.peer {
							matched = true

							if f.getBlock(hash) == nil {
								block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i])
								block.ReceivedAt = task.time

								blocks = append(blocks, block)
							} else {
								f.forgetHash(hash)
							}
						}
					}
				}
				if matched {
					task.transactions = append(task.transactions[:i], task.transactions[i+1:]...)
					task.uncles = append(task.uncles[:i], task.uncles[i+1:]...)
					i--
					continue
				}
			}

			bodyFilterOutMeter.Mark(int64(len(task.transactions)))
			select {
			case filter <- task:
			case <-f.quit:
				return
			}
			for _, block := range blocks {
				if announce := f.completing[block.Hash()]; announce != nil {
					f.enqueue(announce.origin, block)
				}
			}
		}

逻辑和上面的类似,这次是过滤body,主要是滤掉那些已经fetch完成的,剩余的返回交给Downloader处理,这里不再详述。

Notify

Notify方法是在pm收到NewBlockHashesMsg消息时得到调用,NewBlockHashesMsg是自己向其他peer发送新区快消息时用的code。接收方收到这个code表示有新的区块,在检测自己是否有相应区块后,将每个未知区块fetcher处理

func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,
	headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {
	block := &announce{
		hash:        hash,
		number:      number,
		time:        time,
		origin:      peer,
		fetchHeader: headerFetcher,
		fetchBodies: bodyFetcher,
	}
	select {
	case f.notify <- block:
		return nil
	case <-f.quit:
		return errTerminated
	}
}

这里主要是构建了一个announce对象,表示有新区块的通知,然后传递给notify触发loop的逻辑:

case notification := <-f.notify:
	propAnnounceInMeter.Mark(1)

	count := f.announces[notification.origin] + 1
	if count > hashLimit {
		log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit)
		propAnnounceDOSMeter.Mark(1)
		break
	}

	if notification.number > 0 {
		if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
			log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist)
			propAnnounceDropMeter.Mark(1)
			break
		}
	}

	if _, ok := f.fetching[notification.hash]; ok {
		break
	}
	if _, ok := f.completing[notification.hash]; ok {
		break
	}
	f.announces[notification.origin] = count
	f.announced[notification.hash] = append(f.announced[notification.hash], notification)
	if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 {
		f.announceChangeHook(notification.hash, true)
	}
	if len(f.announced) == 1 {
		f.rescheduleFetch(fetchTimer)
	}

首先将来源的announce数量加1,然后判断是否超出最大数量,在检查区块高度是否太低或太高,也就是区块太旧或太新。然后检查该announce是否正在fetch或者已经完成。若都不是,则将其添加到announced中等待fetch,另外若添加完announced长度为1则立即重置fetchTimer进行fetch。

Enqueue

Enqueue方法是在pm收到NewBlockMsg消息后执行,NewBlockMsg也是自己向其他佩尔发送新的区块信息时的code,Enqueue实现如下:

func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
	op := &inject{
		origin: peer,
		block:  block,
	}
	select {
	case f.inject <- op:
		return nil
	case <-f.quit:
		return errTerminated
	}
}

构建了一个inject对象,然后赋值给inject触发loop的逻辑:

case op := <-f.inject:
	propBroadcastInMeter.Mark(1)
	f.enqueue(op.origin, op.block)

enqueue逻辑如下:

func (f *Fetcher) enqueue(peer string, block *types.Block) {
	hash := block.Hash()

	count := f.queues[peer] + 1
	if count > blockLimit {
		log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit)
		propBroadcastDOSMeter.Mark(1)
		f.forgetHash(hash)
		return
	}

	if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
		log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist)
		propBroadcastDropMeter.Mark(1)
		f.forgetHash(hash)
		return
	}

	if _, ok := f.queued[hash]; !ok {
		op := &inject{
			origin: peer,
			block:  block,
		}
		f.queues[peer] = count
		f.queued[hash] = op
		f.queue.Push(op, -int64(block.NumberU64()))
		if f.queueChangeHook != nil {
			f.queueChangeHook(op.block.Hash(), true)
		}
		log.Debug("Queued propagated block", "peer", peer, "number", block.Number(), "hash", hash, "queued", f.queue.Size())
	}
}

和Notify逻辑类似,也是给来源加一,再判断是否超出最大值,然后判断区块是否太旧或太新。对于符合条件的,判断是否添加到queued中,若没有,则重新创建一个inject对象添加到queued和queue中。

fetchTimer

前面的种种方法基本上都是对到来的announce进行分类,而在开头的两个定时器则真正担负了驱动作用,先看fetchTimer,由于开始设置的时间为0,所以立即触发:

case <-fetchTimer.C:
	request := make(map[string][]common.Hash)

	for hash, announces := range f.announced {
		if time.Since(announces[0].time) > arriveTimeout-gatherSlack {
			announce := announces[rand.Intn(len(announces))]
			f.forgetHash(hash)

			if f.getBlock(hash) == nil {
				request[announce.origin] = append(request[announce.origin], hash)
				f.fetching[hash] = announce
			}
		}
	}
	for peer, hashes := range request {
		log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)

		fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes
		go func() {
			if f.fetchingHook != nil {
				f.fetchingHook(hashes)
			}
			for _, hash := range hashes {
				headerFetchMeter.Mark(1)
				fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals
			}
		}()
	}
	f.rescheduleFetch(fetchTimer)

首先遍历announced,announced存储了对于同一个区块来自不同peer的announce,它的第一个announce则是最早的哪一个,如果400毫秒,则随机取一个announce,然后删除该hash,如果这个hash还未fetch到,则将其添加到fetching中,并在request中记录,request记录了同一个来源的多个hash。随后遍历所有request,获得每个来源的fetchHeader方法,这个方法在调用Notify方法构建一个announce时被传入,是peer的一个方法,只要是发送一个GetBlockHeadersMsg消息去获取区块头,之后在每次循环后启动一个goroutine去变量该来源的所有hash然后请求。最后重置fetchTimer。

发送GetBlockHeadersMsg后,如果顺利会收到BlockHeadersMsg消息,这时也就触发FilterHeaders,这是回到前面分析的逻辑,将所有head分三类,unknown的交给download处理,incomplete的存放到fetched中等待fetch区块体,complete的且fetch完成的交给enqueue处理。

completeTimer

case <-completeTimer.C:
	request := make(map[string][]common.Hash)

	for hash, announces := range f.fetched {
		announce := announces[rand.Intn(len(announces))]
		f.forgetHash(hash)

		if f.getBlock(hash) == nil {
			request[announce.origin] = append(request[announce.origin], hash)
			f.completing[hash] = announce
		}
	}

	for peer, hashes := range request {
		log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)

		if f.completingHook != nil {
			f.completingHook(hashes)
		}
		bodyFetchMeter.Mark(int64(len(hashes)))
		go f.completing[hashes[0]].fetchBodies(hashes)
	}
	f.rescheduleComplete(completeTimer)

completeTimer的逻辑和fetchTimer类似,只不过前一个是fetch区块头,这个是fetch区块体。首先是从fetched中取一组announce,他们都对应一个区块。然后从中随机取一个announce,若果本地没有该区块则添加到reuqest,这个同意也是存储一个来源的多个announce,然后变量request,调用announce的fetchBodies方法去请求一个来源的多个区块,fetchBodies也是在创建announce时传入的,他会发送GetBlockBodiesMsg消息,同样如果顺利的话会收到BlockBodiesMsg消息,触发FilterBodies方法,又回到前面的逻辑。

insert

之前许多方法中我们都调用了enqueue方法,这里将一个个inject对象放入queue队列里,然后在loop的每个循环开始,处理queue队列里的东西,对于符合规定的,调用insert方法:

func (f *Fetcher) insert(peer string, block *types.Block) {
	hash := block.Hash()

	log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)
	go func() {
		defer func() { f.done <- hash }()

		parent := f.getBlock(block.ParentHash())
		if parent == nil {
			log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash())
			return
		}

		switch err := f.verifyHeader(block.Header()); err {
		case nil:
			propBroadcastOutTimer.UpdateSince(block.ReceivedAt)
			go f.broadcastBlock(block, true)

		case consensus.ErrFutureBlock:

		default:
			log.Debug("Propagated block verification failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
			f.dropPeer(peer)
			return
		}
		if _, err := f.insertChain(types.Blocks{block}); err != nil {
			log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
			return
		}
		propAnnounceOutTimer.UpdateSince(block.ReceivedAt)
		go f.broadcastBlock(block, false)

		if f.importedHook != nil {
			f.importedHook(block)
		}
	}()
}

直接启动了一个goroutine,先获取要插入区块的父块,如果不存在停止插入,然后验证区块头,出错的话而且不是consensus.ErrFutureBlock错误的话,抛弃来源的peer,没有错的话启动一个goroutine调用broadcastBlock方法。之后调用insertchain方法插入一个区块,如果插入成功再次调用broadcastBlock方法,这里第二个参数为true,关于broadcastBlock在ProtocolManager分析中有介绍。另外insertChain方法是blockchain的方法,后面会涉及到。

rescheduleFetch

前面也多次涉及到了这个方法,在定时器触发时最后会用到,在notify添加完一个announce后如果announced长度为一,表示之前announced都已处理过,这是要重新启动定时器。

func (f *Fetcher) rescheduleFetch(fetch *time.Timer) {
	if len(f.announced) == 0 {
		return
	}
	// Otherwise find the earliest expiring announcement
	earliest := time.Now()
	for _, announces := range f.announced {
		if earliest.After(announces[0].time) {
			earliest = announces[0].time
		}
	}
	fetch.Reset(arriveTimeout - time.Since(earliest))
}

首先announced长度为0,表示没有要fetch的任务,则返回,前面说过,在notify中向announced新添加时会触发此方法重设定时器。之后遍历所有announced,每个hash对应的那组取第一个也就时间最早的那个announce尝试更新earliest,最后earliest为所有announce最早的那一个,然后重设fetchTimer定时器。

类似的还有一个rescheduleComplete,大致逻辑一样,不在赘述

forgetHash

这个也多次出现,出钥匙删除一个hash对应的announce

func (f *Fetcher) forgetHash(hash common.Hash) {
	for _, announce := range f.announced[hash] {
		f.announces[announce.origin]--
		if f.announces[announce.origin] == 0 {
			delete(f.announces, announce.origin)
		}
	}
	delete(f.announced, hash)
	if f.announceChangeHook != nil {
		f.announceChangeHook(hash, false)
	}

	if announce := f.fetching[hash]; announce != nil {
		f.announces[announce.origin]--
		if f.announces[announce.origin] == 0 {
			delete(f.announces, announce.origin)
		}
		delete(f.fetching, hash)
	}


	for _, announce := range f.fetched[hash] {
		f.announces[announce.origin]--
		if f.announces[announce.origin] == 0 {
			delete(f.announces, announce.origin)
		}
	}
	delete(f.fetched, hash)


	if announce := f.completing[hash]; announce != nil {
		f.announces[announce.origin]--
		if f.announces[announce.origin] == 0 {
			delete(f.announces, announce.origin)
		}
		delete(f.completing, hash)
	}
}

首先遍历announced对应hash的所有announce,然后将对应来源的计数器减1,如果减到0则删除该来源的计数器。然后从announced删除hash对应的值。结合如果fetching、fetched及completing有对应的键值对,也相应的删除。

stop

func (f *Fetcher) Stop() {
	close(f.quit)
}

这个就是关闭方法,关闭quit这个通道,触发多个地方的逻辑,第一个就是loop中,这里面退出了无限循环,其次还有其他涉及select阻塞的地方,如notify、FilterHeaders等,都是直接退出。

题图来自unsplash:https://unsplash.com/photos/m6tAqZvy4RM