Skip to content
This repository was archived by the owner on Sep 11, 2020. It is now read-only.

Commit fbf2a4a

Browse files
authored
Merge pull request #507 from mcuadros/ctx
transport: context package support allowing cancellation of any network operation
2 parents 25a0420 + db5fa52 commit fbf2a4a

File tree

14 files changed

+387
-42
lines changed

14 files changed

+387
-42
lines changed

plumbing/transport/common.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
package transport
1414

1515
import (
16+
"context"
1617
"errors"
1718
"fmt"
1819
"io"
@@ -72,7 +73,7 @@ type UploadPackSession interface {
7273
// including a packfile. Don't be confused by terminology, the client
7374
// side of a git-upload-pack is called git-fetch-pack, although here
7475
// the same interface is used to make it RPC-like.
75-
UploadPack(*packp.UploadPackRequest) (*packp.UploadPackResponse, error)
76+
UploadPack(context.Context, *packp.UploadPackRequest) (*packp.UploadPackResponse, error)
7677
}
7778

7879
// ReceivePackSession represents a git-receive-pack session.
@@ -86,7 +87,7 @@ type ReceivePackSession interface {
8687
// terminology, the client side of a git-receive-pack is called
8788
// git-send-pack, although here the same interface is used to make it
8889
// RPC-like.
89-
ReceivePack(*packp.ReferenceUpdateRequest) (*packp.ReportStatus, error)
90+
ReceivePack(context.Context, *packp.ReferenceUpdateRequest) (*packp.ReportStatus, error)
9091
}
9192

9293
// Endpoint represents a Git URL in any supported protocol.

plumbing/transport/file/client.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@ func NewClient(uploadPackBin, receivePackBin string) transport.Transport {
3030
})
3131
}
3232

