rlp编码学习

RLP的全称是Recursive Length Prefix,是以太坊实现中普遍使用的一种序列化方法,在黄皮书的附录B中有详细的定义,我们这里也简要学习一下

源数据

RLP定义了两种源数据,一种是一维的字节数组;另外一种是多维字节数组,也就是一维数组的嵌套。所有要序列化的数据类型,都要有一定的方法转为上述两种格式,转换的方法可以根据不同的实现自己定义。在黄皮书中给出了源数据的定义:

RLP定义

函数定义如下:

可见分别定义了两个函数,对应了上一节中的两种源数据,分别解释一下这两个函数

源数据为一维字节数组

当数据为简单的一维字节数组时,有以下三种序列化规则:

  1. 当只含有一个字节时,而且这个字节又小于128,则不做任何处理,直接输出,对应上图第一种情况
  2. 当字节数组的长度小于56时,则加上一个前缀,这个前缀等于128+字节数组长度,对应上图第二种情况
  3. 若不符合上述两种情况,则加上这样两个前缀,第一个前缀等于183+字节数组长度在大端表示时的长度,第二个前缀为字节数组长度的大端表示,对应上图第三种情况(所谓大端表示就是将高位字节排放在内存的低地址端,0x1234表示为00 00 12 34,BE函数就是去除前面的零,也可以理解为实际长度)

源数据为嵌套的多维字节数组

当数据为嵌套的多维数组形式时,有以下两种序列化规则:

首先对数组中每一个子元素都递归使用上一小节中的规则序列化,注意序列化时对象都要为一维字节数组,若子元素也是嵌套格式,则递归调用,之后将每个子元素序列化结果拼接起来。对于拼接后的长度:

  1. 若长度小于56,则加上这样一个前缀,这个前缀等于192+拼接后的长度,对应上图第一种情况
  2. 若长度大于等于56,则加上这样两个前缀,第一个前缀等于247+拼接后字节数组长度在大端表示时的长度,第二个前缀为拼接后的长度的大端表示,对应上图第二种情况

源数据是标量数据

首先RLP只能用于处理正整数,处理是要先用BE函数处理,去掉前导0后,当做字节数组处理,如下图:

解码

实际上了解到编码规则后,解码就很简单,关键就是第一个字节,这个字节标识使用哪种情况编码

  1. 当位于[0,128)区间时,对应源数据是一维字节数组的第一种情况,就是单一字节
  2. 当位于[128,184)区间时,对应源数据是一维字节数组的第二种情况,就是长度小于56的一维字节数组
  3. 当位于[184,192)区间时,对应源数据是一维字节数组的第三种情况,这时观察第二个前缀,第二个前缀长度等于第一个字节减去183,然后计算原始数据的真正长度
  4. 当位于[192,247)区间时,对于源数据是多维字节数组的第一种情况,就是拼接长度小于56,递归解析其后数据
  5. 当位于[247,256)区间时,对于源数据是多维字节数组的第二种情况,类似于第三条规则,先实际计算出第二个前缀的长度,在解析数原始数据长度,在递归解析出原始数据

源码解析

源码主要集中在go-ethereum\rlp目录下,再去除一些测试代码,实际功能代码并不多,关键如下:

decode.go       //解码器,就是反序列化
encode.go       //编码器,就是序列化
raw.go          //未解码的RLP数据
typecache.go	//类型缓存, 类型缓存记录了类型->(编码器|解码器)的内容。

typecache

由于go-ethereum是用go语言实现的,而go语言没有方法重载,所以对于不同类型的数据要手动指定需要的编解码器。这个类主要功能是给我们返回一个typeinfo类型的对象,这个对象保存在对应数据类型的编解码方法

type typeinfo struct {
	decoder
	writer
}

去创建一个typeinfo需要从cachedTypeInfo方法开始:

// go-ethereum\rlp\typecache.go
func cachedTypeInfo(typ reflect.Type, tags tags) (*typeinfo, error) {
	typeCacheMutex.RLock()
	info := typeCache[typekey{typ, tags}] //尝试从缓存中国区
	typeCacheMutex.RUnlock()
	if info != nil { //获取成功
		return info, nil
	}
	typeCacheMutex.Lock() //加锁,避免多线程多次创建
	defer typeCacheMutex.Unlock()
	return cachedTypeInfo1(typ, tags)
}

