go-ethereum中ethdb源码学习

前面学习了ethereum的编码和数据结构,这里学习一些ethereum的持久化存储也就是数据相关的源码。

背景

go-ethereum的数据存储时借助于leveldb数据库,它是Google开发的一种键值对类型的数据库,另外Facebook又基于其开发了RocksDB数据库。简单来说,leveldb具有轻量以及高性能的特点。

原生的leveldb是用c++写的,并不方便直接用到go项目中,好在leveldb的开发者用go重新实现了leveldb,我们可以直接使用(github地址)。

go版本的leveldb需要go1.5以上版本,安装很简单,执行下面命令

go get github.com/syndtr/goleveldb/leveldb

go-leveldb使用

由于是键值对类型的数据库,所以使用比较简单,下面简单介绍一下基本操作,更高级的操作参见API文档。下面操作示例代码见这里

打开数据库

db, err := leveldb.OpenFile("db", nil)
if(err!=nil){
	log.Fatalln(err.Error())
}
defer db.Close()

OpenFile会创建或打开一个数据库。

增删改查

err = db.Put([]byte("key"), []byte("value"), nil)
err = db.Delete([]byte("key"), nil)
data, err := db.Get([]byte("key"), nil)

基本上就对于put,get,delete几个方法,其中更改一个记录的话也是用put。

普通迭代

iter := db.NewIterator(nil, nil)
for iter.Next() {
    key := iter.Key()
    value := iter.Value()
}
iter.Release()

就是迭代数据库全部键值对

指定起点的迭代

首先需要明白的是,leveldb的存储是按key的顺序存储的,所以可以指定一个key,从该key开始遍历

iter:=db.NewIterator(nil,nil)
for ok:=iter.Seek(key);ok;ok=iter.Next(){
	fmt.Println(string(iter.Key()),"---",string(iter.Value()))
}
iter.Release()
err := iter.Error()
fmt.Println(err)

子集迭代

可以指定区间进行迭代

iter:=db.NewIterator(&util.Range{Start:[]byte("key2"),Limit:[]byte("key6")},nil)
for iter.Next(){
	fmt.Println(string(iter.Key()),"---",string(iter.Value()))
}
iter.Release()
err := iter.Error()
fmt.Println(err)

前缀迭代

只迭代有指定前缀的

iter:=db.NewIterator(util.BytesPrefix([]byte(prefix)),nil)
for iter.Next(){
	fmt.Println(string(iter.Key()),"---",string(iter.Value()))
}
iter.Release()
err := iter.Error()
fmt.Println(err)

批量写入

batch:=new(leveldb.Batch)
for i:=0; i<10; i++ {
	batch.Put([]byte("batch"+strconv.Itoa(i)),[]byte("batch"+strconv.Itoa(i)))
}
err:=db.Write(batch,nil)
if err!=nil {
	fmt.Println(err)
}

源码分析

源码集中在ethdb目录内,主要是对leveldb的封装。

interface.go

顾名思义就是定义了一些接口,如数据库接口:

type Database interface {
	Putter
	Deleter
	Get(key []byte) ([]byte, error)
	Has(key []byte) (bool, error)
	Close()
	NewBatch() Batch
}

Batch接口。用于批量操作,不能用于并发

type Batch interface {
	Putter
	Deleter
	ValueSize() int 
	Write() error
	Reset()
}

上面两个结构又包含了Putter和Deleter接口

type Putter interface {
	Put(key []byte, value []byte) error
}
type Deleter interface {
	Delete(key []byte) error
}

database.go

构造

这就是ethereum所使用的代码,先看结构体

type LDBDatabase struct {
	fn string      // filename for reporting
	db *leveldb.DB // LevelDB instance

	compTimeMeter    metrics.Meter // Meter for measuring the total time spent in database compaction
	compReadMeter    metrics.Meter // Meter for measuring the data read during compaction
	compWriteMeter   metrics.Meter // Meter for measuring the data written during compaction
	writeDelayNMeter metrics.Meter // Meter for measuring the write delay number due to database compaction
	writeDelayMeter  metrics.Meter // Meter for measuring the write delay duration due to database compaction
	diskReadMeter    metrics.Meter // Meter for measuring the effective amount of data read
	diskWriteMeter   metrics.Meter // Meter for measuring the effective amount of data written

	quitLock sync.Mutex      // Mutex protecting the quit channel access
	quitChan chan chan error // Quit channel to stop the metrics collection before closing the database

	log log.Logger // Contextual logger tracking the database path
}

NewLDBDatabase

