go-ethereum中event源码分析

event.go

数据结构

type TypeMuxEvent struct {
	Time time.Time
	Data interface{}
}

TypeMuxEvent就是一个具体的事件,同时带有时间戳

type TypeMux struct {
	mutex   sync.RWMutex
	subm    map[reflect.Type][]*TypeMuxSubscription
	stopped bool
}

这个目前已经弃用,被Feed替代,我们稍后介绍Feed。这个里面有一个subm保存着每种类型的所有订阅者

type TypeMuxSubscription struct {
	mux     *TypeMux
	created time.Time
	closeMu sync.Mutex
	closing chan struct{}
	closed  bool

	postMu sync.RWMutex
	readC  <-chan *TypeMuxEvent
	postC  chan<- *TypeMuxEvent
}

表示一个订阅者

Subscribe

func (mux *TypeMux) Subscribe(types ...interface{}) *TypeMuxSubscription {
	sub := newsub(mux)
	mux.mutex.Lock()
	defer mux.mutex.Unlock()
	if mux.stopped {
		sub.closed = true
		close(sub.postC)
	} else {
		if mux.subm == nil {
			mux.subm = make(map[reflect.Type][]*TypeMuxSubscription)
		}
		for _, t := range types {
			rtyp := reflect.TypeOf(t)
			oldsubs := mux.subm[rtyp]
			if find(oldsubs, sub) != -1 {
				panic(fmt.Sprintf("event: duplicate type %s in Subscribe", rtyp))
			}
			subs := make([]*TypeMuxSubscription, len(oldsubs)+1)
			copy(subs, oldsubs)
			subs[len(oldsubs)] = sub
			mux.subm[rtyp] = subs
		}
	}
	return sub
}

func newsub(mux *TypeMux) *TypeMuxSubscription {
	c := make(chan *TypeMuxEvent)
	return &TypeMuxSubscription{
		mux:     mux,
		created: time.Now(),
		readC:   c,
		postC:   c,
		closing: make(chan struct{}),
	}
}

这个方法就是创建一个订阅。首先用newsub方法创建一个TypeMuxSubscription对象,如果TypeMux已停止,则标记TypeMuxSubscription以关闭。否则,遍历传入的所有类型,将每个类型对应的TypeMuxSubscription数组中添加刚才创建的TypeMuxSubscription对象。不能重复订阅,最后返回TypeMuxSubscription对象用于读取事件等。

Post

func (mux *TypeMux) Post(ev interface{}) error {
	event := &TypeMuxEvent{
		Time: time.Now(),
		Data: ev,
	}
	rtyp := reflect.TypeOf(ev)
	mux.mutex.RLock()
	if mux.stopped {
		mux.mutex.RUnlock()
		return ErrMuxClosed
	}
	subs := mux.subm[rtyp]
	mux.mutex.RUnlock()
	for _, sub := range subs {
		sub.deliver(event)
	}
	return nil
}

表示发布一个事件,首先构造一个TypeMuxEvent对象包含我们传入的事件主体,然后从TypeMux的subm中查找对应类型的所有订阅者,然后其调用deliver

func (s *TypeMuxSubscription) deliver(event *TypeMuxEvent) {
	if s.created.After(event.Time) {
		return
	}

	s.postMu.RLock()
	defer s.postMu.RUnlock()

	select {
	case s.postC <- event:
	case <-s.closing:
	}
}

首先确保时间逻辑上正确,即订阅时间早于事件发布时间。之后向postC传入事件即可。

Unsubscribe

func (s *TypeMuxSubscription) Unsubscribe() {
	s.mux.del(s)
	s.closewait()
}

func (mux *TypeMux) del(s *TypeMuxSubscription) {
	mux.mutex.Lock()
	for typ, subs := range mux.subm {
		if pos := find(subs, s); pos >= 0 {
			if len(subs) == 1 {
				delete(mux.subm, typ)
			} else {
				mux.subm[typ] = posdelete(subs, pos)
			}
		}
	}
	s.mux.mutex.Unlock()
}

func posdelete(slice []*TypeMuxSubscription, pos int) []*TypeMuxSubscription {
	news := make([]*TypeMuxSubscription, len(slice)-1)
	copy(news[:pos], slice[:pos])
	copy(news[pos:], slice[pos+1:])
	return news
}

取消订阅,主要是将对应的TypeMuxSubscription从subm中删除。

feed.go

这个是目前主要使用的。

type Feed struct {
	once      sync.Once        
	sendLock  chan struct{}    
	removeSub chan interface{} 
	sendCases caseList         

	mu     sync.Mutex
	inbox  caseList
	etype  reflect.Type
	closed bool
}

我们先看一下源码中怎么利用feed进行订阅及发布消息的,以NewTxsEvent为例

//订阅事件
func (pool *TxPool) SubscribeNewTxsEvent(ch chan<- NewTxsEvent) event.Subscription {
	return pool.scope.Track(pool.txFeed.Subscribe(ch))
}

pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)

//发布事件
pool.txFeed.Send(NewTxsEvent{types.Transactions{tx}})

//取消订阅
pm.txsSub.Unsubscribe() 

Subscribe