func cachedTypeInfo1(typ reflect.Type, tags tags) (*typeinfo, error) {
	key := typekey{typ, tags}
	info := typeCache[key]//再次尝试获取,确保只创建一次
	if info != nil { //获取成功
		return info, nil
	}
	typeCache[key] = new(typeinfo) //创建一个空对象
	info, err := genTypeInfo(typ, tags) //实际创建对象
	if err != nil { //创建失败
		delete(typeCache, key)
		return nil, err
	}
	*typeCache[key] = *info //存储到map中
	return typeCache[key], err
}
func genTypeInfo(typ reflect.Type, tags tags) (info *typeinfo, err error) {
	info = new(typeinfo)
	if info.decoder, err = makeDecoder(typ, tags); err != nil {
		return nil, err
	}
	if info.writer, err = makeWriter(typ, tags); err != nil {
		return nil, err
	}
	return info, nil
}

可见对每种类型,都是单例模式。上述代码中实际创建编解码器的方法是makeDecoder和makeWriter。这两个方法详见下文

encode

对于编码器的使用,一般调用Encode函数:

func Encode(w io.Writer, val interface{}) error {
	if outer, ok := w.(*encbuf); ok {//判断是否是encbuf类型的writer
		return outer.encode(val)
	}
	eb := encbufPool.Get().(*encbuf) //从并发变量池中获取一个encbuf对象
	defer encbufPool.Put(eb)
	eb.reset() //清空原有数据
	if err := eb.encode(val); err != nil { //编码
		return err
	}
	return eb.toWriter(w)
}

编码的核心操作在encbuf的encode方法:

func (w *encbuf) encode(val interface{}) error {
	rval := reflect.ValueOf(val)
	ti, err := cachedTypeInfo(rval.Type(), tags{})
	if err != nil {
		return err
	}
	return ti.writer(rval, w)
}

这里就接上了上一节typecache中的方法,这里通过makeWriter确定编码器

func makeWriter(typ reflect.Type, ts tags) (writer, error) {
	kind := typ.Kind()
	switch {
	case typ == rawValueType:
		return writeRawValue, nil
	case typ.Implements(encoderInterface):
		return writeEncoder, nil
	case kind != reflect.Ptr && reflect.PtrTo(typ).Implements(encoderInterface):
		return writeEncoderNoPtr, nil
	case kind == reflect.Interface:
		return writeInterface, nil
	case typ.AssignableTo(reflect.PtrTo(bigInt)):
		return writeBigIntPtr, nil
	case typ.AssignableTo(bigInt):
		return writeBigIntNoPtr, nil
	case isUint(kind):
		return writeUint, nil
	case kind == reflect.Bool:
		return writeBool, nil
	case kind == reflect.String:
		return writeString, nil
	case kind == reflect.Slice && isByte(typ.Elem()):
		return writeBytes, nil
	case kind == reflect.Array && isByte(typ.Elem()):
		return writeByteArray, nil
	case kind == reflect.Slice || kind == reflect.Array:
		return makeSliceWriter(typ, ts)
	case kind == reflect.Struct:
		return makeStructWriter(typ)
	case kind == reflect.Ptr:
		return makePtrWriter(typ)
	default:
		return nil, fmt.Errorf("rlp: type %v is not RLP-serializable", typ)
	}
}

就是简单的根据不同的类型创建不同的编码方法,以string为例:

func writeString(val reflect.Value, w *encbuf) error {
	s := val.String()
	if len(s) == 1 && s[0] <= 0x7f {//只有一个字符,且小于128
		w.str = append(w.str, s[0])
	} else {
		w.encodeStringHeader(len(s))//添加前缀
		w.str = append(w.str, s...)
	}
	return nil
}