33-
func (r *runner) Command(cmd string, ep transport.Endpoint, auth transport.AuthMethod) (common.Command, error) {
33+
func (r *runner) Command(cmd string, ep transport.Endpoint, auth transport.AuthMethod,
34+
) (common.Command, error) {
35+
3436
switch cmd {
3537
case transport.UploadPackServiceName:
3638
cmd = r.UploadPackBin
@@ -72,6 +74,11 @@ func (c *command) StdoutPipe() (io.Reader, error) {
7274
return c.cmd.StdoutPipe()
7375
}
7476

77+
func (c *command) Kill() error {
78+
c.cmd.Process.Kill()
79+
return c.Close()
80+
}
81+
7582
// Close waits for the command to exit.
7683
func (c *command) Close() error {
7784
if c.closed {
@@ -81,6 +88,7 @@ func (c *command) Close() error {
8188
defer func() {
8289
c.closed = true
8390
_ = c.stderrCloser.Close()
91+
8492
}()
8593

8694
err := c.cmd.Wait()

plumbing/transport/file/upload_pack_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,3 +78,9 @@ func (s *UploadPackSuite) TestNonExistentCommand(c *C) {
7878
c.Assert(err, ErrorMatches, ".*file.*")
7979
c.Assert(session, IsNil)
8080
}
81+
82+
func (s *UploadPackSuite) TestUploadPackWithContextOnRead(c *C) {
83+
// TODO: Fix race condition when Session.Close and the read failed due to a
84+
// canceled context when the packfile is being read.
85+
c.Skip("UploadPack has a race condition when we Close the session")
86+
}

plumbing/transport/http/receive_pack.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package http
22

33
import (
44
"bytes"
5+
"context"
56
"fmt"
67
"io"
78
"net/http"
@@ -25,7 +26,7 @@ func (s *rpSession) AdvertisedReferences() (*packp.AdvRefs, error) {
2526
return advertisedReferences(s.session, transport.ReceivePackServiceName)
2627
}
2728

28-
func (s *rpSession) ReceivePack(req *packp.ReferenceUpdateRequest) (
29+
func (s *rpSession) ReceivePack(ctx context.Context, req *packp.ReferenceUpdateRequest) (
2930
*packp.ReportStatus, error) {
3031
url := fmt.Sprintf(
3132
"%s/%s",
@@ -37,7 +38,7 @@ func (s *rpSession) ReceivePack(req *packp.ReferenceUpdateRequest) (
3738
return nil, err
3839
}
3940

40-
res, err := s.doRequest(http.MethodPost, url, buf)
41+
res, err := s.doRequest(ctx, http.MethodPost, url, buf)
4142
if err != nil {
4243
return nil, err
4344
}
@@ -61,7 +62,10 @@ func (s *rpSession) ReceivePack(req *packp.ReferenceUpdateRequest) (
6162
return report, report.Error()
6263
}
6364

64-
func (s *rpSession) doRequest(method, url string, content *bytes.Buffer) (*http.Response, error) {
65+
func (s *rpSession) doRequest(
66+
ctx context.Context, method, url string, content *bytes.Buffer,
67+
) (*http.Response, error) {
68+
6569
var body io.Reader
6670
if content != nil {
6771
body = content
@@ -75,7 +79,7 @@ func (s *rpSession) doRequest(method, url string, content *bytes.Buffer) (*http.
7579
applyHeadersToRequest(req, content, s.endpoint.Host(), transport.ReceivePackServiceName)
7680
s.applyAuthToRequest(req)
7781

78-
res, err := s.client.Do(req)
82+
res, err := s.client.Do(req.WithContext(ctx))
7983
if err != nil {
8084
return nil, plumbing.NewUnexpectedError(err)
8185
}

plumbing/transport/http/upload_pack.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package http
22

33
import (
44
"bytes"
5+
"context"
56
"fmt"
67
"io"
78
"net/http"
@@ -28,7 +29,10 @@ func (s *upSession) AdvertisedReferences() (*packp.AdvRefs, error) {
2829
return advertisedReferences(s.session, transport.UploadPackServiceName)
2930
}
3031

31-
func (s *upSession) UploadPack(req *packp.UploadPackRequest) (*packp.UploadPackResponse, error) {
32+
func (s *upSession) UploadPack(
33+
ctx context.Context, req *packp.UploadPackRequest,
34+
) (*packp.UploadPackResponse, error) {
35+
3236
if req.IsEmpty() {
3337
return nil, transport.ErrEmptyUploadPackRequest
3438
}
@@ -47,7 +51,7 @@ func (s *upSession) UploadPack(req *packp.UploadPackRequest) (*packp.UploadPackR
4751
return nil, err
4852
}
4953

50-
res, err := s.doRequest(http.MethodPost, url, content)
54+
res, err := s.doRequest(ctx, http.MethodPost, url, content)
5155
if err != nil {
5256
return nil, err
5357
}
@@ -70,7 +74,10 @@ func (s *upSession) Close() error {
7074
return nil
7175
}
7276

73-
func (s *upSession) doRequest(method, url string, content *bytes.Buffer) (*http.Response, error) {
77+
func (s *upSession) doRequest(
78+
ctx context.Context, method, url string, content *bytes.Buffer,
79+
) (*http.Response, error) {
80+
7481
var body io.Reader
7582
if content != nil {
7683
body = content
@@ -84,7 +91,7 @@ func (s *upSession) doRequest(method, url string, content *bytes.Buffer) (*http.
8491
applyHeadersToRequest(req, content, s.endpoint.Host(), transport.UploadPackServiceName)
8592
s.applyAuthToRequest(req)
8693

87-
res, err := s.client.Do(req)
94+
res, err := s.client.Do(req.WithContext(ctx))
8895
if err != nil {
8996
return nil, plumbing.NewUnexpectedError(err)
9097
}

plumbing/transport/internal/common/common.go

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package common
77

88
import (
99
"bufio"
10+
"context"
1011
"errors"
1112
"fmt"
1213
"io"
@@ -64,6 +65,13 @@ type Command interface {
6465
Close() error
6566
}
6667

68+
// CommandKiller expands the Command interface, enableing it for being killed.
69+
type CommandKiller interface {
70+
// Kill and close the session whatever the state it is. It will block until
71+
// the command is terminated.
72+
Kill() error
73+
}
74+
6775
type client struct {
6876
cmdr Commander
6977
}
@@ -212,7 +220,7 @@ func (s *session) handleAdvRefDecodeError(err error) error {
212220

213221
// UploadPack performs a request to the server to fetch a packfile. A reader is
214222
// returned with the packfile content. The reader must be closed after reading.
215-
func (s *session) UploadPack(req *packp.UploadPackRequest) (*packp.UploadPackResponse, error) {
223+
func (s *session) UploadPack(ctx context.Context, req *packp.UploadPackRequest) (*packp.UploadPackResponse, error) {
216224
if req.IsEmpty() {
217225
return nil, transport.ErrEmptyUploadPackRequest
218226
}
@@ -227,11 +235,14 @@ func (s *session) UploadPack(req *packp.UploadPackRequest) (*packp.UploadPackRes
227235

228236
s.packRun = true
229237

230-
if err := uploadPack(s.Stdin, s.Stdout, req); err != nil {
238+
in := s.StdinContext(ctx)
239+
out := s.StdoutContext(ctx)
240+
241+
if err := uploadPack(in, out, req); err != nil {
231242
return nil, err
232243
}
233244

234-
r, err := ioutil.NonEmptyReader(s.Stdout)
245+
r, err := ioutil.NonEmptyReader(out)
235246
if err == ioutil.ErrEmptyReader {
236247
if c, ok := s.Stdout.(io.Closer); ok {
237248
_ = c.Close()
@@ -244,22 +255,45 @@ func (s *session) UploadPack(req *packp.UploadPackRequest) (*packp.UploadPackRes
244255
return nil, err
245256
}
246257

247-
rc := ioutil.NewReadCloser(r, s.Command)
258+
rc := ioutil.NewReadCloser(r, s)
248259
return DecodeUploadPackResponse(rc, req)
249260
}
250261

251-
func (s *session) ReceivePack(req *packp.ReferenceUpdateRequest) (*packp.ReportStatus, error) {
262+
func (s *session) StdinContext(ctx context.Context) io.WriteCloser {
263+
return ioutil.NewWriteCloserOnError(
264+
ioutil.NewContextWriteCloser(ctx, s.Stdin),
265+
s.onError,
266+
)
267+
}
268+
269+
func (s *session) StdoutContext(ctx context.Context) io.Reader {
270+
return ioutil.NewReaderOnError(
271+
ioutil.NewContextReader(ctx, s.Stdout),
272+
s.onError,
273+
)
274+
}
275+
276+
func (s *session) onError(err error) {
277+
if k, ok := s.Command.(CommandKiller); ok {
278+
_ = k.Kill()
279+
}
280+
281+
_ = s.Close()
282+
}
283+
284+
func (s *session) ReceivePack(ctx context.Context, req *packp.ReferenceUpdateRequest) (*packp.ReportStatus, error) {
252285
if _, err := s.AdvertisedReferences(); err != nil {
253286
return nil, err
254287
}
255288

256289
s.packRun = true
257290

258-
if err := req.Encode(s.Stdin); err != nil {
291+
w := s.StdinContext(ctx)
292+
if err := req.Encode(w); err != nil {
259293
return nil, err
260294
}
261295

262-
if err := s.Stdin.Close(); err != nil {
296+
if err := w.Close(); err != nil {
263297
return nil, err
264298
}
265299

@@ -270,11 +304,12 @@ func (s *session) ReceivePack(req *packp.ReferenceUpdateRequest) (*packp.ReportS
270304
}
271305

272306
report := packp.NewReportStatus()
273-
if err := report.Decode(s.Stdout); err != nil {
307+
if err := report.Decode(s.StdoutContext(ctx)); err != nil {
274308
return nil, err
275309
}
276310

277311
if err := report.Error(); err != nil {
312+
defer s.Close()
278313
return report, err
279314
}
280315

@@ -300,8 +335,9 @@ func (s *session) finish() error {
300335
}
301336

302337
func (s *session) Close() (err error) {
303-
defer ioutil.CheckClose(s.Command, &err)
304338
err = s.finish()
339+
340+
defer ioutil.CheckClose(s.Command, &err)
305341
return
306342
}
307343

plumbing/transport/internal/common/server.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package common
22

33
import (
4+
"context"
45
"fmt"
56
"io"
67

@@ -34,7 +35,7 @@ func ServeUploadPack(cmd ServerCommand, s transport.UploadPackSession) (err erro
3435
}
3536

3637
var resp *packp.UploadPackResponse
37-
resp, err = s.UploadPack(req)
38+
resp, err = s.UploadPack(context.TODO(), req)
3839
if err != nil {
3940
return err
4041
}
@@ -57,7 +58,7 @@ func ServeReceivePack(cmd ServerCommand, s transport.ReceivePackSession) error {
5758
return fmt.Errorf("error decoding: %s", err)
5859
}
5960

60-
rs, err := s.ReceivePack(req)
61+
rs, err := s.ReceivePack(context.TODO(), req)
6162
if rs != nil {
6263
if err := rs.Encode(cmd.Stdout); err != nil {
6364
return fmt.Errorf("error in encoding report status %s", err)

plumbing/transport/server/server.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
package server
44

55
import (
6+
"context"
67
"errors"
78
"fmt"
89
"io"
@@ -14,6 +15,7 @@ import (
1415
"gopkg.in/src-d/go-git.v4/plumbing/revlist"
1516
"gopkg.in/src-d/go-git.v4/plumbing/storer"
1617
"gopkg.in/src-d/go-git.v4/plumbing/transport"
18+
"gopkg.in/src-d/go-git.v4/utils/ioutil"
1719
)
1820

1921
var DefaultServer = NewServer(DefaultLoader)
@@ -129,7 +131,7 @@ func (s *upSession) AdvertisedReferences() (*packp.AdvRefs, error) {
129131
return ar, nil
130132
}
131133

132-
func (s *upSession) UploadPack(req *packp.UploadPackRequest) (*packp.UploadPackResponse, error) {
134+
func (s *upSession) UploadPack(ctx context.Context, req *packp.UploadPackRequest) (*packp.UploadPackResponse, error) {
133135
if req.IsEmpty() {
134136
return nil, transport.ErrEmptyUploadPackRequest
135137
}
@@ -167,7 +169,9 @@ func (s *upSession) UploadPack(req *packp.UploadPackRequest) (*packp.UploadPackR
167169
pw.CloseWithError(err)
168170
}()
169171

170-
return packp.NewUploadPackResponseWithPackfile(req, pr), nil
172+
return packp.NewUploadPackResponseWithPackfile(req,
173+
ioutil.NewContextReadCloser(ctx, pr),
174+
), nil
171175
}
172176

173177
func (s *upSession) objectsToUpload(req *packp.UploadPackRequest) ([]plumbing.Hash, error) {
@@ -222,7 +226,7 @@ var (
222226
ErrUpdateReference = errors.New("failed to update ref")
223227
)
224228

225-
func (s *rpSession) ReceivePack(req *packp.ReferenceUpdateRequest) (*packp.ReportStatus, error) {
229+
func (s *rpSession) ReceivePack(ctx context.Context, req *packp.ReferenceUpdateRequest) (*packp.ReportStatus, error) {
226230
if s.caps == nil {
227231
s.caps = capability.NewList()
228232
if err := s.setSupportedCapabilities(s.caps); err != nil {
@@ -238,7 +242,8 @@ func (s *rpSession) ReceivePack(req *packp.ReferenceUpdateRequest) (*packp.Repor
238242

239243
//TODO: Implement 'atomic' update of references.
240244

241-
if err := s.writePackfile(req.Packfile); err != nil {
245+
r := ioutil.NewContextReadCloser(ctx, req.Packfile)
246+
if err := s.writePackfile(r); err != nil {
242247
s.unpackErr = err
243248
s.firstErr = err
244249
return s.reportStatus(), err

plumbing/transport/server/upload_pack_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ func (s *UploadPackSuite) TestAdvertisedReferencesNotExists(c *C) {
3939
c.Assert(r, IsNil)
4040
}
4141

42+
func (s *UploadPackSuite) TestUploadPackWithContext(c *C) {
43+
c.Skip("UploadPack cannot be canceled on server")
44+
}
45+
4246
// Tests server with `asClient = true`. This is recommended when using a server
4347
// registered directly with `client.InstallProtocol`.
4448
type ClientLikeUploadPackSuite struct {

0 commit comments

Comments
 (0)