diff --git a/modules/lfs/http_client.go b/modules/lfs/http_client.go index 4859fe61e1db9..18d348a4da52f 100644 --- a/modules/lfs/http_client.go +++ b/modules/lfs/http_client.go @@ -13,6 +13,8 @@ import ( "net/url" "strings" + "golang.org/x/sync/errgroup" + "code.gitea.io/gitea/modules/json" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/proxy" @@ -114,6 +116,7 @@ func (c *HTTPClient) Upload(ctx context.Context, objects []Pointer, callback Upl return c.performOperation(ctx, objects, nil, callback) } +// performOperation takes a slice of LFS object pointers, batches them, and performs the upload/download operations concurrently in each batch func (c *HTTPClient) performOperation(ctx context.Context, objects []Pointer, dc DownloadCallback, uc UploadCallback) error { if len(objects) == 0 { return nil @@ -134,71 +137,86 @@ func (c *HTTPClient) performOperation(ctx context.Context, objects []Pointer, dc return fmt.Errorf("TransferAdapter not found: %s", result.Transfer) } + errGroup, groupCtx := errgroup.WithContext(context.Background()) for _, object := range result.Objects { - if object.Error != nil { - log.Trace("Error on object %v: %v", object.Pointer, object.Error) - if uc != nil { - if _, err := uc(object.Pointer, object.Error); err != nil { - return err - } - } else { - if err := dc(object.Pointer, nil, object.Error); err != nil { - return err - } - } - continue - } - - if uc != nil { - if len(object.Actions) == 0 { - log.Trace("%v already present on server", object.Pointer) - continue - } + func(ctx context.Context, object *ObjectResponse, dc DownloadCallback, uc UploadCallback, transferAdapter TransferAdapter) { + errGroup.Go(func() error { + err := performSingleOperation(groupCtx, object, dc, uc, transferAdapter) + return err + }) + }(groupCtx, object, dc, uc, transferAdapter) + } - link, ok := object.Actions["upload"] - if !ok { - log.Debug("%+v", object) - return errors.New("missing action 'upload'") - } + // only the first error is returned, preserving legacy behavior before concurrency + return errGroup.Wait() +} - content, err := uc(object.Pointer, nil) - if err != nil { +// performSingleOperation performs an LFS upload or download operation on a single object +func performSingleOperation(ctx context.Context, object *ObjectResponse, dc DownloadCallback, uc UploadCallback, transferAdapter TransferAdapter) error { + if object.Error != nil { + log.Trace("Error on object %v: %v", object.Pointer, object.Error) + if uc != nil { + if _, err := uc(object.Pointer, object.Error); err != nil { return err } - - err = transferAdapter.Upload(ctx, link, object.Pointer, content) - if err != nil { + } else { + err := dc(object.Pointer, nil, object.Error) + if errors.Is(object.Error, ErrObjectNotExist) { + log.Warn("Ignoring missing upstream LFS object %-v: %v", object.Pointer, err) + return nil + } else { return err } + } + } - link, ok = object.Actions["verify"] - if ok { - if err := transferAdapter.Verify(ctx, link, object.Pointer); err != nil { - return err - } - } - } else { - link, ok := object.Actions["download"] - if !ok { - // no actions block in response, try legacy response schema - link, ok = object.Links["download"] - } - if !ok { - log.Debug("%+v", object) - return errors.New("missing action 'download'") - } + if uc != nil { + if len(object.Actions) == 0 { + log.Trace("%v already present on server", object.Pointer) + return nil + } - content, err := transferAdapter.Download(ctx, link) - if err != nil { - return err - } + link, ok := object.Actions["upload"] + if !ok { + return errors.New("missing action 'upload'") + } + + content, err := uc(object.Pointer, nil) + if err != nil { + return err + } - if err := dc(object.Pointer, content, nil); err != nil { + err = transferAdapter.Upload(ctx, link, object.Pointer, content) + if err != nil { + return err + } + + link, ok = object.Actions["verify"] + if ok { + if err := transferAdapter.Verify(ctx, link, object.Pointer); err != nil { return err } } - } + } else { + link, ok := object.Actions["download"] + if !ok { + // no actions block in response, try legacy response schema + link, ok = object.Links["download"] + } + if !ok { + log.Debug("%+v", object) + return errors.New("missing action 'download'") + } + content, err := transferAdapter.Download(ctx, link) + if err != nil { + return err + } + + if err := dc(object.Pointer, content, nil); err != nil { + return err + } + } return nil } diff --git a/modules/lfs/http_client_test.go b/modules/lfs/http_client_test.go index ec90f5375d1b9..e17b38a1c9eee 100644 --- a/modules/lfs/http_client_test.go +++ b/modules/lfs/http_client_test.go @@ -211,36 +211,31 @@ func TestHTTPClientDownload(t *testing.T) { expectederror: "TransferAdapter not found: ", }, // case 5 - { - endpoint: "https://error-in-response-objects.io", - expectederror: "Object not found", - }, - // case 6 { endpoint: "https://empty-actions-map.io", expectederror: "missing action 'download'", }, - // case 7 + // case 6 { endpoint: "https://download-actions-map.io", expectederror: "", }, - // case 8 + // case 7 { endpoint: "https://upload-actions-map.io", expectederror: "missing action 'download'", }, - // case 9 + // case 8 { endpoint: "https://verify-actions-map.io", expectederror: "missing action 'download'", }, - // case 10 + // case 9 { endpoint: "https://unknown-actions-map.io", expectederror: "missing action 'download'", }, - // case 11 + // case 10 { endpoint: "https://legacy-batch-request-download.io", expectederror: "", diff --git a/modules/repository/repo.go b/modules/repository/repo.go index 3d1899b2fe006..cb926084baae4 100644 --- a/modules/repository/repo.go +++ b/modules/repository/repo.go @@ -5,7 +5,6 @@ package repository import ( "context" - "errors" "fmt" "io" "strings" @@ -182,10 +181,6 @@ func StoreMissingLfsObjectsInRepository(ctx context.Context, repo *repo_model.Re downloadObjects := func(pointers []lfs.Pointer) error { err := lfsClient.Download(ctx, pointers, func(p lfs.Pointer, content io.ReadCloser, objectError error) error { if objectError != nil { - if errors.Is(objectError, lfs.ErrObjectNotExist) { - log.Warn("Repo[%-v]: Ignore missing LFS object %-v: %v", repo, p, objectError) - return nil - } return objectError }