成员很多,关键的就是一个leveldb.DB对象和几个同步操作的成员。接下来看新建数据库的方法

func NewLDBDatabase(file string, cache int, handles int) (*LDBDatabase, error) {
	logger := log.New("database", file)
	
	//从init调用过来时,cache和handles都为0
	//默认启动节点,cache为512,handles为0,在node.DefaultConfig配置
	if cache < 16 {
		cache = 16
	}
	if handles < 16 {
		handles = 16
	}
	logger.Info("Allocated cache and file handles", "cache", common.StorageSize(cache*1024*1024), "handles", handles)

	db, err := leveldb.OpenFile(file, &opt.Options{
		OpenFilesCacheCapacity: handles,
		BlockCacheCapacity:     cache / 2 * opt.MiB,
		WriteBuffer:            cache / 4 * opt.MiB, 
		Filter:                 filter.NewBloomFilter(10),
	})
	if _, corrupted := err.(*errors.ErrCorrupted); corrupted {
		db, err = leveldb.RecoverFile(file, nil)
	}
	if err != nil {
		return nil, err
	}
	return &LDBDatabase{
		fn:  file,
		db:  db,
		log: logger,
	}, nil
}

逻辑很简单,创建了一个levelDB对象,主要看一下options的内容:

  1. OpenFilesCacheCapacity:定义打开的文件缓存大小,默认是500,设为-1或0时表示不缓存
  2. BlockCacheCapacity:定义了一个名为sorted table的缓存容量,默认是8MB,设为-1或0时表示不缓存
  3. WriteBuffer:定义memdb的大小,它是一个内存数据库,默认是4MB.
  4. Filter:定义过滤器,优化读性能。这是使用了一个布隆过滤器

put,has,get,delete

func (db *LDBDatabase) Put(key []byte, value []byte) error {
	return db.db.Put(key, value, nil)
}
func (db *LDBDatabase) Has(key []byte) (bool, error) {
	return db.db.Has(key, nil)
}
func (db *LDBDatabase) Get(key []byte) ([]byte, error) {
	dat, err := db.db.Get(key, nil)
	if err != nil {
		return nil, err
	}
	return dat, nil
}
func (db *LDBDatabase) Delete(key []byte) error {
	return db.db.Delete(key, nil)
}

都是对leveldb做了封装而已。

Batch及其操作

批量读写的封装

type ldbBatch struct {
	db   *leveldb.DB
	b    *leveldb.Batch
	size int
}

put、delete、write、reset

func (b *ldbBatch) Put(key, value []byte) error {
	b.b.Put(key, value)
	b.size += len(value)
	return nil
}

func (b *ldbBatch) Delete(key []byte) error {
	b.b.Delete(key)
	b.size += 1
	return nil
}

func (b *ldbBatch) Write() error {
	return b.db.Write(b.b, nil)
}

func (b *ldbBatch) Reset() {
	b.b.Reset()
	b.size = 0
}

Meter

这是用于初始化LDBDatabase的一系列Meter成员用的

func (db *LDBDatabase) Meter(prefix string) {
	db.compTimeMeter = metrics.NewRegisteredMeter(prefix+"compact/time", nil)
	db.compReadMeter = metrics.NewRegisteredMeter(prefix+"compact/input", nil)
	db.compWriteMeter = metrics.NewRegisteredMeter(prefix+"compact/output", nil)
	db.diskReadMeter = metrics.NewRegisteredMeter(prefix+"disk/read", nil)
	db.diskWriteMeter = metrics.NewRegisteredMeter(prefix+"disk/write", nil)
	db.writeDelayMeter = metrics.NewRegisteredMeter(prefix+"compact/writedelay/duration", nil)
	db.writeDelayNMeter = metrics.NewRegisteredMeter(prefix+"compact/writedelay/counter", nil)

	db.quitLock.Lock()
	db.quitChan = make(chan chan error)
	db.quitLock.Unlock()

	go db.meter(3 * time.Second)
}

这个方法内先初始化各种Meter,然后创建了一个chan,最后启动一个goroutine运行meter,之后每3秒收集一次信息并反馈到Meter。这一段代码比较长,就不贴出来了。主要是利用db.db.GetProperty(“leveldb.stats”)获取信息,信息格式如下:

//    Level |   Tables   |    Size(MB)   |    Time(sec)  |    Read(MB)   |   Write(MB)
//   -------+------------+---------------+---------------+---------------+---------------
//      0   |          0 |       0.00000 |       1.27969 |       0.00000 |      12.31098
//      1   |         85 |     109.27913 |      28.09293 |     213.92493 |     214.26294
//      2   |        523 |    1000.37159 |       7.26059 |      66.86342 |      66.77884
//      3   |        570 |    1113.18458 |       0.00000 |       0.00000 |       0.00000

