diff --git a/go.mod b/go.mod index 1005176d29b23..e5968e5baf6ee 100644 --- a/go.mod +++ b/go.mod @@ -124,6 +124,7 @@ require ( golang.org/x/image v0.18.0 golang.org/x/net v0.28.0 golang.org/x/oauth2 v0.21.0 + golang.org/x/sync v0.8.0 golang.org/x/sys v0.24.0 golang.org/x/text v0.17.0 golang.org/x/tools v0.24.0 @@ -319,7 +320,6 @@ require ( go.uber.org/zap v1.27.0 // indirect golang.org/x/exp v0.0.0-20240314144324-c7f7c6466f7f // indirect golang.org/x/mod v0.20.0 // indirect - golang.org/x/sync v0.8.0 // indirect golang.org/x/time v0.5.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240314234333-6e1732d8331c // indirect gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect diff --git a/modules/lfs/http_client.go b/modules/lfs/http_client.go index aa9e744d72b8f..3e0364192483c 100644 --- a/modules/lfs/http_client.go +++ b/modules/lfs/http_client.go @@ -17,6 +17,8 @@ import ( "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/proxy" "code.gitea.io/gitea/modules/setting" + + "golang.org/x/sync/errgroup" ) // HTTPClient is used to communicate with the LFS server @@ -113,6 +115,7 @@ func (c *HTTPClient) Upload(ctx context.Context, objects []Pointer, callback Upl return c.performOperation(ctx, objects, nil, callback) } +// performOperation takes a slice of LFS object pointers, batches them, and performs the upload/download operations concurrently in each batch func (c *HTTPClient) performOperation(ctx context.Context, objects []Pointer, dc DownloadCallback, uc UploadCallback) error { if len(objects) == 0 { return nil @@ -133,71 +136,92 @@ func (c *HTTPClient) performOperation(ctx context.Context, objects []Pointer, dc return fmt.Errorf("TransferAdapter not found: %s", result.Transfer) } + errGroup, groupCtx := errgroup.WithContext(ctx) for _, object := range result.Objects { - if object.Error != nil { - log.Trace("Error on object %v: %v", object.Pointer, object.Error) - if uc != nil { - if _, err := uc(object.Pointer, object.Error); err != nil { - return err - } - } else { - if err := dc(object.Pointer, nil, object.Error); err != nil { - return err - } - } - continue - } + func(groupCtx context.Context, object *ObjectResponse, dc DownloadCallback, uc UploadCallback, transferAdapter TransferAdapter) { + errGroup.Go(func() error { + err := performSingleOperation(groupCtx, object, dc, uc, transferAdapter) + return err + }) + }(groupCtx, object, dc, uc, transferAdapter) + } - if uc != nil { - if len(object.Actions) == 0 { - log.Trace("%v already present on server", object.Pointer) - continue - } + // only the first error is returned, preserving legacy behavior before concurrency + return errGroup.Wait() +} - link, ok := object.Actions["upload"] - if !ok { - log.Debug("%+v", object) - return errors.New("missing action 'upload'") - } +// performSingleOperation performs an LFS upload or download operation on a single object +func performSingleOperation(ctx context.Context, object *ObjectResponse, dc DownloadCallback, uc UploadCallback, transferAdapter TransferAdapter) error { + // the response from an lfs batch api request for this specific object id contained an error + if object.Error != nil { + log.Trace("Error on object %v: %v", object.Pointer, object.Error) - content, err := uc(object.Pointer, nil) - if err != nil { + // this was an 'upload' request inside the batch request + if uc != nil { + if _, err := uc(object.Pointer, object.Error); err != nil { return err } + } - err = transferAdapter.Upload(ctx, link, object.Pointer, content) - if err != nil { - return err - } + // this was NOT an 'upload' request inside the batch request, meaning it must be a 'download' request + err := dc(object.Pointer, nil, object.Error) + if errors.Is(object.Error, ErrObjectNotExist) { + log.Warn("Ignoring missing upstream LFS object %-v: %v", object.Pointer, err) + return nil + } - link, ok = object.Actions["verify"] - if ok { - if err := transferAdapter.Verify(ctx, link, object.Pointer); err != nil { - return err - } - } - } else { - link, ok := object.Actions["download"] - if !ok { - // no actions block in response, try legacy response schema - link, ok = object.Links["download"] - } - if !ok { - log.Debug("%+v", object) - return errors.New("missing action 'download'") - } + // this was a 'download' request which was a legitimate error response from the batch api (not an http/404) + return err + } - content, err := transferAdapter.Download(ctx, link) - if err != nil { - return err - } + // the response from an lfs batch api request contained necessary upload/download fields to act upon + if uc != nil { + if len(object.Actions) == 0 { + log.Trace("%v already present on server", object.Pointer) + return nil + } + + link, ok := object.Actions["upload"] + if !ok { + return errors.New("missing action 'upload'") + } + + content, err := uc(object.Pointer, nil) + if err != nil { + return err + } - if err := dc(object.Pointer, content, nil); err != nil { + err = transferAdapter.Upload(ctx, link, object.Pointer, content) + if err != nil { + return err + } + + link, ok = object.Actions["verify"] + if ok { + if err := transferAdapter.Verify(ctx, link, object.Pointer); err != nil { return err } } - } + } else { + link, ok := object.Actions["download"] + if !ok { + // no actions block in response, try legacy response schema + link, ok = object.Links["download"] + } + if !ok { + log.Debug("%+v", object) + return errors.New("missing action 'download'") + } + content, err := transferAdapter.Download(ctx, link) + if err != nil { + return err + } + + if err := dc(object.Pointer, content, nil); err != nil { + return err + } + } return nil } diff --git a/modules/lfs/http_client_test.go b/modules/lfs/http_client_test.go index ec90f5375d1b9..e17b38a1c9eee 100644 --- a/modules/lfs/http_client_test.go +++ b/modules/lfs/http_client_test.go @@ -211,36 +211,31 @@ func TestHTTPClientDownload(t *testing.T) { expectederror: "TransferAdapter not found: ", }, // case 5 - { - endpoint: "https://error-in-response-objects.io", - expectederror: "Object not found", - }, - // case 6 { endpoint: "https://empty-actions-map.io", expectederror: "missing action 'download'", }, - // case 7 + // case 6 { endpoint: "https://download-actions-map.io", expectederror: "", }, - // case 8 + // case 7 { endpoint: "https://upload-actions-map.io", expectederror: "missing action 'download'", }, - // case 9 + // case 8 { endpoint: "https://verify-actions-map.io", expectederror: "missing action 'download'", }, - // case 10 + // case 9 { endpoint: "https://unknown-actions-map.io", expectederror: "missing action 'download'", }, - // case 11 + // case 10 { endpoint: "https://legacy-batch-request-download.io", expectederror: "", diff --git a/modules/repository/repo.go b/modules/repository/repo.go index 3d1899b2fe006..cb926084baae4 100644 --- a/modules/repository/repo.go +++ b/modules/repository/repo.go @@ -5,7 +5,6 @@ package repository import ( "context" - "errors" "fmt" "io" "strings" @@ -182,10 +181,6 @@ func StoreMissingLfsObjectsInRepository(ctx context.Context, repo *repo_model.Re downloadObjects := func(pointers []lfs.Pointer) error { err := lfsClient.Download(ctx, pointers, func(p lfs.Pointer, content io.ReadCloser, objectError error) error { if objectError != nil { - if errors.Is(objectError, lfs.ErrObjectNotExist) { - log.Warn("Repo[%-v]: Ignore missing LFS object %-v: %v", repo, p, objectError) - return nil - } return objectError }