Skip to content

Tweaks #69

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module github.com/fluent/fluent-bit-go

go 1.14
go 1.20
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't bump the Go version until it's necessary.


require github.com/ugorji/go/codec v1.1.7
require github.com/ugorji/go/codec v1.2.11
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@ github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU=
github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
23 changes: 19 additions & 4 deletions output/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ func NewDecoder(data unsafe.Pointer, length int) *FLBDecoder {
}

func GetRecord(dec *FLBDecoder) (ret int, ts interface{}, rec map[interface{}]interface{}) {
var check error

var m interface{}

check = dec.mpdec.Decode(&m)
if check != nil {
err := dec.mpdec.Decode(&m)
if err != nil {
return -1, 0, nil
}

Expand All @@ -82,7 +82,22 @@ func GetRecord(dec *FLBDecoder) (ret int, ts interface{}, rec map[interface{}]in
return -2, 0, nil
}

t := slice.Index(0).Interface()
var t interface{}
ts = slice.Index(0).Interface()
switch ty := ts.(type) {
case FLBTime:
t = ty
case uint64:
t = ty
case []interface{}: // for Fluent Bit V2 metadata type of format
s := reflect.ValueOf(ty)
if s.Kind() != reflect.Slice || s.Len() < 2 {
return -4, 0, nil
}
t = s.Index(0).Interface()
default:
return -5, 0, nil
}
data := slice.Index(1)

map_data, ok := data.Interface().(map[interface{}]interface{})
Expand Down
118 changes: 118 additions & 0 deletions output/decoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,19 @@
package output

import (
"encoding/binary"
"log"
"os"
"reflect"
"testing"
"time"
"unsafe"
)

const (
testDataFile = "./testdata/data"
)

// dummyRecord should be byte Array, not Slice to be able to Cast c array.
var dummyRecord [29]byte = [29]byte{0x92, /* fix array 2 */
0xd7, 0x00, 0x5e, 0xa9, 0x17, 0xe0, 0x00, 0x00, 0x00, 0x00, /* 2020/04/29 06:00:00*/
Expand All @@ -33,6 +41,19 @@ var dummyRecord [29]byte = [29]byte{0x92, /* fix array 2 */
0x01, /* fix int 1 */
}

// dummyV2Record should be byte Array, not Slice to be able to Cast c array.
var dummyV2Record [39]byte = [39]byte{0xdd /* array 32 */, 0x00, 0x00, 0x00,
0x02, /* count of array elements */
0xdd /* array 32 */, 0x00, 0x00, 0x00,
0x02, /* count of array elements */
0xd7, 0x00, 0x64, 0xbe, 0x0e, 0xeb, 0x16, 0x36, 0xe1, 0x28, 0x80, /* 2023/07/24 14:40:59 */
0x82, /* fix map 2 */
0xa7, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, /* fix str 7 "compact" */
0xc3, /* true */
0xa6, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, /* fix str 6 "schema" */
0x01, /* fix int 1 */
}

func TestGetRecord(t *testing.T) {
dec := NewDecoder(unsafe.Pointer(&dummyRecord), len(dummyRecord))
if dec == nil {
Expand Down Expand Up @@ -63,3 +84,100 @@ func TestGetRecord(t *testing.T) {
t.Errorf(`record["schema"] is not 1 %d`, v)
}
}

func TestGetV2Record(t *testing.T) {

anotherDummyV2RecordBytes, err := os.ReadFile(testDataFile)
if err != nil {
log.Fatal(err)
}
anotherDummyV2Record := (*[39]byte)(anotherDummyV2RecordBytes)

t.Log(dummyV2Record)
t.Log(*anotherDummyV2Record)

dummyV2Records := [][39]byte{
dummyV2Record,
*anotherDummyV2Record,
}

for i, record := range dummyV2Records {

dec := NewDecoder(unsafe.Pointer(&record), len(record))
if dec == nil {
t.Fatal("dec is nil, i", i)
}

ret, timestamp, record := GetRecord(dec)
if ret < 0 {
t.Fatalf("ret is negative: code %v", ret)
}

// test timestamp
ts, ok := timestamp.(FLBTime)
if !ok {
t.Fatalf("cast error. Type is %s", reflect.TypeOf(timestamp))
}

// test
if ts.Time != extractTimeStamp() {
t.Fatalf("GetRecord timestamp extraction does not match manual extractTimeStamp")
}

if ts.Unix() != int64(0x64be0eeb) {
t.Errorf("ts.Unix() error. given %d", ts.Unix())
}

// test record
v, ok := record["schema"].(int64)
if !ok {
t.Fatalf("cast error. Type is %s", reflect.TypeOf(record["schema"]))
}
if v != 1 {
t.Errorf(`record["schema"] is not 1 %d`, v)
}
}
}

// extractTimeStamp extracts the time out of the MsgPack data without using refection
func extractTimeStamp() (ts time.Time) {

data := unsafe.Slice((*byte)(unsafe.Pointer(&dummyV2Record)), int(len(dummyV2Record)))

// Manually find the time.
//
// The first headers of the data coming from Fluentbit looks like this
//~/Downloads/msgpack-inspect ./data
// ---
// - format: "array32"
// header: "0xdd"
// length: 2
// children:
// - format: "array32"
// header: "0xdd"
// length: 2
// children:
// - format: "fixext8"
// header: "0xd7"
// exttype: 0
// length: 8
// data: "0x64a46baa019bfcc0"
// - format: "fixmap"
// header: "0x80"
// length: 0
// children: []
// Array32 is 5 bytes long, and the fixext8 has x2 byte header, so that's where the time field starts
// fixext8 has 8 bytes of data
// Therefore, time bits start at: 5 + 5 + 2 = 12
// Time bits are 8 long, so 12 + 8 = 20
// See also: https://github.com/msgpack/msgpack/blob/master/spec.md
//
timeEightBytes := data[12:20] // extract bytes 12 through 20
sec := binary.BigEndian.Uint32(timeEightBytes)
usec := binary.BigEndian.Uint32(timeEightBytes[4:])
ts = time.Unix(int64(sec), int64(usec))

//log.Println("timestamp:", timestamp.Format(time.RFC850)

return ts
}
Binary file added output/testdata/data
Binary file not shown.
Binary file added output/testdata/data1
Binary file not shown.
Loading