Skip to content

Commit

Permalink
Merge pull request #131 from swordqiu/feature/qj-streampipe-uncompres…
Browse files Browse the repository at this point in the history
…s-xz

feature: streampipe support uncompress xz
  • Loading branch information
swordqiu authored May 30, 2024
2 parents 0d65278 + c73f724 commit cc85999
Show file tree
Hide file tree
Showing 54 changed files with 7,725 additions and 1 deletion.
1 change: 1 addition & 0 deletions errors/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const (

ErrNotFound = Error("NotFoundError")
ErrNotEmpty = Error("NotEmptyError")
ErrEmpty = Error("EmptyError")
ErrDuplicateId = Error("DuplicateIdError")
ErrInvalidStatus = Error("InvalidStatusError")
ErrNotImplemented = Error("NotImplementedError")
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.12.1
github.com/tredoe/osutil/v2 v2.0.0-rc.16
github.com/ulikunitz/xz v0.5.12
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad
golang.org/x/text v0.3.6
moul.io/http2curl/v2 v2.3.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,8 @@ github.com/tredoe/osutil/v2 v2.0.0-rc.16/go.mod h1:uLRVx/3pb7Y4RQhG8cQFbPE9ha5r8
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/ulikunitz/xz v0.5.12 h1:37Nm15o69RwBkXM0J6A5OlE67RZTfzUxTj8fB3dfcsc=
github.com/ulikunitz/xz v0.5.12/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
66 changes: 65 additions & 1 deletion util/streamutils/streamutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,85 @@ import (
"fmt"
"hash"
"io"

"github.com/ulikunitz/xz"

"yunion.io/x/pkg/errors"
)

type SStreamProperty struct {
CheckSum string
Size int64
}

func StreamPipe(reader io.Reader, writer io.Writer, CalChecksum bool, callback func(saved int64)) (*SStreamProperty, error) {
type sXZReadAheadReader struct {
offset int64
header []byte
upstream io.Reader
}

func newXZReadAheadReader(stream io.Reader) (*sXZReadAheadReader, error) {
xzHdr := make([]byte, xz.HeaderLen)
n, err := stream.Read(xzHdr)
if err != nil {
return nil, errors.Wrap(err, "Read XZ hader")
}
if n != len(xzHdr) {
return nil, errors.Wrap(errors.ErrEOF, "too few header bytes")
}
return &sXZReadAheadReader{
offset: 0,
header: xzHdr,
upstream: stream,
}, nil
}

func (s *sXZReadAheadReader) IsXz() bool {
return xz.ValidHeader(s.header)
}

func (s *sXZReadAheadReader) Read(buf []byte) (int, error) {
if s.offset < int64(len(s.header)) {
// read from header
rdSize := len(s.header) - int(s.offset)
if rdSize > len(buf) {
rdSize = len(buf)
}
n := copy(buf, s.header[s.offset:s.offset+int64(rdSize)])
s.offset += int64(n)
return n, nil
} else {
n, err := s.upstream.Read(buf)
s.offset += int64(n)
return n, err
}
}

func StreamPipe(upstream io.Reader, writer io.Writer, CalChecksum bool, callback func(saved int64)) (*SStreamProperty, error) {
sp := SStreamProperty{}

var md5sum hash.Hash
if CalChecksum {
md5sum = md5.New()
}

aheadReader, err := newXZReadAheadReader(upstream)
if err != nil {
return nil, errors.Wrap(err, "ReadAheadReader")
}

var reader io.Reader

if aheadReader.IsXz() {
xzReader, err := xz.NewReader(aheadReader)
if err != nil {
return nil, errors.Wrap(err, "xz.NewReader")
}
reader = xzReader
} else {
reader = aheadReader
}

buf := make([]byte, 4096)
for {
n, err := reader.Read(buf)
Expand Down
105 changes: 105 additions & 0 deletions util/streamutils/streamutils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package streamutils

import (
"bytes"
"io"
"math/rand"
"reflect"
"testing"

"github.com/ulikunitz/xz"
)

func TestStreamPipe(t *testing.T) {
for _, bufSize := range []int{
324553,
2312,
} {
seed := rand.New(rand.NewSource(int64(bufSize)))
buf := make([]byte, bufSize)
n, err := seed.Read(buf)
if err != nil {
t.Errorf("rand seed read fail %s", err)
continue
}
t.Logf("rand read %d", n)

inBuf := bytes.NewReader(buf[:n])
outBuf := &bytes.Buffer{}
stat, err := StreamPipe(inBuf, outBuf, true, nil)
if err != nil {
t.Errorf("Steampipe fail %s", err)
} else {
t.Logf("stat %#v", stat)
if !reflect.DeepEqual(buf, outBuf.Bytes()) {
t.Errorf("input != output")
}
}
}
}

func TestStreamPipeXZ(t *testing.T) {
for _, bufSize := range []int{
324553,
2312,
} {
seed := rand.New(rand.NewSource(int64(bufSize)))
buf := make([]byte, bufSize)
n, err := seed.Read(buf)
if err != nil {
t.Errorf("rand seed read fail %s", err)
continue
}
t.Logf("rand read %d", n)

xzBuf := &bytes.Buffer{}

w, err := xz.NewWriter(xzBuf)
if err != nil {
t.Errorf("xz NewWriter fail %s", err)
continue
}

n, err = io.WriteString(w, string(buf))
if err != nil {
t.Errorf("xz write fail %s", err)
continue
}
t.Logf("xz to compress %d", n)

err = w.Close()
if err != nil {
t.Errorf("xz write close fail %s", err)
continue
}

xzBytes := xzBuf.Bytes()
t.Logf("compressed %d", len(xzBytes))

inBuf := bytes.NewReader(xzBytes)
outBuf := &bytes.Buffer{}
stat, err := StreamPipe(inBuf, outBuf, true, nil)
if err != nil {
t.Errorf("Steampipe fail %s", err)
} else {
t.Logf("stat %#v", stat)
if !reflect.DeepEqual(buf, outBuf.Bytes()) {
t.Errorf("input != output")
}
}
}
}
28 changes: 28 additions & 0 deletions vendor/github.com/ulikunitz/xz/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 26 additions & 0 deletions vendor/github.com/ulikunitz/xz/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

88 changes: 88 additions & 0 deletions vendor/github.com/ulikunitz/xz/README.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions vendor/github.com/ulikunitz/xz/SECURITY.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit cc85999

Please # to comment.