go-ethereum中ethdb源码学习
前面学习了ethereum的编码和数据结构,这里学习一些ethereum的持久化存储也就是数据相关的源码。
背景
go-ethereum的数据存储时借助于leveldb数据库,它是Google开发的一种键值对类型的数据库,另外Facebook又基于其开发了RocksDB数据库。简单来说,leveldb具有轻量以及高性能的特点。
原生的leveldb是用c++写的,并不方便直接用到go项目中,好在leveldb的开发者用go重新实现了leveldb,我们可以直接使用(github地址)。
go版本的leveldb需要go1.5以上版本,安装很简单,执行下面命令
go get github.com/syndtr/goleveldb/leveldb
go-leveldb使用
由于是键值对类型的数据库,所以使用比较简单,下面简单介绍一下基本操作,更高级的操作参见API文档。下面操作示例代码见这里
打开数据库
db, err := leveldb.OpenFile("db", nil)
if(err!=nil){
log.Fatalln(err.Error())
}
defer db.Close()
OpenFile会创建或打开一个数据库。
增删改查
err = db.Put([]byte("key"), []byte("value"), nil)
err = db.Delete([]byte("key"), nil)
data, err := db.Get([]byte("key"), nil)
基本上就对于put,get,delete几个方法,其中更改一个记录的话也是用put。
普通迭代
iter := db.NewIterator(nil, nil)
for iter.Next() {
key := iter.Key()
value := iter.Value()
}
iter.Release()
就是迭代数据库全部键值对
指定起点的迭代
首先需要明白的是,leveldb的存储是按key的顺序存储的,所以可以指定一个key,从该key开始遍历
iter:=db.NewIterator(nil,nil)
for ok:=iter.Seek(key);ok;ok=iter.Next(){
fmt.Println(string(iter.Key()),"---",string(iter.Value()))
}
iter.Release()
err := iter.Error()
fmt.Println(err)
子集迭代
可以指定区间进行迭代
iter:=db.NewIterator(&util.Range{Start:[]byte("key2"),Limit:[]byte("key6")},nil)
for iter.Next(){
fmt.Println(string(iter.Key()),"---",string(iter.Value()))
}
iter.Release()
err := iter.Error()
fmt.Println(err)
前缀迭代
只迭代有指定前缀的
iter:=db.NewIterator(util.BytesPrefix([]byte(prefix)),nil)
for iter.Next(){
fmt.Println(string(iter.Key()),"---",string(iter.Value()))
}
iter.Release()
err := iter.Error()
fmt.Println(err)
批量写入
batch:=new(leveldb.Batch)
for i:=0; i<10; i++ {
batch.Put([]byte("batch"+strconv.Itoa(i)),[]byte("batch"+strconv.Itoa(i)))
}
err:=db.Write(batch,nil)
if err!=nil {
fmt.Println(err)
}
源码分析
源码集中在ethdb目录内,主要是对leveldb的封装。
interface.go
顾名思义就是定义了一些接口,如数据库接口:
type Database interface {
Putter
Deleter
Get(key []byte) ([]byte, error)
Has(key []byte) (bool, error)
Close()
NewBatch() Batch
}
Batch接口。用于批量操作,不能用于并发
type Batch interface {
Putter
Deleter
ValueSize() int
Write() error
Reset()
}
上面两个结构又包含了Putter和Deleter接口
type Putter interface {
Put(key []byte, value []byte) error
}
type Deleter interface {
Delete(key []byte) error
}
database.go
构造
这就是ethereum所使用的代码,先看结构体
type LDBDatabase struct {
fn string // filename for reporting
db *leveldb.DB // LevelDB instance
compTimeMeter metrics.Meter // Meter for measuring the total time spent in database compaction
compReadMeter metrics.Meter // Meter for measuring the data read during compaction
compWriteMeter metrics.Meter // Meter for measuring the data written during compaction
writeDelayNMeter metrics.Meter // Meter for measuring the write delay number due to database compaction
writeDelayMeter metrics.Meter // Meter for measuring the write delay duration due to database compaction
diskReadMeter metrics.Meter // Meter for measuring the effective amount of data read
diskWriteMeter metrics.Meter // Meter for measuring the effective amount of data written
quitLock sync.Mutex // Mutex protecting the quit channel access
quitChan chan chan error // Quit channel to stop the metrics collection before closing the database
log log.Logger // Contextual logger tracking the database path
}
NewLDBDatabase
成员很多,关键的就是一个leveldb.DB对象和几个同步操作的成员。接下来看新建数据库的方法
func NewLDBDatabase(file string, cache int, handles int) (*LDBDatabase, error) {
logger := log.New("database", file)
//从init调用过来时,cache和handles都为0
//默认启动节点,cache为512,handles为0,在node.DefaultConfig配置
if cache < 16 {
cache = 16
}
if handles < 16 {
handles = 16
}
logger.Info("Allocated cache and file handles", "cache", common.StorageSize(cache*1024*1024), "handles", handles)
db, err := leveldb.OpenFile(file, &opt.Options{
OpenFilesCacheCapacity: handles,
BlockCacheCapacity: cache / 2 * opt.MiB,
WriteBuffer: cache / 4 * opt.MiB,
Filter: filter.NewBloomFilter(10),
})
if _, corrupted := err.(*errors.ErrCorrupted); corrupted {
db, err = leveldb.RecoverFile(file, nil)
}
if err != nil {
return nil, err
}
return &LDBDatabase{
fn: file,
db: db,
log: logger,
}, nil
}
逻辑很简单,创建了一个levelDB对象,主要看一下options的内容:
- OpenFilesCacheCapacity:定义打开的文件缓存大小,默认是500,设为-1或0时表示不缓存
- BlockCacheCapacity:定义了一个名为sorted table的缓存容量,默认是8MB,设为-1或0时表示不缓存
- WriteBuffer:定义memdb的大小,它是一个内存数据库,默认是4MB.
- Filter:定义过滤器,优化读性能。这是使用了一个布隆过滤器
put,has,get,delete
func (db *LDBDatabase) Put(key []byte, value []byte) error {
return db.db.Put(key, value, nil)
}
func (db *LDBDatabase) Has(key []byte) (bool, error) {
return db.db.Has(key, nil)
}
func (db *LDBDatabase) Get(key []byte) ([]byte, error) {
dat, err := db.db.Get(key, nil)
if err != nil {
return nil, err
}
return dat, nil
}
func (db *LDBDatabase) Delete(key []byte) error {
return db.db.Delete(key, nil)
}
都是对leveldb做了封装而已。
Batch及其操作
批量读写的封装
type ldbBatch struct {
db *leveldb.DB
b *leveldb.Batch
size int
}
put、delete、write、reset
func (b *ldbBatch) Put(key, value []byte) error {
b.b.Put(key, value)
b.size += len(value)
return nil
}
func (b *ldbBatch) Delete(key []byte) error {
b.b.Delete(key)
b.size += 1
return nil
}
func (b *ldbBatch) Write() error {
return b.db.Write(b.b, nil)
}
func (b *ldbBatch) Reset() {
b.b.Reset()
b.size = 0
}
Meter
这是用于初始化LDBDatabase的一系列Meter成员用的
func (db *LDBDatabase) Meter(prefix string) {
db.compTimeMeter = metrics.NewRegisteredMeter(prefix+"compact/time", nil)
db.compReadMeter = metrics.NewRegisteredMeter(prefix+"compact/input", nil)
db.compWriteMeter = metrics.NewRegisteredMeter(prefix+"compact/output", nil)
db.diskReadMeter = metrics.NewRegisteredMeter(prefix+"disk/read", nil)
db.diskWriteMeter = metrics.NewRegisteredMeter(prefix+"disk/write", nil)
db.writeDelayMeter = metrics.NewRegisteredMeter(prefix+"compact/writedelay/duration", nil)
db.writeDelayNMeter = metrics.NewRegisteredMeter(prefix+"compact/writedelay/counter", nil)
db.quitLock.Lock()
db.quitChan = make(chan chan error)
db.quitLock.Unlock()
go db.meter(3 * time.Second)
}
这个方法内先初始化各种Meter,然后创建了一个chan,最后启动一个goroutine运行meter,之后每3秒收集一次信息并反馈到Meter。这一段代码比较长,就不贴出来了。主要是利用db.db.GetProperty(“leveldb.stats”)获取信息,信息格式如下:
// Level | Tables | Size(MB) | Time(sec) | Read(MB) | Write(MB)
// -------+------------+---------------+---------------+---------------+---------------
// 0 | 0 | 0.00000 | 1.27969 | 0.00000 | 12.31098
// 1 | 85 | 109.27913 | 28.09293 | 213.92493 | 214.26294
// 2 | 523 | 1000.37159 | 7.26059 | 66.86342 | 66.77884
// 3 | 570 | 1113.18458 | 0.00000 | 0.00000 | 0.00000
下面就是解析这个字符串并写入Meter。另外一点这段代周期循环的关键代码如下
for i := 1; errc == nil && merr == nil; i++ {
//....
select {
case errc = <-db.quitChan:
case <-time.After(refresh):
}
}
到select出会阻塞,3秒后进行下一次循环,退出时向Meter方法中初始化的chan发送信息,导致errc不为nil,就自然退出循环
close
func (db *LDBDatabase) Close() {
db.quitLock.Lock()
defer db.quitLock.Unlock()
if db.quitChan != nil {
errc := make(chan error)
db.quitChan <- errc
if err := <-errc; err != nil {
db.log.Error("Metrics collection failed", "err", err)
}
db.quitChan = nil
}
err := db.db.Close()
if err == nil {
db.log.Info("Database closed")
} else {
db.log.Error("Failed to close database", "err", err)
}
}
退出代码也很简单,主要就是在加锁环境下,向quitChan写入信息,停止meter,然后等待反馈(在meter发出反馈),最后关闭数据库。
memory_database.go
这是一个用于测试的基于内存的数据库。在源码中主要是在geth初始化时,如果最后创建数据库时依旧没有有效的datadir则使用这个数据库代替
构造
type MemDatabase struct {
db map[string][]byte
lock sync.RWMutex
}
func NewMemDatabase() *MemDatabase {
return &MemDatabase{
db: make(map[string][]byte),
}
}
可见就是基于map的封装。
基础操作
func (db *MemDatabase) Put(key []byte, value []byte) error {
db.lock.Lock()
defer db.lock.Unlock()
db.db[string(key)] = common.CopyBytes(value)
return nil
}
func (db *MemDatabase) Has(key []byte) (bool, error) {
db.lock.RLock()
defer db.lock.RUnlock()
_, ok := db.db[string(key)]
return ok, nil
}
func (db *MemDatabase) Get(key []byte) ([]byte, error) {
db.lock.RLock()
defer db.lock.RUnlock()
if entry, ok := db.db[string(key)]; ok {
return common.CopyBytes(entry), nil
}
return nil, errors.New("not found")
}
func (db *MemDatabase) Delete(key []byte) error {
db.lock.Lock()
defer db.lock.Unlock()
delete(db.db, string(key))
return nil
}
全是基于map操作,只不过进行了加锁
Batch
type memBatch struct {
db *MemDatabase
writes []kv
size int
}
type kv struct {
k, v []byte
del bool
}
func (db *MemDatabase) NewBatch() Batch {
return &memBatch{db: db}
}
func (b *memBatch) Put(key, value []byte) error {
b.writes = append(b.writes, kv{common.CopyBytes(key), common.CopyBytes(value), false})
b.size += len(value)
return nil
}
func (b *memBatch) Delete(key []byte) error {
b.writes = append(b.writes, kv{common.CopyBytes(key), nil, true})
b.size += 1
return nil
}
func (b *memBatch) Write() error {
b.db.lock.Lock()
defer b.db.lock.Unlock()
for _, kv := range b.writes {
if kv.del {
delete(b.db.db, string(kv.k))
continue
}
b.db.db[string(kv.k)] = kv.v
}
return nil
}
func (b *memBatch) Reset() {
b.writes = b.writes[:0]
b.size = 0
}
批量操作先存储在一个KV类型的数组内,等到写入时遍历那个数组,依次存入数据库
table.go与table_batch.go
这两个也是对数据的封装,之所以叫table是因为实例化一个table时要指定一个前缀,之后利用table的基本操作都会给key添加指定的前缀。table_batch也类似,直接看一下table.go的源码
func NewTable(db Database, prefix string) Database {
return &table{
db: db,
prefix: prefix,
}
}
func (dt *table) Put(key []byte, value []byte) error {
return dt.db.Put(append([]byte(dt.prefix), key...), value)
}
func (dt *table) Has(key []byte) (bool, error) {
return dt.db.Has(append([]byte(dt.prefix), key...))
}
func (dt *table) Get(key []byte) ([]byte, error) {
return dt.db.Get(append([]byte(dt.prefix), key...))
}
func (dt *table) Delete(key []byte) error {
return dt.db.Delete(append([]byte(dt.prefix), key...))
}