下面就是解析这个字符串并写入Meter。另外一点这段代周期循环的关键代码如下

for i := 1; errc == nil && merr == nil; i++ {
    //....
    select {
		case errc = <-db.quitChan:
		case <-time.After(refresh):
		}
}

到select出会阻塞,3秒后进行下一次循环,退出时向Meter方法中初始化的chan发送信息,导致errc不为nil,就自然退出循环

close

func (db *LDBDatabase) Close() {
	db.quitLock.Lock()
	defer db.quitLock.Unlock()

	if db.quitChan != nil {
		errc := make(chan error)
		db.quitChan <- errc
		if err := <-errc; err != nil {
			db.log.Error("Metrics collection failed", "err", err)
		}
		db.quitChan = nil
	}
	err := db.db.Close()
	if err == nil {
		db.log.Info("Database closed")
	} else {
		db.log.Error("Failed to close database", "err", err)
	}
}

退出代码也很简单,主要就是在加锁环境下,向quitChan写入信息,停止meter,然后等待反馈(在meter发出反馈),最后关闭数据库。

memory_database.go

这是一个用于测试的基于内存的数据库。在源码中主要是在geth初始化时,如果最后创建数据库时依旧没有有效的datadir则使用这个数据库代替

构造

type MemDatabase struct {
	db   map[string][]byte
	lock sync.RWMutex
}

func NewMemDatabase() *MemDatabase {
	return &MemDatabase{
		db: make(map[string][]byte),
	}
}

可见就是基于map的封装。

基础操作

func (db *MemDatabase) Put(key []byte, value []byte) error {
	db.lock.Lock()
	defer db.lock.Unlock()

	db.db[string(key)] = common.CopyBytes(value)
	return nil
}

func (db *MemDatabase) Has(key []byte) (bool, error) {
	db.lock.RLock()
	defer db.lock.RUnlock()

	_, ok := db.db[string(key)]
	return ok, nil
}

func (db *MemDatabase) Get(key []byte) ([]byte, error) {
	db.lock.RLock()
	defer db.lock.RUnlock()

	if entry, ok := db.db[string(key)]; ok {
		return common.CopyBytes(entry), nil
	}
	return nil, errors.New("not found")
}

func (db *MemDatabase) Delete(key []byte) error {
	db.lock.Lock()
	defer db.lock.Unlock()

	delete(db.db, string(key))
	return nil
}

全是基于map操作,只不过进行了加锁

Batch

type memBatch struct {
	db     *MemDatabase
	writes []kv
	size   int
}
type kv struct {
	k, v []byte
	del  bool
}
func (db *MemDatabase) NewBatch() Batch {
	return &memBatch{db: db}
}
func (b *memBatch) Put(key, value []byte) error {
	b.writes = append(b.writes, kv{common.CopyBytes(key), common.CopyBytes(value), false})
	b.size += len(value)
	return nil
}

func (b *memBatch) Delete(key []byte) error {
	b.writes = append(b.writes, kv{common.CopyBytes(key), nil, true})
	b.size += 1
	return nil
}

func (b *memBatch) Write() error {
	b.db.lock.Lock()
	defer b.db.lock.Unlock()

	for _, kv := range b.writes {
		if kv.del {
			delete(b.db.db, string(kv.k))
			continue
		}
		b.db.db[string(kv.k)] = kv.v
	}
	return nil
}
func (b *memBatch) Reset() {
	b.writes = b.writes[:0]
	b.size = 0
}

批量操作先存储在一个KV类型的数组内,等到写入时遍历那个数组,依次存入数据库

table.go与table_batch.go

这两个也是对数据的封装,之所以叫table是因为实例化一个table时要指定一个前缀,之后利用table的基本操作都会给key添加指定的前缀。table_batch也类似,直接看一下table.go的源码

func NewTable(db Database, prefix string) Database {
	return &table{
		db:     db,
		prefix: prefix,
	}
}

func (dt *table) Put(key []byte, value []byte) error {
	return dt.db.Put(append([]byte(dt.prefix), key...), value)
}

func (dt *table) Has(key []byte) (bool, error) {
	return dt.db.Has(append([]byte(dt.prefix), key...))
}

func (dt *table) Get(key []byte) ([]byte, error) {
	return dt.db.Get(append([]byte(dt.prefix), key...))
}

func (dt *table) Delete(key []byte) error {
	return dt.db.Delete(append([]byte(dt.prefix), key...))
}