Skip to content

Commit

Permalink
Make LFS http_client parallel within a batch.
Browse files Browse the repository at this point in the history
Signed-off-by: Royce Remer <royceremer@gmail.com>
  • Loading branch information
rremer authored and Royce Remer committed Oct 31, 2024
1 parent 0690cb0 commit ef84dfa
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 67 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ require (
golang.org/x/image v0.21.0
golang.org/x/net v0.30.0
golang.org/x/oauth2 v0.23.0
golang.org/x/sync v0.8.0
golang.org/x/sys v0.26.0
golang.org/x/text v0.19.0
golang.org/x/tools v0.26.0
Expand Down Expand Up @@ -316,7 +317,6 @@ require (
go.uber.org/zap v1.27.0 // indirect
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c // indirect
golang.org/x/mod v0.21.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/time v0.7.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241021214115-324edc3d5d38 // indirect
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
Expand Down
126 changes: 75 additions & 51 deletions modules/lfs/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/proxy"
"code.gitea.io/gitea/modules/setting"

"golang.org/x/sync/errgroup"
)

// HTTPClient is used to communicate with the LFS server
Expand Down Expand Up @@ -113,6 +115,7 @@ func (c *HTTPClient) Upload(ctx context.Context, objects []Pointer, callback Upl
return c.performOperation(ctx, objects, nil, callback)
}

// performOperation takes a slice of LFS object pointers, batches them, and performs the upload/download operations concurrently in each batch
func (c *HTTPClient) performOperation(ctx context.Context, objects []Pointer, dc DownloadCallback, uc UploadCallback) error {
if len(objects) == 0 {
return nil
Expand All @@ -133,71 +136,92 @@ func (c *HTTPClient) performOperation(ctx context.Context, objects []Pointer, dc
return fmt.Errorf("TransferAdapter not found: %s", result.Transfer)
}

errGroup, groupCtx := errgroup.WithContext(ctx)
for _, object := range result.Objects {
if object.Error != nil {
log.Trace("Error on object %v: %v", object.Pointer, object.Error)
if uc != nil {
if _, err := uc(object.Pointer, object.Error); err != nil {
return err
}
} else {
if err := dc(object.Pointer, nil, object.Error); err != nil {
return err
}
}
continue
}
func(groupCtx context.Context, object *ObjectResponse, dc DownloadCallback, uc UploadCallback, transferAdapter TransferAdapter) {
errGroup.Go(func() error {
err := performSingleOperation(groupCtx, object, dc, uc, transferAdapter)
return err
})
}(groupCtx, object, dc, uc, transferAdapter)
}

if uc != nil {
if len(object.Actions) == 0 {
log.Trace("%v already present on server", object.Pointer)
continue
}
// only the first error is returned, preserving legacy behavior before concurrency
return errGroup.Wait()
}

link, ok := object.Actions["upload"]
if !ok {
log.Debug("%+v", object)
return errors.New("missing action 'upload'")
}
// performSingleOperation performs an LFS upload or download operation on a single object
func performSingleOperation(ctx context.Context, object *ObjectResponse, dc DownloadCallback, uc UploadCallback, transferAdapter TransferAdapter) error {
// the response from an lfs batch api request for this specific object id contained an error
if object.Error != nil {
log.Trace("Error on object %v: %v", object.Pointer, object.Error)

content, err := uc(object.Pointer, nil)
if err != nil {
// this was an 'upload' request inside the batch request
if uc != nil {
if _, err := uc(object.Pointer, object.Error); err != nil {
return err
}
}

err = transferAdapter.Upload(ctx, link, object.Pointer, content)
if err != nil {
return err
}
// this was NOT an 'upload' request inside the batch request, meaning it must be a 'download' request
err := dc(object.Pointer, nil, object.Error)
if errors.Is(object.Error, ErrObjectNotExist) {
log.Warn("Ignoring missing upstream LFS object %-v: %v", object.Pointer, err)
return nil
}

link, ok = object.Actions["verify"]
if ok {
if err := transferAdapter.Verify(ctx, link, object.Pointer); err != nil {
return err
}
}
} else {
link, ok := object.Actions["download"]
if !ok {
// no actions block in response, try legacy response schema
link, ok = object.Links["download"]
}
if !ok {
log.Debug("%+v", object)
return errors.New("missing action 'download'")
}
// this was a 'download' request which was a legitimate error response from the batch api (not an http/404)
return err
}

content, err := transferAdapter.Download(ctx, link)
if err != nil {
return err
}
// the response from an lfs batch api request contained necessary upload/download fields to act upon
if uc != nil {
if len(object.Actions) == 0 {
log.Trace("%v already present on server", object.Pointer)
return nil
}

link, ok := object.Actions["upload"]
if !ok {
return errors.New("missing action 'upload'")
}

content, err := uc(object.Pointer, nil)
if err != nil {
return err
}

if err := dc(object.Pointer, content, nil); err != nil {
err = transferAdapter.Upload(ctx, link, object.Pointer, content)
if err != nil {
return err
}

link, ok = object.Actions["verify"]
if ok {
if err := transferAdapter.Verify(ctx, link, object.Pointer); err != nil {
return err
}
}
}
} else {
link, ok := object.Actions["download"]
if !ok {
// no actions block in response, try legacy response schema
link, ok = object.Links["download"]
}
if !ok {
log.Debug("%+v", object)
return errors.New("missing action 'download'")
}

content, err := transferAdapter.Download(ctx, link)
if err != nil {
return err
}

if err := dc(object.Pointer, content, nil); err != nil {
return err
}
}
return nil
}

Expand Down
15 changes: 5 additions & 10 deletions modules/lfs/http_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,36 +211,31 @@ func TestHTTPClientDownload(t *testing.T) {
expectederror: "TransferAdapter not found: ",
},
// case 5
{
endpoint: "https://error-in-response-objects.io",
expectederror: "Object not found",
},
// case 6
{
endpoint: "https://empty-actions-map.io",
expectederror: "missing action 'download'",
},
// case 7
// case 6
{
endpoint: "https://download-actions-map.io",
expectederror: "",
},
// case 8
// case 7
{
endpoint: "https://upload-actions-map.io",
expectederror: "missing action 'download'",
},
// case 9
// case 8
{
endpoint: "https://verify-actions-map.io",
expectederror: "missing action 'download'",
},
// case 10
// case 9
{
endpoint: "https://unknown-actions-map.io",
expectederror: "missing action 'download'",
},
// case 11
// case 10
{
endpoint: "https://legacy-batch-request-download.io",
expectederror: "",
Expand Down
5 changes: 0 additions & 5 deletions modules/repository/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package repository

import (
"context"
"errors"
"fmt"
"io"
"strings"
Expand Down Expand Up @@ -182,10 +181,6 @@ func StoreMissingLfsObjectsInRepository(ctx context.Context, repo *repo_model.Re
downloadObjects := func(pointers []lfs.Pointer) error {
err := lfsClient.Download(ctx, pointers, func(p lfs.Pointer, content io.ReadCloser, objectError error) error {
if objectError != nil {
if errors.Is(objectError, lfs.ErrObjectNotExist) {
log.Warn("Repo[%-v]: Ignore missing LFS object %-v: %v", repo, p, objectError)
return nil
}
return objectError
}

Expand Down

0 comments on commit ef84dfa

Please # to comment.