diff --git a/streaming-connect-sink/src/main/java/com/adobe/platform/streaming/sink/impl/AEPPublisher.java b/streaming-connect-sink/src/main/java/com/adobe/platform/streaming/sink/impl/AEPPublisher.java index 9231d82..ab403c8 100644 --- a/streaming-connect-sink/src/main/java/com/adobe/platform/streaming/sink/impl/AEPPublisher.java +++ b/streaming-connect-sink/src/main/java/com/adobe/platform/streaming/sink/impl/AEPPublisher.java @@ -124,13 +124,21 @@ public void publishData(List> messages) throws AEPStrea LOG.error("Failed to publish data to Adobe Experience Platform", jsonException); if (Objects.nonNull(errorReporter)) { messages.forEach(message -> errorReporter.report(message.getValue(), jsonException)); + } else { + throw new AEPStreamingException("Failed to publish invalid JSON", jsonException); } } catch (HttpException httpException) { LOG.error("Failed to publish data to Adobe Experience Platform", httpException); + + final int responseCode = httpException.getResponseCode(); if (Objects.nonNull(errorReporter)) { messages.forEach(message -> errorReporter.report(message.getValue(), httpException)); + } else { + if (HttpUtil.is500(responseCode)) { + throw new AEPStreamingException("Failed to publish", httpException); + } } - final int responseCode = httpException.getResponseCode(); + if (HttpUtil.isUnauthorized(responseCode)) { throw new AEPStreamingException("Failed to publish", httpException); }