diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java index c360152cf1cf..1d8083adbcff 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java @@ -175,6 +175,8 @@ protected void needsFillInterest() throws IOException throw new ClosedChannelException(); ByteBuffer in = _inQ.peek(); + if (LOG.isDebugEnabled()) + LOG.debug("{} needsFillInterest EOF={} {}", this, in == EOF, BufferUtil.toDetailString(in)); if (BufferUtil.hasContent(in) || isEOF(in)) execute(_runFillable); } @@ -201,11 +203,15 @@ public void addInput(ByteBuffer in) boolean wasEmpty = _inQ.isEmpty(); if (in == null) { + if (LOG.isDebugEnabled()) + LOG.debug("{} addEOFAndRun=true", this); _inQ.add(EOF); fillable = true; } if (BufferUtil.hasContent(in)) { + if (LOG.isDebugEnabled()) + LOG.debug("{} addInputAndRun={} {}", this, wasEmpty, BufferUtil.toDetailString(in)); _inQ.add(in); fillable = wasEmpty; } @@ -234,11 +240,15 @@ public void addInputAndExecute(ByteBuffer in) boolean wasEmpty = _inQ.isEmpty(); if (in == null) { + if (LOG.isDebugEnabled()) + LOG.debug("{} addEOFAndExecute=true", this); _inQ.add(EOF); fillable = true; } if (BufferUtil.hasContent(in)) { + if (LOG.isDebugEnabled()) + LOG.debug("{} addInputAndExecute={} {}", this, wasEmpty, BufferUtil.toDetailString(in)); _inQ.add(in); fillable = wasEmpty; } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncContentProducer.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncContentProducer.java index 253097a40be8..3a4bec8776e1 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncContentProducer.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncContentProducer.java @@ -77,7 +77,7 @@ public int available() HttpInput.Content content = nextTransformedContent(); int available = content == null ? 0 : content.remaining(); if (LOG.isDebugEnabled()) - LOG.debug("available = {}", available); + LOG.debug("available = {} {}", available, this); return available; } @@ -86,7 +86,7 @@ public boolean hasContent() { boolean hasContent = _rawContent != null; if (LOG.isDebugEnabled()) - LOG.debug("hasContent = {}", hasContent); + LOG.debug("hasContent = {} {}", hasContent, this); return hasContent; } @@ -94,7 +94,7 @@ public boolean hasContent() public boolean isError() { if (LOG.isDebugEnabled()) - LOG.debug("isError = {}", _error); + LOG.debug("isError = {} {}", _error, this); return _error; } @@ -103,7 +103,7 @@ public void checkMinDataRate() { long minRequestDataRate = _httpChannel.getHttpConfiguration().getMinRequestDataRate(); if (LOG.isDebugEnabled()) - LOG.debug("checkMinDataRate [m={},t={}]", minRequestDataRate, _firstByteTimeStamp); + LOG.debug("checkMinDataRate [m={},t={}] {}", minRequestDataRate, _firstByteTimeStamp, this); if (minRequestDataRate > 0 && _firstByteTimeStamp != Long.MIN_VALUE) { long period = System.nanoTime() - _firstByteTimeStamp; @@ -113,13 +113,13 @@ public void checkMinDataRate() if (getRawContentArrived() < minimumData) { if (LOG.isDebugEnabled()) - LOG.debug("checkMinDataRate check failed"); + LOG.debug("checkMinDataRate check failed {}", this); BadMessageException bad = new BadMessageException(HttpStatus.REQUEST_TIMEOUT_408, String.format("Request content data rate < %d B/s", minRequestDataRate)); if (_httpChannel.getState().isResponseCommitted()) { if (LOG.isDebugEnabled()) - LOG.debug("checkMinDataRate aborting channel"); + LOG.debug("checkMinDataRate aborting channel {}", this); _httpChannel.abort(bad); } failCurrentContent(bad); @@ -133,7 +133,7 @@ public void checkMinDataRate() public long getRawContentArrived() { if (LOG.isDebugEnabled()) - LOG.debug("getRawContentArrived = {}", _rawContentArrived); + LOG.debug("getRawContentArrived = {} {}", _rawContentArrived, this); return _rawContentArrived; } @@ -141,7 +141,7 @@ public long getRawContentArrived() public boolean consumeAll(Throwable x) { if (LOG.isDebugEnabled()) - LOG.debug("consumeAll [e={}]", (Object)x); + LOG.debug("consumeAll [e={}] {}", x, this); failCurrentContent(x); // A specific HttpChannel mechanism must be used as the following code // does not guarantee that the channel will synchronously deliver all @@ -156,14 +156,14 @@ public boolean consumeAll(Throwable x) // deliver the content asynchronously. Tests in StreamResetTest cover this. boolean atEof = _httpChannel.failAllContent(x); if (LOG.isDebugEnabled()) - LOG.debug("failed all content of http channel; at EOF? {}", atEof); + LOG.debug("failed all content of http channel EOF={} {}", atEof, this); return atEof; } private void failCurrentContent(Throwable x) { if (LOG.isDebugEnabled()) - LOG.debug("failing currently held content [r={},t={}]", _rawContent, _transformedContent, x); + LOG.debug("failing currently held content {}", this, x); if (_transformedContent != null && !_transformedContent.isSpecial()) { if (_transformedContent != _rawContent) @@ -186,7 +186,7 @@ private void failCurrentContent(Throwable x) public boolean onContentProducible() { if (LOG.isDebugEnabled()) - LOG.debug("onContentProducible"); + LOG.debug("onContentProducible {}", this); return _httpChannel.getState().onReadReady(); } @@ -195,7 +195,7 @@ public HttpInput.Content nextContent() { HttpInput.Content content = nextTransformedContent(); if (LOG.isDebugEnabled()) - LOG.debug("nextContent = {}", content); + LOG.debug("nextContent = {} {}", content, this); if (content != null) _httpChannel.getState().onReadIdle(); return content; @@ -205,7 +205,7 @@ public HttpInput.Content nextContent() public void reclaim(HttpInput.Content content) { if (LOG.isDebugEnabled()) - LOG.debug("reclaim {} [t={}]", content, _transformedContent); + LOG.debug("reclaim {} {}", content, this); if (_transformedContent == content) { content.succeeded(); @@ -239,7 +239,7 @@ public boolean isReady() else { if (LOG.isDebugEnabled()) - LOG.debug("isReady got transformed content {}", content); + LOG.debug("isReady got transformed content {} {}", content, this); _httpChannel.getState().onContentAdded(); } boolean ready = content != null; @@ -251,7 +251,7 @@ public boolean isReady() private HttpInput.Content nextTransformedContent() { if (LOG.isDebugEnabled()) - LOG.debug("nextTransformedContent [r={},t={}]", _rawContent, _transformedContent); + LOG.debug("nextTransformedContent {}", this); if (_rawContent == null) { _rawContent = produceRawContent(); @@ -264,7 +264,7 @@ private HttpInput.Content nextTransformedContent() if (_transformedContent != _rawContent) _transformedContent.succeeded(); if (LOG.isDebugEnabled()) - LOG.debug("nulling depleted transformed content"); + LOG.debug("nulling depleted transformed content {}", this); _transformedContent = null; } @@ -276,20 +276,20 @@ private HttpInput.Content nextTransformedContent() _error = _rawContent.getError() != null; if (LOG.isDebugEnabled()) - LOG.debug("raw content is special (with error = {}), returning it", _error); + LOG.debug("raw content is special (with error = {}), returning it {}", _error, this); return _rawContent; } if (_interceptor != null) { if (LOG.isDebugEnabled()) - LOG.debug("using interceptor {} to transform raw content", _interceptor); + LOG.debug("using interceptor to transform raw content {}", this); _transformedContent = _interceptor.readFrom(_rawContent); } else { if (LOG.isDebugEnabled()) - LOG.debug("null interceptor, transformed content = raw content"); + LOG.debug("null interceptor, transformed content = raw content {}", this); _transformedContent = _rawContent; } @@ -298,7 +298,7 @@ private HttpInput.Content nextTransformedContent() if (_transformedContent != _rawContent) _transformedContent.succeeded(); if (LOG.isDebugEnabled()) - LOG.debug("nulling depleted transformed content"); + LOG.debug("nulling depleted transformed content {}", this); _transformedContent = null; } @@ -309,30 +309,30 @@ private HttpInput.Content nextTransformedContent() _rawContent.succeeded(); _rawContent = null; if (LOG.isDebugEnabled()) - LOG.debug("nulling depleted raw content"); + LOG.debug("nulling depleted raw content {}", this); _rawContent = produceRawContent(); if (_rawContent == null) { if (LOG.isDebugEnabled()) - LOG.debug("produced null raw content, returning null"); + LOG.debug("produced null raw content, returning null, {}", this); return null; } } else { if (LOG.isDebugEnabled()) - LOG.debug("raw content is not empty"); + LOG.debug("raw content is not empty {}", this); } } else { if (LOG.isDebugEnabled()) - LOG.debug("transformed content is not empty"); + LOG.debug("transformed content is not empty {}", this); } } if (LOG.isDebugEnabled()) - LOG.debug("returning transformed content {}", _transformedContent); + LOG.debug("returning transformed content {}", this); return _transformedContent; } @@ -345,10 +345,24 @@ private HttpInput.Content produceRawContent() if (_firstByteTimeStamp == Long.MIN_VALUE) _firstByteTimeStamp = System.nanoTime(); if (LOG.isDebugEnabled()) - LOG.debug("produceRawContent updated rawContentArrived to {} and firstByteTimeStamp to {}", _rawContentArrived, _firstByteTimeStamp); + LOG.debug("produceRawContent updated rawContentArrived to {} and firstByteTimeStamp to {} {}", _rawContentArrived, _firstByteTimeStamp, this); } if (LOG.isDebugEnabled()) - LOG.debug("produceRawContent produced {}", content); + LOG.debug("produceRawContent produced {} {}", content, this); return content; } + + @Override + public String toString() + { + return String.format("%s@%x[r=%s,t=%s,i=%s,error=%b,c=%s]", + getClass().getSimpleName(), + hashCode(), + _rawContent, + _transformedContent, + _interceptor, + _error, + _httpChannel + ); + } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java index 9db3e27cf9c5..489d9f9e4f21 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java @@ -83,7 +83,7 @@ public Interceptor getInterceptor() public void setInterceptor(Interceptor interceptor) { if (LOG.isDebugEnabled()) - LOG.debug("setting interceptor to {}", interceptor); + LOG.debug("setting interceptor to {} on {}", interceptor, this); _contentProducer.setInterceptor(interceptor); } @@ -99,14 +99,14 @@ public void addInterceptor(Interceptor interceptor) if (currentInterceptor == null) { if (LOG.isDebugEnabled()) - LOG.debug("adding single interceptor: {}", interceptor); + LOG.debug("adding single interceptor: {} on {}", interceptor, this); _contentProducer.setInterceptor(interceptor); } else { ChainedInterceptor chainedInterceptor = new ChainedInterceptor(currentInterceptor, interceptor); if (LOG.isDebugEnabled()) - LOG.debug("adding chained interceptor: {}", chainedInterceptor); + LOG.debug("adding chained interceptor: {} on {}", chainedInterceptor, this); _contentProducer.setInterceptor(chainedInterceptor); } } @@ -131,7 +131,7 @@ public long getContentReceived() public boolean consumeAll() { if (LOG.isDebugEnabled()) - LOG.debug("consume all"); + LOG.debug("consumeAll {}", this); boolean atEof = _contentProducer.consumeAll(new IOException("Unconsumed content")); if (atEof) _consumedEof = true; @@ -146,14 +146,14 @@ public boolean isError() { boolean error = _contentProducer.isError(); if (LOG.isDebugEnabled()) - LOG.debug("isError = {}", error); + LOG.debug("isError={} {}", error, this); return error; } public boolean isAsync() { if (LOG.isDebugEnabled()) - LOG.debug("isAsync read listener = " + _readListener); + LOG.debug("isAsync read listener {} {}", _readListener, this); return _readListener != null; } @@ -164,7 +164,7 @@ public boolean isFinished() { boolean finished = _consumedEof; if (LOG.isDebugEnabled()) - LOG.debug("isFinished? {}", finished); + LOG.debug("isFinished={} {}", finished, this); return finished; } @@ -172,23 +172,16 @@ public boolean isFinished() public boolean isReady() { boolean ready = _contentProducer.isReady(); - if (!ready) - { - if (LOG.isDebugEnabled()) - LOG.debug("isReady? false"); - return false; - } - if (LOG.isDebugEnabled()) - LOG.debug("isReady? true"); - return true; + LOG.debug("isReady={} {}", ready, this); + return ready; } @Override public void setReadListener(ReadListener readListener) { if (LOG.isDebugEnabled()) - LOG.debug("setting read listener to {}", readListener); + LOG.debug("setting read listener to {} {}", readListener, this); if (_readListener != null) throw new IllegalStateException("ReadListener already set"); _readListener = Objects.requireNonNull(readListener); @@ -229,7 +222,7 @@ public int read(byte[] b, int off, int len) throws IOException { int read = get(content, b, off, len); if (LOG.isDebugEnabled()) - LOG.debug("read produced {} byte(s)", read); + LOG.debug("read produced {} byte(s) {}", read, this); if (content.isEmpty()) _contentProducer.reclaim(content); return read; @@ -237,7 +230,7 @@ public int read(byte[] b, int off, int len) throws IOException Throwable error = content.getError(); if (LOG.isDebugEnabled()) - LOG.debug("read error = " + error); + LOG.debug("read error={} {}", error, this); if (error != null) { if (error instanceof IOException) @@ -248,7 +241,7 @@ public int read(byte[] b, int off, int len) throws IOException if (content.isEof()) { if (LOG.isDebugEnabled()) - LOG.debug("read at EOF, setting consumed EOF to true"); + LOG.debug("read at EOF, setting consumed EOF to true {}", this); _consumedEof = true; // If EOF do we need to wake for allDataRead callback? if (onContentProducible()) @@ -276,7 +269,7 @@ public boolean hasContent() // which is forbidden by this method's contract. boolean hasContent = _contentProducer.hasContent(); if (LOG.isDebugEnabled()) - LOG.debug("hasContent = {}", hasContent); + LOG.debug("hasContent={} {}", hasContent, this); return hasContent; } @@ -285,7 +278,7 @@ public int available() { int available = _contentProducer.available(); if (LOG.isDebugEnabled()) - LOG.debug("available = {}", available); + LOG.debug("available={} {}", available, this); return available; } @@ -300,17 +293,13 @@ public void run() { Content content = _contentProducer.nextContent(); if (LOG.isDebugEnabled()) - LOG.debug("running on content {}", content); - // The nextContent() call could return null if the transformer ate all - // the raw bytes without producing any transformed content. - if (content == null) - return; + LOG.debug("running on content {} {}", content, this); // This check is needed when a request is started async but no read listener is registered. if (_readListener == null) { if (LOG.isDebugEnabled()) - LOG.debug("running without a read listener"); + LOG.debug("running without a read listener {}", this); onContentProducible(); return; } @@ -321,7 +310,7 @@ public void run() if (error != null) { if (LOG.isDebugEnabled()) - LOG.debug("running has error: {}", (Object)error); + LOG.debug("running error={} {}", error, this); // TODO is this necessary to add here? _channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE); _readListener.onError(error); @@ -331,13 +320,13 @@ else if (content.isEof()) try { if (LOG.isDebugEnabled()) - LOG.debug("running at EOF"); + LOG.debug("running at EOF {}", this); _readListener.onAllDataRead(); } catch (Throwable x) { if (LOG.isDebugEnabled()) - LOG.debug("running failed onAllDataRead", x); + LOG.debug("running failed onAllDataRead {}", this, x); _readListener.onError(x); } } @@ -345,7 +334,7 @@ else if (content.isEof()) else { if (LOG.isDebugEnabled()) - LOG.debug("running has content"); + LOG.debug("running has content {}", this); try { _readListener.onDataAvailable(); @@ -353,7 +342,7 @@ else if (content.isEof()) catch (Throwable x) { if (LOG.isDebugEnabled()) - LOG.debug("running failed onDataAvailable", x); + LOG.debug("running failed onDataAvailable {}", this, x); _readListener.onError(x); } } diff --git a/tests/test-integration/src/test/java/org/eclipse/jetty/test/HttpInputIntegrationTest.java b/tests/test-integration/src/test/java/org/eclipse/jetty/test/HttpInputIntegrationTest.java index ca04f2cffbcb..73c4019c1480 100644 --- a/tests/test-integration/src/test/java/org/eclipse/jetty/test/HttpInputIntegrationTest.java +++ b/tests/test-integration/src/test/java/org/eclipse/jetty/test/HttpInputIntegrationTest.java @@ -90,7 +90,7 @@ public static void beforeClass() throws Exception __server.addConnector(local); ServerConnector http = new ServerConnector(__server, new HttpConnectionFactory(__config), new HTTP2CServerConnectionFactory(__config)); - http.setIdleTimeout(4000); + http.setIdleTimeout(5000); __server.addConnector(http); // SSL Context Factory for HTTPS and HTTP/2 @@ -119,7 +119,7 @@ public static void beforeClass() throws Exception // HTTP/2 Connector ServerConnector http2 = new ServerConnector(__server, ssl,/*TODO alpn,h2,*/ h1); - http2.setIdleTimeout(4000); + http2.setIdleTimeout(5000); __server.addConnector(http2); ServletContextHandler context = new ServletContextHandler(__server, "/ctx"); @@ -336,7 +336,7 @@ public void testStress(Scenario scenario) throws Exception for (int i = 0; i < threads; i++) { - t[i] = new Thread(run); + t[i] = new Thread(run, "client-" + i); t[i].start(); } for (int i = 0; i < threads; i++)