Skip to content

Commit

Permalink
Make LFS http_client parallel within a batch.
Browse files Browse the repository at this point in the history
Signed-off-by: Royce Remer <royceremer@gmail.com>
  • Loading branch information
rremer authored and Royce Remer committed Oct 29, 2024
1 parent feca880 commit 82754f0
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 66 deletions.
118 changes: 67 additions & 51 deletions modules/lfs/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/proxy"

"golang.org/x/sync/errgroup"
)

const httpBatchSize = 20
Expand Down Expand Up @@ -114,6 +116,7 @@ func (c *HTTPClient) Upload(ctx context.Context, objects []Pointer, callback Upl
return c.performOperation(ctx, objects, nil, callback)
}

// performOperation takes a slice of LFS object pointers, batches them, and performs the upload/download operations concurrently in each batch
func (c *HTTPClient) performOperation(ctx context.Context, objects []Pointer, dc DownloadCallback, uc UploadCallback) error {
if len(objects) == 0 {
return nil
Expand All @@ -134,71 +137,84 @@ func (c *HTTPClient) performOperation(ctx context.Context, objects []Pointer, dc
return fmt.Errorf("TransferAdapter not found: %s", result.Transfer)
}

errGroup, groupCtx := errgroup.WithContext(context.Background())
for _, object := range result.Objects {
if object.Error != nil {
log.Trace("Error on object %v: %v", object.Pointer, object.Error)
if uc != nil {
if _, err := uc(object.Pointer, object.Error); err != nil {
return err
}
} else {
if err := dc(object.Pointer, nil, object.Error); err != nil {
return err
}
}
continue
}
func(groupCtx context.Context, object *ObjectResponse, dc DownloadCallback, uc UploadCallback, transferAdapter TransferAdapter) {
errGroup.Go(func() error {
err := performSingleOperation(groupCtx, object, dc, uc, transferAdapter)
return err
})
}(groupCtx, object, dc, uc, transferAdapter)
}

// only the first error is returned, preserving legacy behavior before concurrency
return errGroup.Wait()
}

// performSingleOperation performs an LFS upload or download operation on a single object
func performSingleOperation(ctx context.Context, object *ObjectResponse, dc DownloadCallback, uc UploadCallback, transferAdapter TransferAdapter) error {
if object.Error != nil {
log.Trace("Error on object %v: %v", object.Pointer, object.Error)
if uc != nil {
if len(object.Actions) == 0 {
log.Trace("%v already present on server", object.Pointer)
continue
if _, err := uc(object.Pointer, object.Error); err != nil {
return err
}
}
err := dc(object.Pointer, nil, object.Error)
if errors.Is(object.Error, ErrObjectNotExist) {
log.Warn("Ignoring missing upstream LFS object %-v: %v", object.Pointer, err)
return nil
}
return err
}

link, ok := object.Actions["upload"]
if !ok {
log.Debug("%+v", object)
return errors.New("missing action 'upload'")
}
if uc != nil {
if len(object.Actions) == 0 {
log.Trace("%v already present on server", object.Pointer)
return nil
}

content, err := uc(object.Pointer, nil)
if err != nil {
return err
}
link, ok := object.Actions["upload"]
if !ok {
return errors.New("missing action 'upload'")
}

err = transferAdapter.Upload(ctx, link, object.Pointer, content)
if err != nil {
return err
}
content, err := uc(object.Pointer, nil)
if err != nil {
return err
}

link, ok = object.Actions["verify"]
if ok {
if err := transferAdapter.Verify(ctx, link, object.Pointer); err != nil {
return err
}
}
} else {
link, ok := object.Actions["download"]
if !ok {
// no actions block in response, try legacy response schema
link, ok = object.Links["download"]
}
if !ok {
log.Debug("%+v", object)
return errors.New("missing action 'download'")
}
err = transferAdapter.Upload(ctx, link, object.Pointer, content)
if err != nil {
return err
}

content, err := transferAdapter.Download(ctx, link)
if err != nil {
link, ok = object.Actions["verify"]
if ok {
if err := transferAdapter.Verify(ctx, link, object.Pointer); err != nil {
return err
}
}
} else {
link, ok := object.Actions["download"]
if !ok {
// no actions block in response, try legacy response schema
link, ok = object.Links["download"]
}
if !ok {
log.Debug("%+v", object)
return errors.New("missing action 'download'")
}

if err := dc(object.Pointer, content, nil); err != nil {
return err
}
content, err := transferAdapter.Download(ctx, link)
if err != nil {
return err
}
}

if err := dc(object.Pointer, content, nil); err != nil {
return err
}
}
return nil
}

Expand Down
15 changes: 5 additions & 10 deletions modules/lfs/http_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,36 +211,31 @@ func TestHTTPClientDownload(t *testing.T) {
expectederror: "TransferAdapter not found: ",
},
// case 5
{
endpoint: "https://error-in-response-objects.io",
expectederror: "Object not found",
},
// case 6
{
endpoint: "https://empty-actions-map.io",
expectederror: "missing action 'download'",
},
// case 7
// case 6
{
endpoint: "https://download-actions-map.io",
expectederror: "",
},
// case 8
// case 7
{
endpoint: "https://upload-actions-map.io",
expectederror: "missing action 'download'",
},
// case 9
// case 8
{
endpoint: "https://verify-actions-map.io",
expectederror: "missing action 'download'",
},
// case 10
// case 9
{
endpoint: "https://unknown-actions-map.io",
expectederror: "missing action 'download'",
},
// case 11
// case 10
{
endpoint: "https://legacy-batch-request-download.io",
expectederror: "",
Expand Down
5 changes: 0 additions & 5 deletions modules/repository/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package repository

import (
"context"
"errors"
"fmt"
"io"
"strings"
Expand Down Expand Up @@ -182,10 +181,6 @@ func StoreMissingLfsObjectsInRepository(ctx context.Context, repo *repo_model.Re
downloadObjects := func(pointers []lfs.Pointer) error {
err := lfsClient.Download(ctx, pointers, func(p lfs.Pointer, content io.ReadCloser, objectError error) error {
if objectError != nil {
if errors.Is(objectError, lfs.ErrObjectNotExist) {
log.Warn("Repo[%-v]: Ignore missing LFS object %-v: %v", repo, p, objectError)
return nil
}
return objectError
}

Expand Down

0 comments on commit 82754f0

Please # to comment.