Skip to content

Commit

Permalink
Resource release corrections and code improvements in the base implem…
Browse files Browse the repository at this point in the history
…entations of client / server async exchange handlers
  • Loading branch information
ok2c committed Feb 22, 2025
1 parent badb42f commit d0707cd
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,21 +148,23 @@ public void completed(final T result) {
.build(),
context);
} catch (final HttpException | IOException ex2) {
failed(ex2);
failedInternal(ex2);
}
} catch (final IOException ex) {
failed(ex);
failedInternal(ex);
} finally {
releaseRequestConsumer();
}
}

@Override
public void failed(final Exception ex) {
AbstractServerExchangeHandler.this.failed(ex);
failedInternal(ex);
}

@Override
public void cancelled() {
releaseResources();
releaseResourcesInternal();
}

});
Expand Down Expand Up @@ -205,30 +207,54 @@ public final void produce(final DataStreamChannel channel) throws IOException {

@Override
public final void failed(final Exception cause) {
failedInternal(cause);
}

void failedInternal(final Exception cause) {
try {
final AsyncRequestConsumer<T> requestConsumer = requestConsumerRef.get();
if (requestConsumer != null) {
requestConsumer.failed(cause);
}
} finally {
releaseRequestConsumer();
}
try {
final AsyncResponseProducer dataProducer = responseProducerRef.get();
if (dataProducer != null) {
dataProducer.failed(cause);
}
} finally {
releaseResources();
releaseResponseProducer();
}
}

@Override
public final void releaseResources() {
private void releaseRequestConsumer() {
final AsyncRequestConsumer<T> requestConsumer = requestConsumerRef.getAndSet(null);
if (requestConsumer != null) {
requestConsumer.releaseResources();
}
}

private void releaseResponseProducer() {
final AsyncResponseProducer dataProducer = responseProducerRef.getAndSet(null);
if (dataProducer != null) {
dataProducer.releaseResources();
}
}

private void releaseResourcesInternal() {
releaseResponseProducer();
releaseRequestConsumer();
}

@Override
public final void releaseResources() {
// Note even though the message exchange has been fully
// completed on the transport level, the request
// consumer may still be busy post-processing
// the request message.
releaseResponseProducer();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ public final class BasicClientExchangeHandler<T> implements AsyncClientExchangeH
private final AsyncRequestProducer requestProducer;
private final AsyncResponseConsumer<T> responseConsumer;
private final AtomicBoolean completed;
private final FutureCallback<T> resultCallback;
private final AtomicBoolean outputTerminated;
private final AtomicBoolean inputTerminated;
private final FutureCallback<T> resultCallback;

public BasicClientExchangeHandler(
final AsyncRequestProducer requestProducer,
Expand All @@ -71,6 +72,7 @@ public BasicClientExchangeHandler(
this.completed = new AtomicBoolean();
this.resultCallback = resultCallback;
this.outputTerminated = new AtomicBoolean();
this.inputTerminated = new AtomicBoolean();
}

@Override
Expand Down Expand Up @@ -100,64 +102,31 @@ public void consumeInformation(final HttpResponse response, final HttpContext ht
@Override
public void consumeResponse(final HttpResponse response, final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
outputTerminated.set(true);
requestProducer.releaseResources();
releaseRequestProducer();
}
responseConsumer.consumeResponse(response, entityDetails, httpContext, new FutureCallback<T>() {

@Override
public void completed(final T result) {
if (completed.compareAndSet(false, true)) {
try {
if (resultCallback != null) {
resultCallback.completed(result);
}
} finally {
internalReleaseResources();
}
}
completedInternal(result);
}

@Override
public void failed(final Exception ex) {
if (completed.compareAndSet(false, true)) {
try {
if (resultCallback != null) {
resultCallback.failed(ex);
}
} finally {
internalReleaseResources();
}
}
failedInternal(ex);
}

@Override
public void cancelled() {
if (completed.compareAndSet(false, true)) {
try {
if (resultCallback != null) {
resultCallback.cancelled();
}
} finally {
internalReleaseResources();
}
}
cancelledInternal();
}

});
}

@Override
public void cancel() {
if (completed.compareAndSet(false, true)) {
try {
if (resultCallback != null) {
resultCallback.cancelled();
}
} finally {
internalReleaseResources();
}
}
cancelledInternal();
}

@Override
Expand All @@ -178,28 +147,77 @@ public void streamEnd(final List<? extends Header> trailers) throws HttpExceptio
@Override
public void failed(final Exception cause) {
try {
requestProducer.failed(cause);
responseConsumer.failed(cause);
if (inputTerminated.get()) {
responseConsumer.failed(cause);
}
if (!outputTerminated.get()) {
requestProducer.failed(cause);
}
} finally {
if (completed.compareAndSet(false, true)) {
try {
if (resultCallback != null) {
resultCallback.failed(cause);
}
} finally {
internalReleaseResources();
failedInternal(cause);
}
}

private void completedInternal(final T result) {
if (completed.compareAndSet(false, true)) {
try {
if (resultCallback != null) {
resultCallback.completed(result);
}
} finally {
releaseResourcesInternal();
}
}
}

private void internalReleaseResources() {
requestProducer.releaseResources();
responseConsumer.releaseResources();
private void failedInternal(final Exception ex) {
if (completed.compareAndSet(false, true)) {
try {
if (resultCallback != null) {
resultCallback.failed(ex);
}
} finally {
releaseResourcesInternal();
}
}
}

private void cancelledInternal() {
if (completed.compareAndSet(false, true)) {
try {
if (resultCallback != null) {
resultCallback.cancelled();
}
} finally {
releaseResourcesInternal();
}
}
}

private void releaseResponseConsumer() {
if (inputTerminated.compareAndSet(false, true)) {
responseConsumer.releaseResources();
}
}

private void releaseRequestProducer() {
if (outputTerminated.compareAndSet(false, true)) {
requestProducer.releaseResources();
}
}

private void releaseResourcesInternal() {
releaseRequestProducer();
releaseResponseConsumer();
}

@Override
public void releaseResources() {
// Note even though the message exchange has been fully
// completed on the transport level, the response
// consumer may still be busy consuming and digesting
// the response message
releaseRequestProducer();
}

}

0 comments on commit d0707cd

Please # to comment.