Skip to content

Commit

Permalink
GH-3589: Refactor toMessage batch method and fix logging/header issues
Browse files Browse the repository at this point in the history
Remove unnecessary check

Fix checkstyle violation

Change log level from WARN to DEBUG

Refactor headers conversion method

Add natives condition for logging when no header mapper
  • Loading branch information
bky373 authored and sobychacko committed Dec 24, 2024
1 parent 7b56e33 commit c7ee2f4
Showing 1 changed file with 39 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
* @author Gary Russell
* @author Dariusz Szablinski
* @author Biju Kunjummen
* @author Borahm Lee
* @since 1.1
*/
public class BatchMessagingMessageConverter implements BatchMessageConverter {
Expand Down Expand Up @@ -89,7 +90,7 @@ public BatchMessagingMessageConverter() {
* @param recordConverter the converter.
* @since 1.3.2
*/
public BatchMessagingMessageConverter(RecordMessageConverter recordConverter) {
public BatchMessagingMessageConverter(@Nullable RecordMessageConverter recordConverter) {
this.recordConverter = recordConverter;
if (JacksonPresent.isJackson2Present()) {
this.headerMapper = new DefaultKafkaHeaderMapper();
Expand Down Expand Up @@ -157,53 +158,42 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknow
List<Headers> natives = new ArrayList<>();
List<ConsumerRecord<?, ?>> raws = new ArrayList<>();
List<ConversionException> conversionFailures = new ArrayList<>();

addToRawHeaders(rawHeaders, convertedHeaders, natives, raws, conversionFailures);
commonHeaders(acknowledgment, consumer, rawHeaders, keys, topics, partitions, offsets, timestampTypes,
timestamps);
boolean logged = false;
String info = null;

String listenerInfo = null;
for (ConsumerRecord<?, ?> record : records) {
payloads.add(obtainPayload(type, record, conversionFailures));
keys.add(record.key());
topics.add(record.topic());
partitions.add(record.partition());
offsets.add(record.offset());
if (record.timestampType() != null) {
timestampTypes.add(record.timestampType().name());
}
timestamps.add(record.timestamp());
addRecordInfo(record, type, payloads, keys, topics, partitions, offsets, timestampTypes, timestamps, conversionFailures);
if (this.headerMapper != null && record.headers() != null) {
Map<String, Object> converted = new HashMap<>();
this.headerMapper.toHeaders(record.headers(), converted);
convertedHeaders.add(converted);
Object object = converted.get(KafkaHeaders.LISTENER_INFO);
if (object instanceof String) {
info = (String) object;
Map<String, Object> converted = convertHeaders(record.headers(), convertedHeaders);
Object obj = converted.get(KafkaHeaders.LISTENER_INFO);
if (obj instanceof String) {
listenerInfo = (String) obj;
}
}
else {
if (!logged) {
this.logger.debug(() ->
"No header mapper is available; Jackson is required for the default mapper; "
+ "headers (if present) are not mapped but provided raw in "
+ KafkaHeaders.NATIVE_HEADERS);
logged = true;
}
natives.add(record.headers());
}
if (this.rawRecordHeader) {
raws.add(record);
}
}
if (info != null) {
rawHeaders.put(KafkaHeaders.LISTENER_INFO, info);
if (this.headerMapper == null && !natives.isEmpty()) {
this.logger.debug(() ->
"No header mapper is available; Jackson is required for the default mapper; "
+ "headers (if present) are not mapped but provided raw in "
+ KafkaHeaders.NATIVE_HEADERS);
}
if (listenerInfo != null) {
rawHeaders.put(KafkaHeaders.LISTENER_INFO, listenerInfo);
}
return MessageBuilder.createMessage(payloads, kafkaMessageHeaders);
}

private void addToRawHeaders(Map<String, Object> rawHeaders, List<Map<String, Object>> convertedHeaders,
List<Headers> natives, List<ConsumerRecord<?, ?>> raws, List<ConversionException> conversionFailures) {

if (this.headerMapper != null) {
rawHeaders.put(KafkaHeaders.BATCH_CONVERTED_HEADERS, convertedHeaders);
}
Expand All @@ -216,12 +206,33 @@ private void addToRawHeaders(Map<String, Object> rawHeaders, List<Map<String, Ob
rawHeaders.put(KafkaHeaders.CONVERSION_FAILURES, conversionFailures);
}

private void addRecordInfo(ConsumerRecord<?, ?> record, Type type, List<Object> payloads, List<Object> keys,
List<String> topics, List<Integer> partitions, List<Long> offsets, List<String> timestampTypes,
List<Long> timestamps, List<ConversionException> conversionFailures) {
payloads.add(obtainPayload(type, record, conversionFailures));
keys.add(record.key());
topics.add(record.topic());
partitions.add(record.partition());
offsets.add(record.offset());
timestamps.add(record.timestamp());
if (record.timestampType() != null) {
timestampTypes.add(record.timestampType().name());
}
}

private Object obtainPayload(Type type, ConsumerRecord<?, ?> record, List<ConversionException> conversionFailures) {
return this.recordConverter == null || !containerType(type)
? extractAndConvertValue(record, type)
: convert(record, type, conversionFailures);
}

private Map<String, Object> convertHeaders(Headers headers, List<Map<String, Object>> convertedHeaders) {
Map<String, Object> converted = new HashMap<>();
this.headerMapper.toHeaders(headers, converted);
convertedHeaders.add(converted);
return converted;
}

@Override
public List<ProducerRecord<?, ?>> fromMessage(Message<?> message, String defaultTopic) {
throw new UnsupportedOperationException();
Expand Down

0 comments on commit c7ee2f4

Please # to comment.