go-ethereum中event源码分析
event.go
数据结构
type TypeMuxEvent struct {
Time time.Time
Data interface{}
}
TypeMuxEvent就是一个具体的事件,同时带有时间戳
type TypeMux struct {
mutex sync.RWMutex
subm map[reflect.Type][]*TypeMuxSubscription
stopped bool
}
这个目前已经弃用,被Feed替代,我们稍后介绍Feed。这个里面有一个subm保存着每种类型的所有订阅者
type TypeMuxSubscription struct {
mux *TypeMux
created time.Time
closeMu sync.Mutex
closing chan struct{}
closed bool
postMu sync.RWMutex
readC <-chan *TypeMuxEvent
postC chan<- *TypeMuxEvent
}
表示一个订阅者
Subscribe
func (mux *TypeMux) Subscribe(types ...interface{}) *TypeMuxSubscription {
sub := newsub(mux)
mux.mutex.Lock()
defer mux.mutex.Unlock()
if mux.stopped {
sub.closed = true
close(sub.postC)
} else {
if mux.subm == nil {
mux.subm = make(map[reflect.Type][]*TypeMuxSubscription)
}
for _, t := range types {
rtyp := reflect.TypeOf(t)
oldsubs := mux.subm[rtyp]
if find(oldsubs, sub) != -1 {
panic(fmt.Sprintf("event: duplicate type %s in Subscribe", rtyp))
}
subs := make([]*TypeMuxSubscription, len(oldsubs)+1)
copy(subs, oldsubs)
subs[len(oldsubs)] = sub
mux.subm[rtyp] = subs
}
}
return sub
}
func newsub(mux *TypeMux) *TypeMuxSubscription {
c := make(chan *TypeMuxEvent)
return &TypeMuxSubscription{
mux: mux,
created: time.Now(),
readC: c,
postC: c,
closing: make(chan struct{}),
}
}
这个方法就是创建一个订阅。首先用newsub方法创建一个TypeMuxSubscription对象,如果TypeMux已停止,则标记TypeMuxSubscription以关闭。否则,遍历传入的所有类型,将每个类型对应的TypeMuxSubscription数组中添加刚才创建的TypeMuxSubscription对象。不能重复订阅,最后返回TypeMuxSubscription对象用于读取事件等。
Post
func (mux *TypeMux) Post(ev interface{}) error {
event := &TypeMuxEvent{
Time: time.Now(),
Data: ev,
}
rtyp := reflect.TypeOf(ev)
mux.mutex.RLock()
if mux.stopped {
mux.mutex.RUnlock()
return ErrMuxClosed
}
subs := mux.subm[rtyp]
mux.mutex.RUnlock()
for _, sub := range subs {
sub.deliver(event)
}
return nil
}
表示发布一个事件,首先构造一个TypeMuxEvent对象包含我们传入的事件主体,然后从TypeMux的subm中查找对应类型的所有订阅者,然后其调用deliver
func (s *TypeMuxSubscription) deliver(event *TypeMuxEvent) {
if s.created.After(event.Time) {
return
}
s.postMu.RLock()
defer s.postMu.RUnlock()
select {
case s.postC <- event:
case <-s.closing:
}
}
首先确保时间逻辑上正确,即订阅时间早于事件发布时间。之后向postC传入事件即可。
Unsubscribe
func (s *TypeMuxSubscription) Unsubscribe() {
s.mux.del(s)
s.closewait()
}
func (mux *TypeMux) del(s *TypeMuxSubscription) {
mux.mutex.Lock()
for typ, subs := range mux.subm {
if pos := find(subs, s); pos >= 0 {
if len(subs) == 1 {
delete(mux.subm, typ)
} else {
mux.subm[typ] = posdelete(subs, pos)
}
}
}
s.mux.mutex.Unlock()
}
func posdelete(slice []*TypeMuxSubscription, pos int) []*TypeMuxSubscription {
news := make([]*TypeMuxSubscription, len(slice)-1)
copy(news[:pos], slice[:pos])
copy(news[pos:], slice[pos+1:])
return news
}
取消订阅,主要是将对应的TypeMuxSubscription从subm中删除。
feed.go
这个是目前主要使用的。
type Feed struct {
once sync.Once
sendLock chan struct{}
removeSub chan interface{}
sendCases caseList
mu sync.Mutex
inbox caseList
etype reflect.Type
closed bool
}
我们先看一下源码中怎么利用feed进行订阅及发布消息的,以NewTxsEvent为例
//订阅事件
func (pool *TxPool) SubscribeNewTxsEvent(ch chan<- NewTxsEvent) event.Subscription {
return pool.scope.Track(pool.txFeed.Subscribe(ch))
}
pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
//发布事件
pool.txFeed.Send(NewTxsEvent{types.Transactions{tx}})
//取消订阅
pm.txsSub.Unsubscribe()
Subscribe
func (f *Feed) Subscribe(channel interface{}) Subscription {
f.once.Do(f.init)
chanval := reflect.ValueOf(channel)
chantyp := chanval.Type()
if chantyp.Kind() != reflect.Chan || chantyp.ChanDir()&reflect.SendDir == 0 {
panic(errBadChannel)
}
sub := &feedSub{feed: f, channel: chanval, err: make(chan error, 1)}
f.mu.Lock()
defer f.mu.Unlock()
if !f.typecheck(chantyp.Elem()) {
panic(feedTypeError{op: "Subscribe", got: chantyp, want: reflect.ChanOf(reflect.SendDir, f.etype)})
}
cas := reflect.SelectCase{Dir: reflect.SelectSend, Chan: chanval}
f.inbox = append(f.inbox, cas)
return sub
}
首先执行一次初始化操作,由于用了once,所以无论订阅多少次都只执行一次初始化操作。
func (f *Feed) init() {
f.removeSub = make(chan interface{})
f.sendLock = make(chan struct{}, 1)
f.sendLock <- struct{}{}
f.sendCases = caseList{{Chan: reflect.ValueOf(f.removeSub), Dir: reflect.SelectRecv}}
}
init主要就是一些变量的初始化。接着判断参数是否合法,即参数通道中的数据类型不能是通道,其次这个通道不能是一个只能接受数据的单向通道。
之后新建一个feedSub对象表示一个订阅,feedSub中保存着我们给定的通道。接着typecheck方法是类型检查,一个Feed在第一次调用Subscribe时会锁定一种类似,后续的订阅都必须是这种类型。检查通过后,创建一个SelectCase对象放入inbox中,SelectCase是实际负责发送订阅消息的。方法最后返回feedSub对象,用于取消订阅。
源码中还使用了scope.Track方法,这是对订阅进行包装方便取消订阅的,实际取消逻辑还是在feedSub及feed中
Unsubscribe
func (sub *feedSub) Unsubscribe() {
sub.errOnce.Do(func() {
sub.feed.remove(sub)
close(sub.err)
})
}
func (f *Feed) remove(sub *feedSub) {
ch := sub.channel.Interface()
f.mu.Lock()
index := f.inbox.find(ch)
if index != -1 {
f.inbox = f.inbox.delete(index)
f.mu.Unlock()
return
}
f.mu.Unlock()
select {
case f.removeSub <- ch:
case <-f.sendLock:
f.sendCases = f.sendCases.delete(f.sendCases.find(ch))
f.sendLock <- struct{}{}
}
}
就是将某个SelectCase从inbox中删除。
Send
func (f *Feed) Send(value interface{}) (nsent int) {
rvalue := reflect.ValueOf(value)
f.once.Do(f.init)
<-f.sendLock
f.mu.Lock()
f.sendCases = append(f.sendCases, f.inbox...)
f.inbox = nil
if !f.typecheck(rvalue.Type()) {
f.sendLock <- struct{}{}
panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype})
}
f.mu.Unlock()
for i := firstSubSendCase; i < len(f.sendCases); i++ {
f.sendCases[i].Send = rvalue
}
cases := f.sendCases
for {
for i := firstSubSendCase; i < len(cases); i++ {
if cases[i].Chan.TrySend(rvalue) {
nsent++
cases = cases.deactivate(i)
i--
}
}
if len(cases) == firstSubSendCase {
break
}
chosen, recv, _ := reflect.Select(cases)
if chosen == 0 {
index := f.sendCases.find(recv.Interface())
f.sendCases = f.sendCases.delete(index)
if index >= 0 && index < len(cases) {
cases = f.sendCases[:len(cases)-1]
}
} else {
cases = cases.deactivate(chosen)
nsent++
}
}
for i := firstSubSendCase; i < len(f.sendCases); i++ {
f.sendCases[i].Send = reflect.Value{}
}
f.sendLock <- struct{}{}
return nsent
}
首先也执行了一次init操作。之后将所有inbox中的SelectCase放入sendCases中,接着检查了要发送是事件类型是否是feed所绑定的。接着从sendCases的第一个SelectCase开始给其send参数赋值。
接着有一个无限循环,从着sendCases的第一个SelectCase开始,对于每个SelectCase调用其Chan成员的TrySend方法,每个SelectCase的Chan就是注册时传入的通道,TrySend方法表示立即发送数据不阻塞,如果有阻塞则返回false。这里如果发送成功则将成功的SelectCase删除。如果不能立即发送者不删除去尝试发送下一个。一遍循环后,如果都发送成功则退出无限循环,否则每次调用Select选择一个可以发送的进行发送。
这也就是feed比较优秀的部分,他会跳过那些正在阻塞的通道,优先让一部分可发送的通道收到消息。
回顾feed一共设计了两个caseList来存储SelectCase,inbox保存着每次订阅时创建的储SelectCase,sendCases则是在发送时从inbox中取值,它里面的数据会随着情况变化。
题图来自unsplash:https://unsplash.com/photos/ii5JY_46xH0