From 9c415f239b6ec5335e4419f649d438bad97b6cc3 Mon Sep 17 00:00:00 2001 From: HeyJavaBean Date: Fri, 11 Oct 2024 15:10:20 +0800 Subject: [PATCH 01/12] feat: implement an apache adaptor --- bridge/apache_bridge.go | 97 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100644 bridge/apache_bridge.go diff --git a/bridge/apache_bridge.go b/bridge/apache_bridge.go new file mode 100644 index 0000000..3505736 --- /dev/null +++ b/bridge/apache_bridge.go @@ -0,0 +1,97 @@ +// Copyright 2024 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bridge + +import ( + "fmt" + "io" + "reflect" + "unsafe" + + "github.com/cloudwego/gopkg/bufiox" + "github.com/cloudwego/gopkg/protocol/thrift" +) + +func ApacheReadBridge(iprot interface{}, readFunc func(b []byte) (int, error)) error { + var br bufiox.Reader + fieldNames := []string{"br", "trans"} + for _, fn := range fieldNames { + reader, exist, err := getUnexportField(iprot, fn) + if err != nil { + return err + } + if exist { + switch r := reader.(type) { + case bufiox.Reader: + br = r + case io.Reader: + br = bufiox.NewDefaultReader(r) + default: + return fmt.Errorf("reader not ok") + } + break + } + } + if br == nil { + return fmt.Errorf("no available field for reader") + } + buf, err := thrift.NewSkipDecoder(br).Next(thrift.STRUCT) + if err != nil { + return err + } + _, err = readFunc(buf) + return err +} + +func ApacheWriteBridge(oprot interface{}, bufFunc func() []byte) error { + var bw bufiox.Writer + fieldNames := []string{"bw", "trans"} + for _, fn := range fieldNames { + writer, exist, err := getUnexportField(oprot, fn) + if err != nil { + return err + } + if exist { + switch w := writer.(type) { + case bufiox.Writer: + bw = w + case io.Writer: + bw = bufiox.NewDefaultWriter(w) + default: + return fmt.Errorf("writer type not ok") + } + break + } + } + if bw == nil { + return fmt.Errorf("no available field for writer") + } + _, err := bw.WriteBinary(bufFunc()) + if err != nil { + return err + } + return bw.Flush() +} + +func getUnexportField(p interface{}, fieldName string) (value interface{}, ok bool, error error) { + if reflect.TypeOf(p).Kind() != reflect.Ptr { + return nil, false, fmt.Errorf("%s is not a ptr", p) + } + field := reflect.ValueOf(p).Elem().FieldByName(fieldName) + if field.IsValid() { + return reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).Elem().Interface(), true, nil + } + return nil, false, nil +} From a4e210f583d37822edd4e2f7b4bed651faf6491e Mon Sep 17 00:00:00 2001 From: HeyJavaBean Date: Wed, 20 Nov 2024 16:12:34 +0800 Subject: [PATCH 02/12] fix: fix reader to next --- .../adaptor.go | 55 +++++++++++++++++-- 1 file changed, 49 insertions(+), 6 deletions(-) rename bridge/apache_bridge.go => apache_adaptor/adaptor.go (64%) diff --git a/bridge/apache_bridge.go b/apache_adaptor/adaptor.go similarity index 64% rename from bridge/apache_bridge.go rename to apache_adaptor/adaptor.go index 3505736..8703bfa 100644 --- a/bridge/apache_bridge.go +++ b/apache_adaptor/adaptor.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package bridge +package apache_adaptor import ( "fmt" @@ -24,7 +24,46 @@ import ( "github.com/cloudwego/gopkg/protocol/thrift" ) -func ApacheReadBridge(iprot interface{}, readFunc func(b []byte) (int, error)) error { +type ByteBuffer interface { + // Next reads the next n bytes sequentially and returns the original buffer. + Next(n int) (p []byte, err error) + + // ReadableLen returns the total length of readable buffer. + // Return: -1 means unreadable. + ReadableLen() (n int) +} + +type nextReader struct { + nx ByteBuffer +} + +func (nr nextReader) Read(p []byte) (n int, err error) { + readable := nr.nx.ReadableLen() + if readable == -1 { + return 0, err + } + if readable > len(p) { + readable = len(p) + } + data, err := nr.nx.Next(readable) + if err != nil { + return -1, err + } + copy(p, data) + return readable, nil +} + +func next2Reader(n ByteBuffer) io.Reader { + return &nextReader{nx: n} +} + +func AdaptRead(iprot interface{}, readFunc func(buf []byte) (int, error)) error { + // 通过过渡手段先让用户的 Apache Codec 变成冷门路径 + + // todo + // 先给 kitex 新版本 TProtocol 补全接口 + // 尝试类型断言(对下一个新版本有效) + var br bufiox.Reader fieldNames := []string{"br", "trans"} for _, fn := range fieldNames { @@ -36,8 +75,11 @@ func ApacheReadBridge(iprot interface{}, readFunc func(b []byte) (int, error)) e switch r := reader.(type) { case bufiox.Reader: br = r - case io.Reader: - br = bufiox.NewDefaultReader(r) + // case io.Reader: + // br = bufiox.NewDefaultReader(r) + case ByteBuffer: + rd := next2Reader(r) + br = bufiox.NewDefaultReader(rd) default: return fmt.Errorf("reader not ok") } @@ -55,7 +97,7 @@ func ApacheReadBridge(iprot interface{}, readFunc func(b []byte) (int, error)) e return err } -func ApacheWriteBridge(oprot interface{}, bufFunc func() []byte) error { +func AdaptWrite(oprot interface{}, writeFunc func() []byte) error { var bw bufiox.Writer fieldNames := []string{"bw", "trans"} for _, fn := range fieldNames { @@ -78,7 +120,8 @@ func ApacheWriteBridge(oprot interface{}, bufFunc func() []byte) error { if bw == nil { return fmt.Errorf("no available field for writer") } - _, err := bw.WriteBinary(bufFunc()) + buf := writeFunc() + _, err := bw.WriteBinary(buf) if err != nil { return err } From 7ad66728a066663526639b7cd2c09e02592e8aed Mon Sep 17 00:00:00 2001 From: HeyJavaBean Date: Thu, 21 Nov 2024 16:20:24 +0800 Subject: [PATCH 03/12] optimize: add interface for new kitex version --- apache_adaptor/adaptor.go | 83 +++++++++++++++++++++------------------ 1 file changed, 44 insertions(+), 39 deletions(-) diff --git a/apache_adaptor/adaptor.go b/apache_adaptor/adaptor.go index 8703bfa..1ca629d 100644 --- a/apache_adaptor/adaptor.go +++ b/apache_adaptor/adaptor.go @@ -24,6 +24,11 @@ import ( "github.com/cloudwego/gopkg/protocol/thrift" ) +type bufioxReaderWriter interface { + GetBufioxReader() bufiox.Reader + GetBufioxWriter() bufiox.Writer +} + type ByteBuffer interface { // Next reads the next n bytes sequentially and returns the original buffer. Next(n int) (p []byte, err error) @@ -58,32 +63,28 @@ func next2Reader(n ByteBuffer) io.Reader { } func AdaptRead(iprot interface{}, readFunc func(buf []byte) (int, error)) error { - // 通过过渡手段先让用户的 Apache Codec 变成冷门路径 - - // todo - // 先给 kitex 新版本 TProtocol 补全接口 - // 尝试类型断言(对下一个新版本有效) - var br bufiox.Reader - fieldNames := []string{"br", "trans"} - for _, fn := range fieldNames { - reader, exist, err := getUnexportField(iprot, fn) - if err != nil { - return err - } - if exist { - switch r := reader.(type) { - case bufiox.Reader: - br = r - // case io.Reader: - // br = bufiox.NewDefaultReader(r) - case ByteBuffer: - rd := next2Reader(r) - br = bufiox.NewDefaultReader(rd) - default: - return fmt.Errorf("reader not ok") + if bp, ok := iprot.(bufioxReaderWriter); ok { + br = bp.GetBufioxReader() + } else { + fieldNames := []string{"br", "trans"} + for _, fn := range fieldNames { + reader, exist, err := getUnexportField(iprot, fn) + if err != nil { + return err + } + if exist { + switch r := reader.(type) { + case bufiox.Reader: + br = r + case ByteBuffer: + rd := next2Reader(r) + br = bufiox.NewDefaultReader(rd) + default: + return fmt.Errorf("reader not ok") + } + break } - break } } if br == nil { @@ -99,22 +100,26 @@ func AdaptRead(iprot interface{}, readFunc func(buf []byte) (int, error)) error func AdaptWrite(oprot interface{}, writeFunc func() []byte) error { var bw bufiox.Writer - fieldNames := []string{"bw", "trans"} - for _, fn := range fieldNames { - writer, exist, err := getUnexportField(oprot, fn) - if err != nil { - return err - } - if exist { - switch w := writer.(type) { - case bufiox.Writer: - bw = w - case io.Writer: - bw = bufiox.NewDefaultWriter(w) - default: - return fmt.Errorf("writer type not ok") + if bp, ok := oprot.(bufioxReaderWriter); ok { + bw = bp.GetBufioxWriter() + } else { + fieldNames := []string{"bw", "trans"} + for _, fn := range fieldNames { + writer, exist, err := getUnexportField(oprot, fn) + if err != nil { + return err + } + if exist { + switch w := writer.(type) { + case bufiox.Writer: + bw = w + case io.Writer: + bw = bufiox.NewDefaultWriter(w) + default: + return fmt.Errorf("writer type not ok") + } + break } - break } } if bw == nil { From 35af4ef7291a5d62f1d0f2da7c62f3fcceab9989 Mon Sep 17 00:00:00 2001 From: HeyJavaBean Date: Fri, 22 Nov 2024 15:04:25 +0800 Subject: [PATCH 04/12] chore: add comment --- apache_adaptor/adaptor.go | 95 +++++++++++++++++++++++---------------- 1 file changed, 56 insertions(+), 39 deletions(-) diff --git a/apache_adaptor/adaptor.go b/apache_adaptor/adaptor.go index 1ca629d..450670b 100644 --- a/apache_adaptor/adaptor.go +++ b/apache_adaptor/adaptor.go @@ -24,49 +24,16 @@ import ( "github.com/cloudwego/gopkg/protocol/thrift" ) -type bufioxReaderWriter interface { - GetBufioxReader() bufiox.Reader - GetBufioxWriter() bufiox.Writer -} - -type ByteBuffer interface { - // Next reads the next n bytes sequentially and returns the original buffer. - Next(n int) (p []byte, err error) - - // ReadableLen returns the total length of readable buffer. - // Return: -1 means unreadable. - ReadableLen() (n int) -} - -type nextReader struct { - nx ByteBuffer -} - -func (nr nextReader) Read(p []byte) (n int, err error) { - readable := nr.nx.ReadableLen() - if readable == -1 { - return 0, err - } - if readable > len(p) { - readable = len(p) - } - data, err := nr.nx.Next(readable) - if err != nil { - return -1, err - } - copy(p, data) - return readable, nil -} - -func next2Reader(n ByteBuffer) io.Reader { - return &nextReader{nx: n} -} - +// AdaptRead receive a kitex binary protocol and read it by given function. func AdaptRead(iprot interface{}, readFunc func(buf []byte) (int, error)) error { var br bufiox.Reader + // if iprot is from kitex v0.12.0+, use interface assert to get bufiox reader if bp, ok := iprot.(bufioxReaderWriter); ok { br = bp.GetBufioxReader() } else { + // if iprot is from kitex version lower than v0.12.0, use reflection to get reader + // in kitex v0.10.0, reader is from the field 'br' which is a bufiox.Reader + // in kitex under v0.10.0, reader is from the field 'trans' which is kitex byteBuffer (mostly NetpollByteBuffer) fieldNames := []string{"br", "trans"} for _, fn := range fieldNames { reader, exist, err := getUnexportField(iprot, fn) @@ -77,7 +44,9 @@ func AdaptRead(iprot interface{}, readFunc func(buf []byte) (int, error)) error switch r := reader.(type) { case bufiox.Reader: br = r - case ByteBuffer: + case byteBuffer: + // if reader is from byteBuffer, Read() function is not always available + // so use an adaptor to implement Read() by Next() and ReadableLen() rd := next2Reader(r) br = bufiox.NewDefaultReader(rd) default: @@ -98,11 +67,16 @@ func AdaptRead(iprot interface{}, readFunc func(buf []byte) (int, error)) error return err } +// AdaptWrite receive a kitex binary protocol and write it by given function. func AdaptWrite(oprot interface{}, writeFunc func() []byte) error { var bw bufiox.Writer + // if iprot is from kitex v0.12.0+, use interface assert to get bufiox writer if bp, ok := oprot.(bufioxReaderWriter); ok { bw = bp.GetBufioxWriter() } else { + // if iprot is from kitex version lower than v0.12.0, use reflection to get writer + // in kitex v0.10.0, writer is from the field 'bw' which is a bufiox.Writer + // in kitex under v0.10.0, writer is from the field 'trans' which implements the interface io.Writer fieldNames := []string{"bw", "trans"} for _, fn := range fieldNames { writer, exist, err := getUnexportField(oprot, fn) @@ -133,6 +107,7 @@ func AdaptWrite(oprot interface{}, writeFunc func() []byte) error { return bw.Flush() } +// getUnexportField retrieves the value of an unexported struct field. func getUnexportField(p interface{}, fieldName string) (value interface{}, ok bool, error error) { if reflect.TypeOf(p).Kind() != reflect.Ptr { return nil, false, fmt.Errorf("%s is not a ptr", p) @@ -143,3 +118,45 @@ func getUnexportField(p interface{}, fieldName string) (value interface{}, ok bo } return nil, false, nil } + +// bufioxReaderWriter +type bufioxReaderWriter interface { + GetBufioxReader() bufiox.Reader + GetBufioxWriter() bufiox.Writer +} + +// byteBuffer +type byteBuffer interface { + // Next reads the next n bytes sequentially and returns the original buffer. + Next(n int) (p []byte, err error) + + // ReadableLen returns the total length of readable buffer. + // Return: -1 means unreadable. + ReadableLen() (n int) +} + +// nextReader is an adaptor that implement Read() by Next() and ReadableLen() +type nextReader struct { + nx byteBuffer +} + +// Read reads data from the nextReader's internal buffer into p. +func (nr nextReader) Read(p []byte) (n int, err error) { + readable := nr.nx.ReadableLen() + if readable == -1 { + return 0, err + } + if readable > len(p) { + readable = len(p) + } + data, err := nr.nx.Next(readable) + if err != nil { + return -1, err + } + copy(p, data) + return readable, nil +} + +func next2Reader(n byteBuffer) io.Reader { + return &nextReader{nx: n} +} From 0a95320f7d839c8ad3cfce4b4bc42f57e2c18def Mon Sep 17 00:00:00 2001 From: HeyJavaBean Date: Tue, 26 Nov 2024 20:45:14 +0800 Subject: [PATCH 05/12] fix: fix some implementations --- apache_adaptor/adaptor.go | 103 +++++++++++++++++---------------- apache_adaptor/byte_buffer.go | 66 +++++++++++++++++++++ apache_adaptor/struct_codec.go | 93 +++++++++++++++++++++++++++++ 3 files changed, 211 insertions(+), 51 deletions(-) create mode 100644 apache_adaptor/byte_buffer.go create mode 100644 apache_adaptor/struct_codec.go diff --git a/apache_adaptor/adaptor.go b/apache_adaptor/adaptor.go index 450670b..d598e17 100644 --- a/apache_adaptor/adaptor.go +++ b/apache_adaptor/adaptor.go @@ -25,15 +25,26 @@ import ( ) // AdaptRead receive a kitex binary protocol and read it by given function. -func AdaptRead(iprot interface{}, readFunc func(buf []byte) (int, error)) error { +func AdaptRead(p, iprot interface{}) error { + // for now, we use fastCodec to adapt apache codec. + // the struct should have the function 'FastRead' + fastStruct, ok := p.(fastReader) + if !ok { + return fmt.Errorf("no codec implementation available") + } + var br bufiox.Reader // if iprot is from kitex v0.12.0+, use interface assert to get bufiox reader if bp, ok := iprot.(bufioxReaderWriter); ok { br = bp.GetBufioxReader() } else { - // if iprot is from kitex version lower than v0.12.0, use reflection to get reader - // in kitex v0.10.0, reader is from the field 'br' which is a bufiox.Reader - // in kitex under v0.10.0, reader is from the field 'trans' which is kitex byteBuffer (mostly NetpollByteBuffer) + // if iprot is from kitex lower than v0.12.0, use reflection to get reader + // 1. in kitex v0.11.0, reader is from the field 'br' which is a bufiox.Reader + // eg: https://github.com/cloudwego/kitex/blob/v0.11.0/pkg/protocol/bthrift/apache/binary_protocol.go#L44 + // 2. in kitex under v0.11.0, reader is from the field 'trans' which is kitex byteBuffer (mostly NetpollByteBuffer) + // eg: https://github.com/cloudwego/kitex/blob/v0.5.2/pkg/remote/codec/thrift/binary_protocol.go#L54 + // in apache thrift v0.13.0, reader is from the field 'trans' which implements the interface io.ReadWriter + // eg: https://github.com/apache/thrift/blob/v0.13.0/lib/go/thrift/binary_protocol.go#L33 fieldNames := []string{"br", "trans"} for _, fn := range fieldNames { reader, exist, err := getUnexportField(iprot, fn) @@ -46,9 +57,11 @@ func AdaptRead(iprot interface{}, readFunc func(buf []byte) (int, error)) error br = r case byteBuffer: // if reader is from byteBuffer, Read() function is not always available - // so use an adaptor to implement Read() by Next() and ReadableLen() - rd := next2Reader(r) - br = bufiox.NewDefaultReader(rd) + // so use an adaptor to implement Read() by Next() and ReadableLen() + br = bufiox.NewDefaultReader(byteBuffer2ReadWriter(r)) + case io.ReadWriter: + // if reader is not byteBuffer but is io.ReadWriter, it suppose to be apache thrift binary protocol + br = bufiox.NewDefaultReader(r) default: return fmt.Errorf("reader not ok") } @@ -59,24 +72,40 @@ func AdaptRead(iprot interface{}, readFunc func(buf []byte) (int, error)) error if br == nil { return fmt.Errorf("no available field for reader") } + + // read data from iprot buf, err := thrift.NewSkipDecoder(br).Next(thrift.STRUCT) if err != nil { return err } - _, err = readFunc(buf) + + // unmarshal the data into struct + _, err = fastStruct.FastRead(buf) return err } // AdaptWrite receive a kitex binary protocol and write it by given function. -func AdaptWrite(oprot interface{}, writeFunc func() []byte) error { +func AdaptWrite(p, oprot interface{}) error { + // for now, we use fastCodec, the struct should have the function 'FastWrite' + // but in old kitex_gen, the arguments of FastWrite is not from the same package. + // so we use reflection to handle this situation. + fastStruct, err := toFastCodec(p) + if err != nil { + return fmt.Errorf("no codec implementation available:%s", err) + } + var bw bufiox.Writer // if iprot is from kitex v0.12.0+, use interface assert to get bufiox writer if bp, ok := oprot.(bufioxReaderWriter); ok { bw = bp.GetBufioxWriter() } else { - // if iprot is from kitex version lower than v0.12.0, use reflection to get writer - // in kitex v0.10.0, writer is from the field 'bw' which is a bufiox.Writer - // in kitex under v0.10.0, writer is from the field 'trans' which implements the interface io.Writer + // if iprot is from kitex lower than v0.12.0, use reflection to get writer + // 1. in kitex v0.11.0, writer is from the field 'bw' which is a bufiox.Writer + // eg: https://github.com/cloudwego/kitex/blob/v0.11.0/pkg/protocol/bthrift/apache/binary_protocol.go#L44 + // 2. in kitex under v0.11.0, writer is from the field 'trans' which is kitex buffer (mostly NetpollByteBuffer) + // eg: https://github.com/cloudwego/kitex/blob/v0.5.2/pkg/remote/codec/thrift/binary_protocol.go#L54 + // in apache thrift v0.13.0, writer is from the field 'trans' which implements the interface io.ReadWriter + // eg: https://github.com/apache/thrift/blob/v0.13.0/lib/go/thrift/binary_protocol.go#L33 fieldNames := []string{"bw", "trans"} for _, fn := range fieldNames { writer, exist, err := getUnexportField(oprot, fn) @@ -87,7 +116,12 @@ func AdaptWrite(oprot interface{}, writeFunc func() []byte) error { switch w := writer.(type) { case bufiox.Writer: bw = w - case io.Writer: + case byteBuffer: + // if writer is from byteBuffer, Write() function is not always available + // so use an adaptor to implement Write() by Malloc() + bw = bufiox.NewDefaultWriter(byteBuffer2ReadWriter(w)) + case io.ReadWriter: + // if writer is not byteBuffer but is io.ReadWriter, it supposes to be apache thrift binary protocol bw = bufiox.NewDefaultWriter(w) default: return fmt.Errorf("writer type not ok") @@ -99,8 +133,11 @@ func AdaptWrite(oprot interface{}, writeFunc func() []byte) error { if bw == nil { return fmt.Errorf("no available field for writer") } - buf := writeFunc() - _, err := bw.WriteBinary(buf) + + // use fast codec + buf := make([]byte, fastStruct.BLength()) + buf = buf[:fastStruct.FastWriteNocopy(buf, nil)] + _, err = bw.WriteBinary(buf) if err != nil { return err } @@ -124,39 +161,3 @@ type bufioxReaderWriter interface { GetBufioxReader() bufiox.Reader GetBufioxWriter() bufiox.Writer } - -// byteBuffer -type byteBuffer interface { - // Next reads the next n bytes sequentially and returns the original buffer. - Next(n int) (p []byte, err error) - - // ReadableLen returns the total length of readable buffer. - // Return: -1 means unreadable. - ReadableLen() (n int) -} - -// nextReader is an adaptor that implement Read() by Next() and ReadableLen() -type nextReader struct { - nx byteBuffer -} - -// Read reads data from the nextReader's internal buffer into p. -func (nr nextReader) Read(p []byte) (n int, err error) { - readable := nr.nx.ReadableLen() - if readable == -1 { - return 0, err - } - if readable > len(p) { - readable = len(p) - } - data, err := nr.nx.Next(readable) - if err != nil { - return -1, err - } - copy(p, data) - return readable, nil -} - -func next2Reader(n byteBuffer) io.Reader { - return &nextReader{nx: n} -} diff --git a/apache_adaptor/byte_buffer.go b/apache_adaptor/byte_buffer.go new file mode 100644 index 0000000..4b8ef14 --- /dev/null +++ b/apache_adaptor/byte_buffer.go @@ -0,0 +1,66 @@ +// Copyright 2024 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package apache_adaptor + +import "io" + +// byteBuffer +type byteBuffer interface { + // Next reads the next n bytes sequentially and returns the original buffer. + Next(n int) (p []byte, err error) + + // ReadableLen returns the total length of readable buffer. + // Return: -1 means unreadable. + ReadableLen() (n int) + + // Malloc n bytes sequentially in the writer buffer. + Malloc(n int) (buf []byte, err error) +} + +// byteBufferWrapper is an adaptor that implement Read() by Next() and ReadableLen() and implement Write() by Malloc() +type byteBufferWrapper struct { + b byteBuffer +} + +func byteBuffer2ReadWriter(n byteBuffer) io.ReadWriter { + return &byteBufferWrapper{b: n} +} + +// Read reads data from the byteBufferWrapper's internal buffer into p. +func (bw byteBufferWrapper) Read(p []byte) (n int, err error) { + readable := bw.b.ReadableLen() + if readable == -1 { + return 0, err + } + if readable > len(p) { + readable = len(p) + } + data, err := bw.b.Next(readable) + if err != nil { + return -1, err + } + copy(p, data) + return readable, nil +} + +// Write writes data from the byteBufferWrapper's internal buffer into p. +func (bw byteBufferWrapper) Write(p []byte) (n int, err error) { + data, err := bw.b.Malloc(len(p)) + if err != nil { + return -1, err + } + copy(data, p) + return len(data), nil +} diff --git a/apache_adaptor/struct_codec.go b/apache_adaptor/struct_codec.go new file mode 100644 index 0000000..adb5c7c --- /dev/null +++ b/apache_adaptor/struct_codec.go @@ -0,0 +1,93 @@ +// Copyright 2024 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package apache_adaptor + +import ( + "fmt" + "reflect" + + "github.com/cloudwego/gopkg/protocol/thrift" +) + +type fastReader interface { + FastRead(buf []byte) (int, error) +} + +const OldFastWriteMethod = "FastWriteNocopy" + +func toFastCodec(p interface{}) (thrift.FastCodec, error) { + // if struct is from kitex_gen which is generated higher than v0.10.0,just assert gopkg thrift.FastCodec + if fast, ok := p.(thrift.FastCodec); ok { + return fast, nil + } + // if struct is lower than v0.10.0,the second argument 'bw' from FastWriterNocopy is from kitex package + // it's not good to import an old kitex dependency, so we have to use reflection + fast, ok := p.(interface { + BLength() int + FastRead(buf []byte) (int, error) + }) + if !ok { + return nil, fmt.Errorf("no BLength method for struct") + } + + method := reflect.ValueOf(p).MethodByName(OldFastWriteMethod) + + if !method.IsValid() { + return nil, fmt.Errorf("method not found or not exported: %s", OldFastWriteMethod) + } + + if method.Type().NumIn() != 2 { + return nil, fmt.Errorf("args num is not ok") + } + + if method.Type().NumOut() != 1 { + return nil, fmt.Errorf("resp num is not ok") + } + + if method.Type().Out(0) != reflect.TypeOf(0) { + return nil, fmt.Errorf("return type is not int") + } + + if method.Type().In(0) != reflect.TypeOf([]byte{}) { + return nil, fmt.Errorf("input type 1st is not []byte") + } + + return &oldFastCodec{ + p: fast, + method: method, + }, nil +} + +type oldFastCodec struct { + p interface { + BLength() int + FastRead(buf []byte) (int, error) + } + method reflect.Value +} + +func (c *oldFastCodec) BLength() int { + return c.p.BLength() +} + +func (c *oldFastCodec) FastWriteNocopy(buf []byte, bw thrift.NocopyWriter) int { + method := c.method + out := method.Call([]reflect.Value{reflect.ValueOf(buf), reflect.New(method.Type().In(1)).Elem()}) + return out[0].Interface().(int) +} + +func (c *oldFastCodec) FastRead(buf []byte) (int, error) { + return c.p.FastRead(buf) +} From 5fbaf2ffe86276b886450fda00f5d3d3241f1aa6 Mon Sep 17 00:00:00 2001 From: HeyJavaBean Date: Wed, 27 Nov 2024 14:55:30 +0800 Subject: [PATCH 06/12] refactor: move dir --- {apache_adaptor => protocol/thrift/apache/adaptor}/adaptor.go | 2 +- .../thrift/apache/adaptor}/byte_buffer.go | 2 +- .../thrift/apache/adaptor}/struct_codec.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) rename {apache_adaptor => protocol/thrift/apache/adaptor}/adaptor.go (99%) rename {apache_adaptor => protocol/thrift/apache/adaptor}/byte_buffer.go (98%) rename {apache_adaptor => protocol/thrift/apache/adaptor}/struct_codec.go (99%) diff --git a/apache_adaptor/adaptor.go b/protocol/thrift/apache/adaptor/adaptor.go similarity index 99% rename from apache_adaptor/adaptor.go rename to protocol/thrift/apache/adaptor/adaptor.go index d598e17..2f52aed 100644 --- a/apache_adaptor/adaptor.go +++ b/protocol/thrift/apache/adaptor/adaptor.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package apache_adaptor +package adaptor import ( "fmt" diff --git a/apache_adaptor/byte_buffer.go b/protocol/thrift/apache/adaptor/byte_buffer.go similarity index 98% rename from apache_adaptor/byte_buffer.go rename to protocol/thrift/apache/adaptor/byte_buffer.go index 4b8ef14..2107144 100644 --- a/apache_adaptor/byte_buffer.go +++ b/protocol/thrift/apache/adaptor/byte_buffer.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package apache_adaptor +package adaptor import "io" diff --git a/apache_adaptor/struct_codec.go b/protocol/thrift/apache/adaptor/struct_codec.go similarity index 99% rename from apache_adaptor/struct_codec.go rename to protocol/thrift/apache/adaptor/struct_codec.go index adb5c7c..bfa140b 100644 --- a/apache_adaptor/struct_codec.go +++ b/protocol/thrift/apache/adaptor/struct_codec.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package apache_adaptor +package adaptor import ( "fmt" From 4c8bcddfc48a90ac0ccabe3face59b9a37638f0c Mon Sep 17 00:00:00 2001 From: HeyJavaBean Date: Thu, 28 Nov 2024 19:44:54 +0800 Subject: [PATCH 07/12] fix: fix buf reader --- protocol/thrift/apache/adaptor/adaptor.go | 24 +++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/protocol/thrift/apache/adaptor/adaptor.go b/protocol/thrift/apache/adaptor/adaptor.go index 2f52aed..ae02315 100644 --- a/protocol/thrift/apache/adaptor/adaptor.go +++ b/protocol/thrift/apache/adaptor/adaptor.go @@ -33,6 +33,7 @@ func AdaptRead(p, iprot interface{}) error { return fmt.Errorf("no codec implementation available") } + var rd io.Reader var br bufiox.Reader // if iprot is from kitex v0.12.0+, use interface assert to get bufiox reader if bp, ok := iprot.(bufioxReaderWriter); ok { @@ -58,10 +59,10 @@ func AdaptRead(p, iprot interface{}) error { case byteBuffer: // if reader is from byteBuffer, Read() function is not always available // so use an adaptor to implement Read() by Next() and ReadableLen() - br = bufiox.NewDefaultReader(byteBuffer2ReadWriter(r)) + rd = byteBuffer2ReadWriter(r) case io.ReadWriter: - // if reader is not byteBuffer but is io.ReadWriter, it suppose to be apache thrift binary protocol - br = bufiox.NewDefaultReader(r) + // if reader is not byteBuffer but is io.ReadWriter, it supposes to be apache thrift binary protocol + rd = r default: return fmt.Errorf("reader not ok") } @@ -69,18 +70,29 @@ func AdaptRead(p, iprot interface{}) error { } } } - if br == nil { + if rd == nil && br == nil { return fmt.Errorf("no available field for reader") } - // read data from iprot - buf, err := thrift.NewSkipDecoder(br).Next(thrift.STRUCT) + var sd *thrift.SkipDecoder + if br != nil { + sd = thrift.NewSkipDecoder(br) + } else { + // if there's no bufiox.Reader, do not wrap a new bufiox.Reader, or some data will remain in the buffer + // directly read from io.Reader + sd = thrift.NewSkipDecoderWithIOReader(rd) + } + + buf, err := sd.Next(thrift.STRUCT) if err != nil { return err } + sd.Release() + // unmarshal the data into struct _, err = fastStruct.FastRead(buf) + return err } From 4a1f0844282a1bccd10dde2e5c850866483c0d65 Mon Sep 17 00:00:00 2001 From: HeyJavaBean Date: Tue, 3 Dec 2024 19:11:26 +0800 Subject: [PATCH 08/12] fix: rebase skip decoder and add unit test --- protocol/thrift/apache/adaptor/adaptor.go | 28 +- .../apache/adaptor/adaptor_apache_test.go | 72 +++++ .../apache/adaptor/adaptor_kitex_test.go | 277 ++++++++++++++++++ .../thrift/apache/adaptor/struct_codec.go | 5 +- 4 files changed, 367 insertions(+), 15 deletions(-) create mode 100644 protocol/thrift/apache/adaptor/adaptor_apache_test.go create mode 100644 protocol/thrift/apache/adaptor/adaptor_kitex_test.go diff --git a/protocol/thrift/apache/adaptor/adaptor.go b/protocol/thrift/apache/adaptor/adaptor.go index ae02315..0c6ac49 100644 --- a/protocol/thrift/apache/adaptor/adaptor.go +++ b/protocol/thrift/apache/adaptor/adaptor.go @@ -25,7 +25,7 @@ import ( ) // AdaptRead receive a kitex binary protocol and read it by given function. -func AdaptRead(p, iprot interface{}) error { +func AdaptRead(p, iprot interface{}) (err error) { // for now, we use fastCodec to adapt apache codec. // the struct should have the function 'FastRead' fastStruct, ok := p.(fastReader) @@ -70,26 +70,24 @@ func AdaptRead(p, iprot interface{}) error { } } } - if rd == nil && br == nil { - return fmt.Errorf("no available field for reader") - } - var sd *thrift.SkipDecoder + var buf []byte if br != nil { - sd = thrift.NewSkipDecoder(br) + sd := thrift.NewSkipDecoder(br) + buf, err = sd.Next(thrift.STRUCT) + sd.Release() + } else if rd != nil { + sd := thrift.NewReaderSkipDecoder(rd) + buf, err = sd.Next(thrift.STRUCT) + sd.Release() } else { - // if there's no bufiox.Reader, do not wrap a new bufiox.Reader, or some data will remain in the buffer - // directly read from io.Reader - sd = thrift.NewSkipDecoderWithIOReader(rd) + return fmt.Errorf("no available field for reader") } - buf, err := sd.Next(thrift.STRUCT) if err != nil { return err } - sd.Release() - // unmarshal the data into struct _, err = fastStruct.FastRead(buf) @@ -148,7 +146,11 @@ func AdaptWrite(p, oprot interface{}) error { // use fast codec buf := make([]byte, fastStruct.BLength()) - buf = buf[:fastStruct.FastWriteNocopy(buf, nil)] + n := fastStruct.FastWriteNocopy(buf, nil) + if n < 0 { + return fmt.Errorf("failed to fast write") + } + buf = buf[:n] _, err = bw.WriteBinary(buf) if err != nil { return err diff --git a/protocol/thrift/apache/adaptor/adaptor_apache_test.go b/protocol/thrift/apache/adaptor/adaptor_apache_test.go new file mode 100644 index 0000000..1eb30f3 --- /dev/null +++ b/protocol/thrift/apache/adaptor/adaptor_apache_test.go @@ -0,0 +1,72 @@ +// Copyright 2024 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package adaptor_test + +import ( + "bytes" + "io" + "testing" +) + +// TestApacheProtocol +func TestApacheProtocol(t *testing.T) { + // Apache Protocol implement the interface TProtocol (https://github.com/apache/thrift/blob/v0.13.0/lib/go/thrift/protocol.go#L33) + // The implementation classes of this interface include many protocols. + // Such as TBinaryProtocol, TCompactProtocol, THeaderProtocol, and TJsonProtocol. + // However, for users of cloudwego/kitex, only TBinaryProtocol and TCompactProtocol are used + // So we only support to adapt these two protocols. + + // apache binary protocol with old kitex struct + testAdaptor(t, oldKitexGen, mockTBinaryProtocol()) + + // apache binary protocol with new kitex struct + testAdaptor(t, newKitexGen, mockTBinaryProtocol()) + + // apache compact protocol with old kitex struct + testAdaptor(t, oldKitexGen, mockTCompactProtocol()) + + // apache compact protocol with new kitex struct + testAdaptor(t, newKitexGen, mockTCompactProtocol()) +} + +// tBinaryProtocol +// https://github.com/apache/thrift/blob/v0.13.0/lib/go/thrift/binary_protocol.go#L33 +type tBinaryProtocol struct { + trans tRichTransport +} + +func mockTBinaryProtocol() *tBinaryProtocol { + return &tBinaryProtocol{ + trans: bytes.NewBuffer(nil), + } +} + +// tCompactProtocol +// https://github.com/apache/thrift/blob/v0.13.0/lib/go/thrift/compact_protocol.go#L88 +type tCompactProtocol struct { + trans tRichTransport +} + +func mockTCompactProtocol() *tCompactProtocol { + return &tCompactProtocol{ + trans: bytes.NewBuffer(nil), + } +} + +// https://github.com/apache/thrift/blob/v0.13.0/lib/go/thrift/trans.go +type tRichTransport interface { + io.ReadWriter + // another interfaces are not used in apache adaptor, just ignore them. +} diff --git a/protocol/thrift/apache/adaptor/adaptor_kitex_test.go b/protocol/thrift/apache/adaptor/adaptor_kitex_test.go new file mode 100644 index 0000000..56b3ce7 --- /dev/null +++ b/protocol/thrift/apache/adaptor/adaptor_kitex_test.go @@ -0,0 +1,277 @@ +// Copyright 2024 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package adaptor_test + +import ( + "bytes" + "errors" + "fmt" + "reflect" + "testing" + + "github.com/cloudwego/gopkg/protocol/thrift/apache/adaptor" + "github.com/stretchr/testify/require" + + "github.com/cloudwego/gopkg/bufiox" + "github.com/cloudwego/gopkg/protocol/thrift" +) + +type kitexGen int + +const ( + // newKitexGen means the kitex struct generate by tool v0.11.0+, the second argument of FastWriteNoCopy is a NocopyWriter interface. + newKitexGen kitexGen = iota + // oldKitexGen means the kitex struct generate before tool v0.11.0, the second argument of FastWriteNoCopy is a Kitex NocopyWriter type. + oldKitexGen +) + +// TestKitexBinaryProtocol +func TestKitexBinaryProtocol(t *testing.T) { + // In different versions of kitex, there are three scenarios for the implementation of the binary protocol: + // In versions before v0.11.0, the binary protocol processes data using remote.ByteBuffer. + // In versions after v0.11.0, the binary protocol processes data using bufiox. + // In versions after v0.13.0 (to be released), the binary protocol is similar to v0.11.0, also processing data using bufiox, but it provides the GetBufiox interface for easier access to bufiox. + // We will test these three scenarios separately. + + // kitex before v0.11.0 binary protocol with kitex struct before v0.11.0 + testAdaptor(t, oldKitexGen, mockBinaryProtocolV0100()) + + // kitex v0.11.0 binary protocol with kitex struct after v0.11.0 + testAdaptor(t, oldKitexGen, mockBinaryProtocolV0110()) + + // kitex v0.13.0 binary protocol with kitex struct after v0.11.0 + testAdaptor(t, oldKitexGen, mockBinaryProtocolV0130()) + + // kitex before v0.11.0 binary protocol with kitex struct after v0.11.0 + testAdaptor(t, newKitexGen, mockBinaryProtocolV0100()) + + // kitex v0.11.0 binary protocol with kitex struct after v0.11.0 + testAdaptor(t, newKitexGen, mockBinaryProtocolV0110()) + + // kitex v0.13.0 binary protocol with kitex struct after v0.11.0 + testAdaptor(t, newKitexGen, mockBinaryProtocolV0130()) +} + +// testAdaptor is used to simulate the process of the Apache adaptor obtaining a struct and a binary protocol, bridging the data serialization process. +// In this test, first create a specific struct, test converting it into a binary stream using the adaptor. +// Then deserialize it using the adaptor into a new struct, and compare the contents of the new and old structs to verify the correctness of the Apache adaptor. +func testAdaptor(t *testing.T, kitexStruct kitexGen, bp interface{}) { + var from, to interface{} + switch kitexStruct { + case oldKitexGen: + from = mockOldKitexStruct() + to = &oldKitexStruct{} + case newKitexGen: + from = mockNewKitexStruct() + to = &newKitexStruct{} + default: + require.Error(t, fmt.Errorf("kitex gen type not ok")) + } + err := adaptor.AdaptWrite(from, bp) + require.NoError(t, err) + err = adaptor.AdaptRead(to, bp) + require.NoError(t, err) + require.True(t, reflect.DeepEqual(from, to)) +} + +// binaryProtocolV0100 mocks the kitex thrift binary protocol struct before v0.11.0 (v0.11.0 is not included), with remote.ByteBuffer as the field 'trans' to handle the data. +// https://github.com/cloudwego/kitex/blob/v0.5.2/pkg/protocol/bthrift/apache/binary_protocol.go#L44 +type binaryProtocolV0100 struct { + trans mockRemoteByteBuffer +} + +// mockRemoteByteBuffer mocks the kitex remote.ByteBuffer, which is the core abstraction of buffer in Kitex. +// https://github.com/cloudwego/kitex/blob/v0.5.2/pkg/remote/bytebuf.go#L46 +type mockRemoteByteBuffer interface { + // Next reads the next n bytes sequentially and returns the original buffer. + Next(n int) (p []byte, err error) + // ReadableLen returns the total length of readable buffer. + // Return: -1 means unreadable. + ReadableLen() (n int) + // Malloc n bytes sequentially in the writer buffer. + Malloc(n int) (buf []byte, err error) + // another function is not used in apache adaptor +} + +func mockBinaryProtocolV0100() *binaryProtocolV0100 { + return &binaryProtocolV0100{trans: &simpleBuffer{ + data: make([]byte, 100), + }} +} + +// binaryProtocolV0110 mocks the kitex thrift binary protocol struct after v0.11.0, with bufiox reader and writer to handle the data. +// https://github.com/cloudwego/kitex/blob/v0.11.0/pkg/protocol/bthrift/apache/binary_protocol.go#L44 +type binaryProtocolV0110 struct { + r *thrift.BufferReader + w *thrift.BufferWriter + + br bufiox.Reader + bw bufiox.Writer +} + +func mockBinaryProtocolV0110() *binaryProtocolV0110 { + buffer := bytes.NewBuffer(nil) + br := bufiox.NewDefaultReader(buffer) + bw := bufiox.NewDefaultWriter(buffer) + return &binaryProtocolV0110{ + r: thrift.NewBufferReader(br), + w: thrift.NewBufferWriter(bw), + br: br, + bw: bw, + } +} + +// binaryProtocolV0130 mocks the kitex thrift binary protocol struct after v0.13.0 (currently unreleased) +// It's almost the same with binaryProtocolV0110, but have two more function 'GetBufioxReader' and 'GetBufioxWriter', in order to get the bufiox more convenient without reflection. +// https://github.com/cloudwego/kitex/blob/v0.13.0/pkg/protocol/bthrift/apache/binary_protocol.go#L44 +type binaryProtocolV0130 struct { + r *thrift.BufferReader + w *thrift.BufferWriter + + br bufiox.Reader + bw bufiox.Writer +} + +func mockBinaryProtocolV0130() *binaryProtocolV0130 { + buffer := bytes.NewBuffer(nil) + br := bufiox.NewDefaultReader(buffer) + bw := bufiox.NewDefaultWriter(buffer) + return &binaryProtocolV0130{ + r: thrift.NewBufferReader(br), + w: thrift.NewBufferWriter(bw), + br: br, + bw: bw, + } +} + +func (bp *binaryProtocolV0130) GetBufioxReader() bufiox.Reader { + return bp.br +} + +func (bp *binaryProtocolV0130) GetBufioxWriter() bufiox.Writer { + return bp.bw +} + +// simpleBuffer is a very simple implementation of mockRemoteByteBuffer +type simpleBuffer struct { + data []byte + rc int + wc int +} + +func (bb *simpleBuffer) Next(n int) ([]byte, error) { + if bb.rc+n > len(bb.data) { + return nil, errors.New("not enough data to read") + } + p := bb.data[bb.rc : bb.rc+n] + bb.rc += n + return p, nil +} + +func (bb *simpleBuffer) ReadableLen() int { + return len(bb.data) - bb.rc +} + +func (bb *simpleBuffer) Malloc(n int) ([]byte, error) { + if bb.wc+n > len(bb.data) { + return nil, errors.New("not enough space to allocate") + } + buf := bb.data[bb.wc : bb.wc+n] + bb.wc += n + return buf, nil +} + +// oldKitexStruct mocks the kitex struct generate before tool v0.11.0, the second argument of FastWriteNoCopy is a Kitex NocopyWriter type. +type oldKitexStruct struct { + FBool bool `thrift:"FBool,1,required" frugal:"1,required,bool" json:"FBool"` + FByte int8 `thrift:"FByte,2" frugal:"2,default,byte" json:"FByte"` + I8 int8 `thrift:"I8,3" frugal:"3,default,i8" json:"I8"` + I16 int16 `thrift:"I16,4" frugal:"4,default,i16" json:"I16"` +} + +func (p *oldKitexStruct) BLength() int { + return len(mockBinary) +} + +func (p *oldKitexStruct) FastRead(buf []byte) (int, error) { + if bytes.Equal(buf, mockBinary) { + p.FBool = true + p.FByte = 3 + p.I8 = 1 + p.I16 = 2 + return len(buf), nil + } + return -1, fmt.Errorf("data error") +} + +// binaryWriter does not implement nocopy writer +func (p *oldKitexStruct) FastWriteNocopy(buf []byte, binaryWriter interface{}) int { + if reflect.DeepEqual(p, mockOldKitexStruct()) { + copy(buf, mockBinary) + return len(buf) + } + return -1 +} + +var mockBinary = []byte{2, 0, 1, 1, 3, 0, 2, 3, 3, 0, 3, 1, 6, 0, 4, 0, 2, 0} + +func mockOldKitexStruct() *oldKitexStruct { + return &oldKitexStruct{ + FBool: true, + FByte: 3, + I8: 1, + I16: 2, + } +} + +// newKitexStruct mocks the kitex struct generate by tool v0.11.0+, the second argument of FastWriteNoCopy is a NocopyWriter interface. +type newKitexStruct struct { + FBool bool `thrift:"FBool,1,required" frugal:"1,required,bool" json:"FBool"` + FByte int8 `thrift:"FByte,2" frugal:"2,default,byte" json:"FByte"` + I8 int8 `thrift:"I8,3" frugal:"3,default,i8" json:"I8"` + I16 int16 `thrift:"I16,4" frugal:"4,default,i16" json:"I16"` +} + +func (p *newKitexStruct) BLength() int { + return len(mockBinary) +} + +func (p *newKitexStruct) FastRead(buf []byte) (int, error) { + if bytes.Equal(buf, mockBinary) { + p.FBool = true + p.FByte = 3 + p.I8 = 1 + p.I16 = 2 + return len(buf), nil + } + return -1, fmt.Errorf("data error") +} + +func (p *newKitexStruct) FastWriteNocopy(buf []byte, binaryWriter thrift.NocopyWriter) int { + if reflect.DeepEqual(p, mockNewKitexStruct()) { + copy(buf, mockBinary) + return len(buf) + } + return -1 +} + +func mockNewKitexStruct() *newKitexStruct { + return &newKitexStruct{ + FBool: true, + FByte: 3, + I8: 1, + I16: 2, + } +} diff --git a/protocol/thrift/apache/adaptor/struct_codec.go b/protocol/thrift/apache/adaptor/struct_codec.go index bfa140b..50076d5 100644 --- a/protocol/thrift/apache/adaptor/struct_codec.go +++ b/protocol/thrift/apache/adaptor/struct_codec.go @@ -57,11 +57,11 @@ func toFastCodec(p interface{}) (thrift.FastCodec, error) { } if method.Type().Out(0) != reflect.TypeOf(0) { - return nil, fmt.Errorf("return type is not int") + return nil, fmt.Errorf("return type should be int") } if method.Type().In(0) != reflect.TypeOf([]byte{}) { - return nil, fmt.Errorf("input type 1st is not []byte") + return nil, fmt.Errorf("the first argument should be []byte") } return &oldFastCodec{ @@ -88,6 +88,7 @@ func (c *oldFastCodec) FastWriteNocopy(buf []byte, bw thrift.NocopyWriter) int { return out[0].Interface().(int) } +// FastRead actually this function is not used, just to implement the FastCodec interface func (c *oldFastCodec) FastRead(buf []byte) (int, error) { return c.p.FastRead(buf) } From 8ab98462aaa8480eafd5b5726cd26060ccf1e981 Mon Sep 17 00:00:00 2001 From: HeyJavaBean Date: Wed, 11 Dec 2024 17:52:33 +0800 Subject: [PATCH 09/12] fix: fix name --- protocol/thrift/apache/adaptor/struct_codec.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/protocol/thrift/apache/adaptor/struct_codec.go b/protocol/thrift/apache/adaptor/struct_codec.go index 50076d5..4d4dc48 100644 --- a/protocol/thrift/apache/adaptor/struct_codec.go +++ b/protocol/thrift/apache/adaptor/struct_codec.go @@ -25,7 +25,7 @@ type fastReader interface { FastRead(buf []byte) (int, error) } -const OldFastWriteMethod = "FastWriteNocopy" +const oldFastWriteMethod = "FastWriteNocopy" func toFastCodec(p interface{}) (thrift.FastCodec, error) { // if struct is from kitex_gen which is generated higher than v0.10.0,just assert gopkg thrift.FastCodec @@ -42,10 +42,10 @@ func toFastCodec(p interface{}) (thrift.FastCodec, error) { return nil, fmt.Errorf("no BLength method for struct") } - method := reflect.ValueOf(p).MethodByName(OldFastWriteMethod) + method := reflect.ValueOf(p).MethodByName(oldFastWriteMethod) if !method.IsValid() { - return nil, fmt.Errorf("method not found or not exported: %s", OldFastWriteMethod) + return nil, fmt.Errorf("method not found or not exported: %s", oldFastWriteMethod) } if method.Type().NumIn() != 2 { From 71fa60fc5d60b2580de9c70e9f5df4ec08584974 Mon Sep 17 00:00:00 2001 From: HeyJavaBean Date: Wed, 11 Dec 2024 17:56:37 +0800 Subject: [PATCH 10/12] fix: fix by comment --- protocol/thrift/apache/adaptor/struct_codec.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/thrift/apache/adaptor/struct_codec.go b/protocol/thrift/apache/adaptor/struct_codec.go index 4d4dc48..aff68be 100644 --- a/protocol/thrift/apache/adaptor/struct_codec.go +++ b/protocol/thrift/apache/adaptor/struct_codec.go @@ -84,7 +84,7 @@ func (c *oldFastCodec) BLength() int { func (c *oldFastCodec) FastWriteNocopy(buf []byte, bw thrift.NocopyWriter) int { method := c.method - out := method.Call([]reflect.Value{reflect.ValueOf(buf), reflect.New(method.Type().In(1)).Elem()}) + out := method.Call([]reflect.Value{reflect.ValueOf(buf), reflect.NewAt(method.Type().In(1), nil)}) return out[0].Interface().(int) } From 561146c38cd7a376ad3664196af6d001f66c6533 Mon Sep 17 00:00:00 2001 From: HeyJavaBean Date: Wed, 11 Dec 2024 18:40:19 +0800 Subject: [PATCH 11/12] revert: revert reflect NewAt --- protocol/thrift/apache/adaptor/adaptor_kitex_test.go | 6 +++++- protocol/thrift/apache/adaptor/struct_codec.go | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/protocol/thrift/apache/adaptor/adaptor_kitex_test.go b/protocol/thrift/apache/adaptor/adaptor_kitex_test.go index 56b3ce7..45ae4cc 100644 --- a/protocol/thrift/apache/adaptor/adaptor_kitex_test.go +++ b/protocol/thrift/apache/adaptor/adaptor_kitex_test.go @@ -217,7 +217,7 @@ func (p *oldKitexStruct) FastRead(buf []byte) (int, error) { } // binaryWriter does not implement nocopy writer -func (p *oldKitexStruct) FastWriteNocopy(buf []byte, binaryWriter interface{}) int { +func (p *oldKitexStruct) FastWriteNocopy(buf []byte, binaryWriter oldBinaryWriter) int { if reflect.DeepEqual(p, mockOldKitexStruct()) { copy(buf, mockBinary) return len(buf) @@ -225,6 +225,10 @@ func (p *oldKitexStruct) FastWriteNocopy(buf []byte, binaryWriter interface{}) i return -1 } +type oldBinaryWriter interface { + WriteDirect(b []byte, remainCap int) error +} + var mockBinary = []byte{2, 0, 1, 1, 3, 0, 2, 3, 3, 0, 3, 1, 6, 0, 4, 0, 2, 0} func mockOldKitexStruct() *oldKitexStruct { diff --git a/protocol/thrift/apache/adaptor/struct_codec.go b/protocol/thrift/apache/adaptor/struct_codec.go index aff68be..4d4dc48 100644 --- a/protocol/thrift/apache/adaptor/struct_codec.go +++ b/protocol/thrift/apache/adaptor/struct_codec.go @@ -84,7 +84,7 @@ func (c *oldFastCodec) BLength() int { func (c *oldFastCodec) FastWriteNocopy(buf []byte, bw thrift.NocopyWriter) int { method := c.method - out := method.Call([]reflect.Value{reflect.ValueOf(buf), reflect.NewAt(method.Type().In(1), nil)}) + out := method.Call([]reflect.Value{reflect.ValueOf(buf), reflect.New(method.Type().In(1)).Elem()}) return out[0].Interface().(int) } From 35a5f8d7bb8ed3e9907bdea12e9c50776b77baf7 Mon Sep 17 00:00:00 2001 From: HeyJavaBean Date: Wed, 11 Dec 2024 18:43:37 +0800 Subject: [PATCH 12/12] fix: fix reflect again --- protocol/thrift/apache/adaptor/struct_codec.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/thrift/apache/adaptor/struct_codec.go b/protocol/thrift/apache/adaptor/struct_codec.go index 4d4dc48..5f29251 100644 --- a/protocol/thrift/apache/adaptor/struct_codec.go +++ b/protocol/thrift/apache/adaptor/struct_codec.go @@ -84,7 +84,7 @@ func (c *oldFastCodec) BLength() int { func (c *oldFastCodec) FastWriteNocopy(buf []byte, bw thrift.NocopyWriter) int { method := c.method - out := method.Call([]reflect.Value{reflect.ValueOf(buf), reflect.New(method.Type().In(1)).Elem()}) + out := method.Call([]reflect.Value{reflect.ValueOf(buf), reflect.Zero(method.Type().In(1))}) return out[0].Interface().(int) }