diff --git a/httpcore5-jackson2/src/main/java/org/apache/hc/core5/jackson2/http/JsonSequenceResponseConsumer.java b/httpcore5-jackson2/src/main/java/org/apache/hc/core5/jackson2/http/JsonSequenceResponseConsumer.java index 80daef980..0f15724e2 100644 --- a/httpcore5-jackson2/src/main/java/org/apache/hc/core5/jackson2/http/JsonSequenceResponseConsumer.java +++ b/httpcore5-jackson2/src/main/java/org/apache/hc/core5/jackson2/http/JsonSequenceResponseConsumer.java @@ -81,6 +81,10 @@ public void completed(final E result) { if (errorCallback != null) { errorCallback.execute(result); } + if (resultCallback != null) { + // Error content has been fully processed, signal exchange completion. + resultCallback.completed(null); + } } }); diff --git a/httpcore5-jackson2/src/test/java/org/apache/hc/core5/jackson2/http/JsonResponseConsumersTest.java b/httpcore5-jackson2/src/test/java/org/apache/hc/core5/jackson2/http/JsonResponseConsumersTest.java index 2c9397ed8..8015fad80 100644 --- a/httpcore5-jackson2/src/test/java/org/apache/hc/core5/jackson2/http/JsonResponseConsumersTest.java +++ b/httpcore5-jackson2/src/test/java/org/apache/hc/core5/jackson2/http/JsonResponseConsumersTest.java @@ -27,12 +27,14 @@ package org.apache.hc.core5.jackson2.http; import java.io.InputStream; +import java.net.URI; import java.net.URL; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import com.fasterxml.jackson.core.JsonFactory; @@ -47,6 +49,7 @@ import org.apache.hc.core5.http.UnsupportedMediaTypeException; import org.apache.hc.core5.http.impl.BasicEntityDetails; import org.apache.hc.core5.http.message.BasicHttpResponse; +import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; import org.apache.hc.core5.http.nio.AsyncResponseConsumer; import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer; import org.apache.hc.core5.http.protocol.HttpCoreContext; @@ -362,4 +365,53 @@ void testResponseJsonTokenContentCorrectlyProcessed() throws Exception { Mockito.verifyNoMoreInteractions(mockJsonTokenConsumer); } + @Test + void testAsyncPipelineErrorResponseInvokesErrorCallbackAndSignalsResultCallback() throws Exception { + final String errorBody = "Unexpected internal failure"; + final AtomicReference errorRef = new AtomicReference<>(); + final AtomicBoolean completed = new AtomicBoolean(); + final AtomicBoolean failed = new AtomicBoolean(); + final AtomicBoolean cancelled = new AtomicBoolean(); + final AsyncClientExchangeHandler exchangeHandler = AsyncJsonClientPipeline.assemble(objectMapper) + .request() + .get(URI.create("http://localhost/test")) + .response() + .asSequence( + RequestData.class, + response -> { + }, + error -> errorRef.set(error != null ? error.asText() : null), + requestData -> { + }) + .result(new FutureCallback() { + @Override + public void completed(final Long result) { + completed.set(true); + } + + @Override + public void failed(final Exception ex) { + failed.set(true); + } + + @Override + public void cancelled() { + cancelled.set(true); + } + }) + .create(); + + exchangeHandler.consumeResponse( + BasicResponseBuilder.create(500).build(), + new BasicEntityDetails(errorBody.length(), ContentType.TEXT_PLAIN), + HttpCoreContext.create()); + exchangeHandler.consume(ByteBuffer.wrap(errorBody.getBytes(StandardCharsets.UTF_8))); + exchangeHandler.streamEnd(Collections.emptyList()); + + Assertions.assertThat(errorRef.get()).isEqualTo(errorBody); + Assertions.assertThat(completed.get()).isTrue(); + Assertions.assertThat(failed.get()).isFalse(); + Assertions.assertThat(cancelled.get()).isFalse(); + } + }