func (f *Feed) Subscribe(channel interface{}) Subscription {
	f.once.Do(f.init)

	chanval := reflect.ValueOf(channel)
	chantyp := chanval.Type()
	if chantyp.Kind() != reflect.Chan || chantyp.ChanDir()&reflect.SendDir == 0 {
		panic(errBadChannel)
	}
	sub := &feedSub{feed: f, channel: chanval, err: make(chan error, 1)}

	f.mu.Lock()
	defer f.mu.Unlock()
	if !f.typecheck(chantyp.Elem()) {
		panic(feedTypeError{op: "Subscribe", got: chantyp, want: reflect.ChanOf(reflect.SendDir, f.etype)})
	}
	cas := reflect.SelectCase{Dir: reflect.SelectSend, Chan: chanval}
	f.inbox = append(f.inbox, cas)
	return sub
}

首先执行一次初始化操作,由于用了once,所以无论订阅多少次都只执行一次初始化操作。

func (f *Feed) init() {
	f.removeSub = make(chan interface{})
	f.sendLock = make(chan struct{}, 1)
	f.sendLock <- struct{}{}
	f.sendCases = caseList{{Chan: reflect.ValueOf(f.removeSub), Dir: reflect.SelectRecv}}
}

init主要就是一些变量的初始化。接着判断参数是否合法,即参数通道中的数据类型不能是通道,其次这个通道不能是一个只能接受数据的单向通道。

之后新建一个feedSub对象表示一个订阅,feedSub中保存着我们给定的通道。接着typecheck方法是类型检查,一个Feed在第一次调用Subscribe时会锁定一种类似,后续的订阅都必须是这种类型。检查通过后,创建一个SelectCase对象放入inbox中,SelectCase是实际负责发送订阅消息的。方法最后返回feedSub对象,用于取消订阅。

源码中还使用了scope.Track方法,这是对订阅进行包装方便取消订阅的,实际取消逻辑还是在feedSub及feed中

Unsubscribe


func (sub *feedSub) Unsubscribe() {
	sub.errOnce.Do(func() {
		sub.feed.remove(sub)
		close(sub.err)
	})
}

func (f *Feed) remove(sub *feedSub) {
	ch := sub.channel.Interface()
	f.mu.Lock()
	index := f.inbox.find(ch)
	if index != -1 {
		f.inbox = f.inbox.delete(index)
		f.mu.Unlock()
		return
	}
	f.mu.Unlock()

	select {
	case f.removeSub <- ch:

	case <-f.sendLock:

		f.sendCases = f.sendCases.delete(f.sendCases.find(ch))
		f.sendLock <- struct{}{}
	}
}

就是将某个SelectCase从inbox中删除。

Send

func (f *Feed) Send(value interface{}) (nsent int) {
	rvalue := reflect.ValueOf(value)

	f.once.Do(f.init)
	<-f.sendLock

	f.mu.Lock()
	f.sendCases = append(f.sendCases, f.inbox...)
	f.inbox = nil

	if !f.typecheck(rvalue.Type()) {
		f.sendLock <- struct{}{}
		panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype})
	}
	f.mu.Unlock()

	for i := firstSubSendCase; i < len(f.sendCases); i++ {
		f.sendCases[i].Send = rvalue
	}

	cases := f.sendCases
	for {
		for i := firstSubSendCase; i < len(cases); i++ {
			if cases[i].Chan.TrySend(rvalue) {
				nsent++
				cases = cases.deactivate(i)
				i--
			}
		}
		if len(cases) == firstSubSendCase {
			break
		}

		chosen, recv, _ := reflect.Select(cases)
		if chosen == 0  {
			index := f.sendCases.find(recv.Interface())
			f.sendCases = f.sendCases.delete(index)
			if index >= 0 && index < len(cases) {
				cases = f.sendCases[:len(cases)-1]
			}
		} else {
			cases = cases.deactivate(chosen)
			nsent++
		}
	}

	for i := firstSubSendCase; i < len(f.sendCases); i++ {
		f.sendCases[i].Send = reflect.Value{}
	}
	f.sendLock <- struct{}{}
	return nsent
}

首先也执行了一次init操作。之后将所有inbox中的SelectCase放入sendCases中,接着检查了要发送是事件类型是否是feed所绑定的。接着从sendCases的第一个SelectCase开始给其send参数赋值。

接着有一个无限循环,从着sendCases的第一个SelectCase开始,对于每个SelectCase调用其Chan成员的TrySend方法,每个SelectCase的Chan就是注册时传入的通道,TrySend方法表示立即发送数据不阻塞,如果有阻塞则返回false。这里如果发送成功则将成功的SelectCase删除。如果不能立即发送者不删除去尝试发送下一个。一遍循环后,如果都发送成功则退出无限循环,否则每次调用Select选择一个可以发送的进行发送。

这也就是feed比较优秀的部分,他会跳过那些正在阻塞的通道,优先让一部分可发送的通道收到消息。

回顾feed一共设计了两个caseList来存储SelectCase,inbox保存着每次订阅时创建的储SelectCase,sendCases则是在发送时从inbox中取值,它里面的数据会随着情况变化。

题图来自unsplash:https://unsplash.com/photos/ii5JY_46xH0