func (w *encbuf) encodeStringHeader(size int) {
	if size < 56 { //长度小于56
		w.str = append(w.str, 0x80+byte(size))//前缀是128+长度
	} else { //其他情况
		sizesize := putint(w.sizebuf[1:], uint64(size)) //将长度的大端表示写入sizebuf
		w.sizebuf[0] = 0xB7 + byte(sizesize) //第一个前缀183+字符串长度在大端表示后的长度
		w.str = append(w.str, w.sizebuf[:sizesize+1]...)//拼接第二个前缀,字符串长度的大端表示
	}
}

详细过程见注释,基本过程和RLP定义的一样。对于结构体可能有些特殊:

func makeStructWriter(typ reflect.Type) (writer, error) {
	fields, err := structFields(typ) //分析结构体每个字段,根据情况指定每个字段的编码方法
	if err != nil {
		return nil, err
	}
	writer := func(val reflect.Value, w *encbuf) error { //编码器具体方法
		lh := w.list()
		for _, f := range fields {
			if err := f.info.writer(val.Field(f.index), w); err != nil {
				return err
			}
		}
		w.listEnd(lh)
		return nil
	}
	return writer, nil
}

// go-ethereum\rlp\typecache.go
func structFields(typ reflect.Type) (fields []field, err error) {
	for i := 0; i < typ.NumField(); i++ {
		if f := typ.Field(i); f.PkgPath == "" { // 若果是可导出的,也就是首字母大写,PkgPath为空,不可导出会返回包名
			tags, err := parseStructTag(typ, i)//解析每个字段的标签,就是字段后``中定义的
			if err != nil {
				return nil, err
			}
			if tags.ignored {
				continue
			}
			info, err := cachedTypeInfo1(f.Type, tags)//根据类型获取cachedTypeInfo,包含编解码器
			if err != nil {
				return nil, err
			}
			fields = append(fields, field{i, info})
		}
	}
	return fields, nil
}

func parseStructTag(typ reflect.Type, fi int) (tags, error) {
	f := typ.Field(fi)
	var ts tags
	for _, t := range strings.Split(f.Tag.Get("rlp"), ",") {
		switch t = strings.TrimSpace(t); t {
		case "":
		case "-":
			ts.ignored = true
		case "nil":
			ts.nilOK = true
		case "tail":
			ts.tail = true
			if fi != typ.NumField()-1 {
				return ts, fmt.Errorf(`rlp: invalid struct tag "tail" for %v.%s (must be on last field)`, typ, f.Name)
			}
			if f.Type.Kind() != reflect.Slice {
				return ts, fmt.Errorf(`rlp: invalid struct tag "tail" for %v.%s (field type is not slice)`, typ, f.Name)
			}
		default:
			return ts, fmt.Errorf("rlp: unknown struct tag %q on %v.%s", t, typ, f.Name)
		}
	}
	return ts, nil
}

结构体类型的虽然复杂,但也是具体到每个字段执行不同的序列化方法,最后进行拼接。

decode

对于解码器一般调用Decode函数:

func Decode(r io.Reader, val interface{}) error {
	return NewStream(r, 0).Decode(val)
}
func (s *Stream) Decode(val interface{}) error {
	if val == nil {
		return errDecodeIntoNil
	}
	rval := reflect.ValueOf(val)
	rtyp := rval.Type()
	if rtyp.Kind() != reflect.Ptr { //判断是否为指针类型
		return errNoPointer
	}
	if rval.IsNil() {
		return errDecodeIntoNil
	}
	info, err := cachedTypeInfo(rtyp.Elem(), tags{}) //获取编解码方法
	if err != nil {
		return err
	}

	err = info.decoder(s, rval.Elem())//解码
	if decErr, ok := err.(*decodeError); ok && len(decErr.ctx) > 0 {
		decErr.ctx = append(decErr.ctx, fmt.Sprint("(", rtyp.Elem(), ")"))
	}
	return err
}

注意decode的逻辑是从Stream中获取源数据,最后解析到val中,所以需要val是一个指针类型,接下来又回到typecache中,通过判断val的类型获取所需的解码器。

