Skip to content

Commit

Permalink
Fix response flushing on streaming k8s requests (#5009)
Browse files Browse the repository at this point in the history
Streaming requests, like `kubectl logs -f` will slowly write response
data over time. The `http.ResponseWriter` wrapper we added for capturing
the response code didn't propagate `http.Flusher` interface and
prevented the forwarder library from periodically flushing response
contents.

This caused `kubectl logs -f` results to be delayed, delivered in
batches as some internal buffer filled up.
  • Loading branch information
Andrew Lytvynov authored Dec 1, 2020
1 parent 4c2e221 commit c4583b7
Showing 1 changed file with 27 additions and 4 deletions.
31 changes: 27 additions & 4 deletions lib/kube/proxy/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1094,8 +1094,8 @@ func (f *Forwarder) catchAll(ctx *authContext, w http.ResponseWriter, req *http.
f.Errorf("Failed to set up forwarding headers: %v.", err)
return nil, trace.Wrap(err)
}
w = &responseStatusRecorder{ResponseWriter: w}
sess.forwarder.ServeHTTP(w, req)
rw := newResponseStatusRecorder(w)
sess.forwarder.ServeHTTP(rw, req)

if sess.noAuditEvents {
return nil, nil
Expand All @@ -1122,7 +1122,7 @@ func (f *Forwarder) catchAll(ctx *authContext, w http.ResponseWriter, req *http.
},
RequestPath: req.URL.Path,
Verb: req.Method,
ResponseCode: int32(w.(*responseStatusRecorder).getStatus()),
ResponseCode: int32(rw.getStatus()),
KubernetesClusterMetadata: ctx.eventClusterMeta(),
}
r := parseResourcePath(req.URL.Path)
Expand Down Expand Up @@ -1611,14 +1611,37 @@ func (f *Forwarder) kubeClusters() []*services.KubernetesCluster {

type responseStatusRecorder struct {
http.ResponseWriter
status int
flusher http.Flusher
status int
}

func newResponseStatusRecorder(w http.ResponseWriter) *responseStatusRecorder {
rec := &responseStatusRecorder{ResponseWriter: w}
if flusher, ok := w.(http.Flusher); ok {
rec.flusher = flusher
}
return rec
}

func (r *responseStatusRecorder) WriteHeader(status int) {
r.status = status
r.ResponseWriter.WriteHeader(status)
}

// Flush optionally flushes the inner ResponseWriter if it supports that.
// Otherwise, Flush is a noop.
//
// Flush is optionally used by github.com/gravitational/oxy/forward to flush
// pending data on streaming HTTP responses (like streaming pod logs).
//
// Without this, oxy/forward will handle streaming responses by accumulating
// ~32kb of response in a buffer before flushing it.
func (r *responseStatusRecorder) Flush() {
if r.flusher != nil {
r.flusher.Flush()
}
}

func (r *responseStatusRecorder) getStatus() int {
// http.ResponseWriter implicitly sets StatusOK, if WriteHeader hasn't been
// explicitly called.
Expand Down

0 comments on commit c4583b7

Please # to comment.