Skip to content

Commit

Permalink
fix: fix buf reader
Browse files Browse the repository at this point in the history
  • Loading branch information
HeyJavaBean committed Nov 28, 2024
1 parent 649294d commit a6263c4
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 9 deletions.
24 changes: 18 additions & 6 deletions protocol/thrift/apache/adaptor/adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func AdaptRead(p, iprot interface{}) error {
return fmt.Errorf("no codec implementation available")
}

var rd io.Reader
var br bufiox.Reader
// if iprot is from kitex v0.12.0+, use interface assert to get bufiox reader
if bp, ok := iprot.(bufioxReaderWriter); ok {
Expand All @@ -58,29 +59,40 @@ func AdaptRead(p, iprot interface{}) error {
case byteBuffer:
// if reader is from byteBuffer, Read() function is not always available
// so use an adaptor to implement Read() by Next() and ReadableLen()
br = bufiox.NewDefaultReader(byteBuffer2ReadWriter(r))
rd = byteBuffer2ReadWriter(r)
case io.ReadWriter:
// if reader is not byteBuffer but is io.ReadWriter, it suppose to be apache thrift binary protocol
br = bufiox.NewDefaultReader(r)
// if reader is not byteBuffer but is io.ReadWriter, it supposes to be apache thrift binary protocol
rd = r
default:
return fmt.Errorf("reader not ok")
}
break
}
}
}
if br == nil {
if rd == nil && br == nil {
return fmt.Errorf("no available field for reader")
}

// read data from iprot
buf, err := thrift.NewSkipDecoder(br).Next(thrift.STRUCT)
var sd *thrift.SkipDecoder
if br != nil {
sd = thrift.NewSkipDecoder(br)
} else {
// if there's no bufiox.Reader, do not wrap a new bufiox.Reader, or some data will remain in the buffer
// directly read from io.Reader
sd = thrift.NewSkipDecoderWithIOReader(rd)
}

buf, err := sd.Next(thrift.STRUCT)
if err != nil {
return err
}

sd.Release()

// unmarshal the data into struct
_, err = fastStruct.FastRead(buf)

return err
}

Expand Down
27 changes: 24 additions & 3 deletions protocol/thrift/skipdecoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package thrift
import (
"encoding/binary"
"fmt"
"io"
"sync"

"github.com/bytedance/gopkg/lang/mcache"
Expand All @@ -35,7 +36,8 @@ var poolSkipDecoder = sync.Pool{

// SkipDecoder scans the underlying io.Reader and returns the bytes of a type
type SkipDecoder struct {
r bufiox.Reader
r bufiox.Reader
ioReader io.Reader

// for storing Next(ttype) buffer
nextBuf []byte
Expand All @@ -51,6 +53,15 @@ func NewSkipDecoder(r bufiox.Reader) *SkipDecoder {
return p
}

// NewSkipDecoderWithIOReader creates a skip decoder with given io.Reader ... call Release if no longer use
// Use this if you don't want the bufiox.Reader automatically restore some bytes in the buffer.
// The performance may have some loss when the input ioReader don't implement the read buffer logic.
func NewSkipDecoderWithIOReader(rd io.Reader) *SkipDecoder {
p := poolSkipDecoder.Get().(*SkipDecoder)
p.ioReader = rd
return p
}

// Release releases the peekAck decoder, callers cannot use the returned data of Next after calling Release.
func (p *SkipDecoder) Release() {
if cap(p.nextBuf) > 0 {
Expand Down Expand Up @@ -162,9 +173,19 @@ func (p *SkipDecoder) skip(t TType, maxdepth int) error {
}

func (p *SkipDecoder) next(n int) (buf []byte, err error) {
if buf, err = p.r.Next(n); err != nil {
return
// if SkipDecoder is created with ioReader, just read from ioReader.
if p.r == nil {
buf = make([]byte, n)
if _, err = p.ioReader.Read(buf); err != nil {
return
}
} else {
// read from bufiox.Reader
if buf, err = p.r.Next(n); err != nil {
return
}
}

if cap(p.nextBuf)-len(p.nextBuf) < n {
var ncap int
for ncap = cap(p.nextBuf) * 2; ncap-len(p.nextBuf) < n; ncap *= 2 {
Expand Down

0 comments on commit a6263c4

Please # to comment.