func makeDecoder(typ reflect.Type, tags tags) (dec decoder, err error) {
	kind := typ.Kind()
	switch {
	case typ == rawValueType:
		return decodeRawValue, nil
	case typ.Implements(decoderInterface):
		return decodeDecoder, nil
	case kind != reflect.Ptr && reflect.PtrTo(typ).Implements(decoderInterface):
		return decodeDecoderNoPtr, nil
	case typ.AssignableTo(reflect.PtrTo(bigInt)):
		return decodeBigInt, nil
	case typ.AssignableTo(bigInt):
		return decodeBigIntNoPtr, nil
	case isUint(kind):
		return decodeUint, nil
	case kind == reflect.Bool:
		return decodeBool, nil
	case kind == reflect.String:
		return decodeString, nil
	case kind == reflect.Slice || kind == reflect.Array:
		return makeListDecoder(typ, tags)
	case kind == reflect.Struct:
		return makeStructDecoder(typ)
	case kind == reflect.Ptr:
		if tags.nilOK {
			return makeOptionalPtrDecoder(typ)
		}
		return makePtrDecoder(typ)
	case kind == reflect.Interface:
		return decodeInterface, nil
	default:
		return nil, fmt.Errorf("rlp: type %v is not RLP-serializable", typ)
	}
}

以string为例:

func decodeString(s *Stream, val reflect.Value) error {
	b, err := s.Bytes()
	if err != nil {
		return wrapStreamError(err, val.Type())
	}
	val.SetString(string(b)) //还原为string
	return nil
}
func (s *Stream) Bytes() ([]byte, error) {
	kind, size, err := s.Kind()
	if err != nil {
		return nil, err
	}
	switch kind {
	case Byte:
		s.kind = -1 
		return []byte{s.byteval}, nil
	case String:
		b := make([]byte, size) //根据数据长度指定byte数组
		if err = s.readFull(b); err != nil { //读取原始数据
			return nil, err
		}
		if size == 1 && b[0] < 128 {
			return nil, ErrCanonSize
		}
		return b, nil
	default:
		return nil, ErrExpectedString
	}
}
func (s *Stream) Kind() (kind Kind, size uint64, err error) {
	var tos *listpos
	if len(s.stack) > 0 {
		tos = &s.stack[len(s.stack)-1]
	}
	if s.kind < 0 {
		s.kinderr = nil
		if tos != nil && tos.pos == tos.size {
			return 0, 0, EOL
		}
		s.kind, s.size, s.kinderr = s.readKind() //读取原始数据类型以及长度
		if s.kinderr == nil {
			if tos == nil {
				if s.limited && s.size > s.remaining {
					s.kinderr = ErrValueTooLarge
				}
			} else {
				if s.size > tos.size-tos.pos {
					s.kinderr = ErrElemTooLarge
				}
			}
		}
	}
	return s.kind, s.size, s.kinderr
}
func (s *Stream) readKind() (kind Kind, size uint64, err error) {
	b, err := s.readByte() //读第一个byte
	if err != nil {
		if len(s.stack) == 0 {
			switch err {
			case io.ErrUnexpectedEOF:
				err = io.EOF
			case ErrValueTooLarge:
				err = io.EOF
			}
		}
		return 0, 0, err
	}
	s.byteval = 0
	switch { //根据第一字节的只判断原始数据类型
	case b < 0x80: //原始数据只有一个byte,且小于128
		s.byteval = b
		return Byte, 0, nil
	case b < 0xB8: //原始数据长度小于56
	    //返回的第二个数据获得原始数据长度
		return String, uint64(b - 0x80), nil
	case b < 0xC0:
		size, err = s.readUint(b - 0xB7)
		if err == nil && size < 56 {
			err = ErrCanonSize
		}
		return String, size, err
	case b < 0xF8:
		return List, uint64(b - 0xC0), nil
	default:
		size, err = s.readUint(b - 0xF7)
		if err == nil && size < 56 {
			err = ErrCanonSize
		}
		return List, size, err
	}
}

可见流程就是rlp编码的逆向过程,和我们之前讲的解码方法一样,关键是通过第一个字节获取原始数据的类型,然后推算出原始数据的长度,最后解析即可

小结

最后,go-ethereum源码中rlp实现部分还是很完整的,并且没有什么其他依赖,都使用的是go标准包,所以可以单独拿出来做一个库,以后遇到需要用rlp编码的地方,可以直接拿来使用。还有这一部分源码大量的使用了反射,对于学习go语言反射也是很好的一个素材