Skip to content
This repository was archived by the owner on Sep 11, 2020. It is now read-only.

Commit 856b3b1

Browse files
committed
plumbing: packfile/scanner, readability/performance improvements, zlib pooling
Signed-off-by: Arran Walker <arran.walker@fiveturns.org>
1 parent e5268e9 commit 856b3b1

File tree

2 files changed

+91
-106
lines changed

2 files changed

+91
-106
lines changed

plumbing/format/packfile/common.go

+10
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package packfile
22

33
import (
44
"bytes"
5+
"compress/zlib"
56
"io"
67
"sync"
78

@@ -66,3 +67,12 @@ var bufPool = sync.Pool{
6667
return bytes.NewBuffer(nil)
6768
},
6869
}
70+
71+
var zlibInitBytes = []byte{0x78, 0x9c, 0x01, 0x00, 0x00, 0xff, 0xff, 0x00, 0x00, 0x00, 0x01}
72+
73+
var zlibReaderPool = sync.Pool{
74+
New: func() interface{} {
75+
r, _ := zlib.NewReader(bytes.NewReader(zlibInitBytes))
76+
return r
77+
},
78+
}

plumbing/format/packfile/scanner.go

+81-106
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,7 @@ type ObjectHeader struct {
3939
}
4040

4141
type Scanner struct {
42-
r reader
43-
zr readerResetter
42+
r *scannerReader
4443
crc hash.Hash32
4544

4645
// pendingObject is used to detect if an object has been read, or still
@@ -56,19 +55,27 @@ type Scanner struct {
5655
// NewScanner returns a new Scanner based on a reader, if the given reader
5756
// implements io.ReadSeeker the Scanner will be also Seekable
5857
func NewScanner(r io.Reader) *Scanner {
59-
seeker, ok := r.(io.ReadSeeker)
60-
if !ok {
61-
seeker = &trackableReader{Reader: r}
62-
}
58+
_, ok := r.(io.ReadSeeker)
6359

6460
crc := crc32.NewIEEE()
6561
return &Scanner{
66-
r: newTeeReader(newByteReadSeeker(seeker), crc),
62+
r: newScannerReader(r, crc),
6763
crc: crc,
6864
IsSeekable: ok,
6965
}
7066
}
7167

68+
func (s *Scanner) Reset(r io.Reader) {
69+
_, ok := r.(io.ReadSeeker)
70+
71+
s.r.Reset(r)
72+
s.crc.Reset()
73+
s.IsSeekable = ok
74+
s.pendingObject = nil
75+
s.version = 0
76+
s.objects = 0
77+
}
78+
7279
// Header reads the whole packfile header (signature, version and object count).
7380
// It returns the version and the object count and performs checks on the
7481
// validity of the signature and the version fields.
@@ -182,8 +189,7 @@ func (s *Scanner) NextObjectHeader() (*ObjectHeader, error) {
182189
// nextObjectHeader returns the ObjectHeader for the next object in the reader
183190
// without the Offset field
184191
func (s *Scanner) nextObjectHeader() (*ObjectHeader, error) {
185-
defer s.Flush()
186-
192+
s.r.Flush()
187193
s.crc.Reset()
188194

189195
h := &ObjectHeader{}
@@ -304,35 +310,29 @@ func (s *Scanner) readLength(first byte) (int64, error) {
304310
// NextObject writes the content of the next object into the reader, returns
305311
// the number of bytes written, the CRC32 of the content and an error, if any
306312
func (s *Scanner) NextObject(w io.Writer) (written int64, crc32 uint32, err error) {
307-
defer s.crc.Reset()
308-
309313
s.pendingObject = nil
310314
written, err = s.copyObject(w)
311-
s.Flush()
315+
316+
s.r.Flush()
312317
crc32 = s.crc.Sum32()
318+
s.crc.Reset()
319+
313320
return
314321
}
315322

316323
// ReadRegularObject reads and write a non-deltified object
317324
// from it zlib stream in an object entry in the packfile.
318325
func (s *Scanner) copyObject(w io.Writer) (n int64, err error) {
319-
if s.zr == nil {
320-
var zr io.ReadCloser
321-
zr, err = zlib.NewReader(s.r)
322-
if err != nil {
323-
return 0, fmt.Errorf("zlib initialization error: %s", err)
324-
}
326+
zr := zlibReaderPool.Get().(io.ReadCloser)
327+
defer zlibReaderPool.Put(zr)
325328

326-
s.zr = zr.(readerResetter)
327-
} else {
328-
if err = s.zr.Reset(s.r, nil); err != nil {
329-
return 0, fmt.Errorf("zlib reset error: %s", err)
330-
}
329+
if err = zr.(zlib.Resetter).Reset(s.r, nil); err != nil {
330+
return 0, fmt.Errorf("zlib reset error: %s", err)
331331
}
332332

333-
defer ioutil.CheckClose(s.zr, &err)
333+
defer ioutil.CheckClose(zr, &err)
334334
buf := byteSlicePool.Get().([]byte)
335-
n, err = io.CopyBuffer(w, s.zr, buf)
335+
n, err = io.CopyBuffer(w, zr, buf)
336336
byteSlicePool.Put(buf)
337337
return
338338
}
@@ -378,110 +378,85 @@ func (s *Scanner) Close() error {
378378
return err
379379
}
380380

381-
// Flush finishes writing the buffer to crc hasher in case we are using
382-
// a teeReader. Otherwise it is a no-op.
381+
// Flush is a no-op (deprecated)
383382
func (s *Scanner) Flush() error {
384-
tee, ok := s.r.(*teeReader)
385-
if ok {
386-
return tee.Flush()
387-
}
388383
return nil
389384
}
390385

391-
type trackableReader struct {
392-
count int64
393-
io.Reader
386+
// scannerReader has the following characteristics:
387+
// - Provides an io.SeekReader impl for bufio.Reader, when the underlying
388+
// reader supports it.
389+
// - Keeps track of the current read position, for when the underlying reader
390+
// isn't an io.SeekReader, but we still want to know the current offset.
391+
// - Writes to the hash writer what it reads, with the aid of a smaller buffer.
392+
// The buffer helps avoid a performance penality for performing small writes
393+
// to the crc32 hash writer.
394+
type scannerReader struct {
395+
reader io.Reader
396+
crc io.Writer
397+
rbuf *bufio.Reader
398+
wbuf *bufio.Writer
399+
offset int64
394400
}
395401

396-
// Read reads up to len(p) bytes into p.
397-
func (r *trackableReader) Read(p []byte) (n int, err error) {
398-
n, err = r.Reader.Read(p)
399-
r.count += int64(n)
400-
401-
return
402-
}
403-
404-
// Seek only supports io.SeekCurrent, any other operation fails
405-
func (r *trackableReader) Seek(offset int64, whence int) (int64, error) {
406-
if whence != io.SeekCurrent {
407-
return -1, ErrSeekNotSupported
402+
func newScannerReader(r io.Reader, h io.Writer) *scannerReader {
403+
sr := &scannerReader{
404+
reader: r,
405+
crc: h,
408406
}
407+
sr.rbuf = bufio.NewReader(sr.reader)
408+
sr.wbuf = bufio.NewWriterSize(sr.crc, 64)
409409

410-
return r.count, nil
410+
return sr
411411
}
412412

413-
func newByteReadSeeker(r io.ReadSeeker) *bufferedSeeker {
414-
return &bufferedSeeker{
415-
r: r,
416-
Reader: *bufio.NewReader(r),
417-
}
413+
func (r *scannerReader) Reset(reader io.Reader) {
414+
r.reader = reader
415+
r.offset = 0
416+
r.rbuf.Reset(r.reader)
417+
r.wbuf.Reset(r.crc)
418418
}
419419

420-
type bufferedSeeker struct {
421-
r io.ReadSeeker
422-
bufio.Reader
423-
}
420+
func (r *scannerReader) Read(p []byte) (n int, err error) {
421+
n, err = r.rbuf.Read(p)
424422

425-
func (r *bufferedSeeker) Seek(offset int64, whence int) (int64, error) {
426-
if whence == io.SeekCurrent && offset == 0 {
427-
current, err := r.r.Seek(offset, whence)
428-
if err != nil {
429-
return current, err
430-
}
431-
432-
return current - int64(r.Buffered()), nil
423+
r.offset += int64(n)
424+
if _, err := r.wbuf.Write(p[:n]); err != nil {
425+
return n, err
433426
}
434-
435-
defer r.Reader.Reset(r.r)
436-
return r.r.Seek(offset, whence)
437-
}
438-
439-
type readerResetter interface {
440-
io.ReadCloser
441-
zlib.Resetter
442-
}
443-
444-
type reader interface {
445-
io.Reader
446-
io.ByteReader
447-
io.Seeker
427+
return
448428
}
449429

450-
type teeReader struct {
451-
reader
452-
w hash.Hash32
453-
bufWriter *bufio.Writer
430+
func (r *scannerReader) ReadByte() (b byte, err error) {
431+
b, err = r.rbuf.ReadByte()
432+
if err == nil {
433+
r.offset++
434+
return b, r.wbuf.WriteByte(b)
435+
}
436+
return
454437
}
455438

456-
func newTeeReader(r reader, h hash.Hash32) *teeReader {
457-
return &teeReader{
458-
reader: r,
459-
w: h,
460-
bufWriter: bufio.NewWriter(h),
461-
}
439+
func (r *scannerReader) Flush() error {
440+
return r.wbuf.Flush()
462441
}
463442

464-
func (r *teeReader) Read(p []byte) (n int, err error) {
465-
r.Flush()
443+
// Seek seeks to a location. If the underlying reader is not an io.ReadSeeker,
444+
// then only whence=io.SeekCurrent is supported, any other operation fails.
445+
func (r *scannerReader) Seek(offset int64, whence int) (int64, error) {
446+
var err error
466447

467-
n, err = r.reader.Read(p)
468-
if n > 0 {
469-
if n, err := r.w.Write(p[:n]); err != nil {
470-
return n, err
448+
if seeker, ok := r.reader.(io.ReadSeeker); !ok {
449+
if whence != io.SeekCurrent && offset != 0 {
450+
return -1, ErrSeekNotSupported
451+
}
452+
} else {
453+
if whence == io.SeekCurrent || offset == 0 {
454+
return r.offset, nil
471455
}
472-
}
473-
return
474-
}
475456

476-
func (r *teeReader) ReadByte() (b byte, err error) {
477-
b, err = r.reader.ReadByte()
478-
if err == nil {
479-
return b, r.bufWriter.WriteByte(b)
457+
r.offset, err = seeker.Seek(offset, whence)
458+
r.rbuf.Reset(r.reader)
480459
}
481460

482-
return
483-
}
484-
485-
func (r *teeReader) Flush() (err error) {
486-
return r.bufWriter.Flush()
461+
return r.offset, err
487462
}

0 commit comments

Comments
 (0)