Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Configurable commit timeout | Metrics for Timeout errors | Txn valid state in chaincode event headers #154

Merged
merged 6 commits into from
Feb 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,9 @@ public Counter inboundTxnProcessingFailureCounter(MeterRegistry meterRegistry) {
public Counter inboundTxnContractExceptionCounter(MeterRegistry meterRegistry) {
return meterRegistry.counter("transaction.messages.contract.failures");
}

@Bean
public Counter inboundTxnTimeoutExceptionCounter(MeterRegistry meterRegistry) {
return meterRegistry.counter("transaction.messages.timeout.failures");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public static class ClientUser {
public static class OrgConnectionConfig {
private String path;
private String filename;
private int defaultCommitTimeoutInSeconds = 60;
}

@Data
Expand Down
12 changes: 9 additions & 3 deletions src/main/java/hlf/java/rest/client/exception/ErrorCode.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,17 @@ public enum ErrorCode {
5002,
"Hyperledger Fabric chaincode operations request has illegal argument or argument is missing."),

HYPERLEDGER_FABRIC_CONNECTION_TIMEOUT_ERROR(
5000, "Hyperledger Fabric Connection timed-out during Transaction"),

HYPERLEDGER_FABRIC_TRANSACTION_ERROR(6000, "Hyperledger Fabric transaction related error"),

HYPERLEDGER_FABRIC_TRANSACTION_CONTRACT_ERROR(
6001, "Exception occurred while executing contract method"),

HYPERLEDGER_FABRIC_TRANSACTION_TIMEOUT_ERROR(
6002, "Hyperledger Fabric Connection timed-out during Transaction"),

HYPERLEDGER_FABRIC_TRANSACTION_GATEWAY_ERROR(
6003, "A Gateway error has occurred due to possible API misconfiguration"),

HYPERLEDGER_FABRIC_NOT_SUPPORTED(8000, "In Hyperledger Fabric this feature is not supported"),

AUTH_INVALID_API_KEY(9000, "Invalid api key"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,11 @@ public void chaincodeEventListener(ContractEvent contractEvent) {
contractEvent.getPayload().isPresent()
? new String(contractEvent.getPayload().get(), StandardCharsets.UTF_8)
: StringUtils.EMPTY;
boolean isValidTransaction = contractEvent.getTransactionEvent().isValid();

if (recencyTransactionContext.validateAndRemoveTransactionContext(txId)) {
publishChaincodeEvent(txId, chaincodeId, eventName, payload, channelName, blockNumber);
publishChaincodeEvent(
txId, chaincodeId, eventName, payload, channelName, blockNumber, isValidTransaction);
return;
}

Expand All @@ -60,15 +62,20 @@ public void chaincodeEventListener(ContractEvent contractEvent) {

@Deprecated
public void listener(
String handle, BlockInfo blockEvent, ChaincodeEvent chaincodeEvent, String channelName) {
String handle,
BlockInfo blockEvent,
ChaincodeEvent chaincodeEvent,
String channelName,
boolean isTxnValid) {

long blockNumber = blockEvent.getBlockNumber();
String txId = chaincodeEvent.getTxId();
String chaincodeId = chaincodeEvent.getChaincodeId();
String eventName = chaincodeEvent.getEventName();
String payload = new String(chaincodeEvent.getPayload(), StandardCharsets.UTF_8);

publishChaincodeEvent(txId, chaincodeId, eventName, payload, channelName, blockNumber);
publishChaincodeEvent(
txId, chaincodeId, eventName, payload, channelName, blockNumber, isTxnValid);
}

private void publishChaincodeEvent(
Expand All @@ -77,7 +84,8 @@ private void publishChaincodeEvent(
String eventName,
String payload,
String channelName,
long blockNumber) {
long blockNumber,
boolean isTransactionValid) {
synchronized (this) {
if (!txId.equalsIgnoreCase(eventTxnId)) {

Expand All @@ -86,6 +94,7 @@ private void publishChaincodeEvent(
log.info("Transaction ID: {}", txId);
log.info("Payload: {}", payload);
log.info("Channel Name: {}", channelName);
log.info("Transaction Valid state: {}", isTransactionValid);

if (eventPublishService == null) {
log.info("Event Publish is disabled, skipping this Chaincode event");
Expand Down Expand Up @@ -126,7 +135,8 @@ private void publishChaincodeEvent(
txId,
eventName,
channelName,
messageKey);
messageKey,
isTransactionValid);
eventTxnId = txId;
} else {
log.debug("Duplicate Transaction; ID: {}", txId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@

import hlf.java.rest.client.config.FabricProperties;
import hlf.java.rest.client.service.HFClientWrapper;
import java.io.IOException;
import java.util.List;
import java.util.regex.Pattern;
import lombok.extern.slf4j.Slf4j;
import org.hyperledger.fabric.gateway.Contract;
import org.hyperledger.fabric.gateway.Gateway;
import org.hyperledger.fabric.gateway.Network;
import org.hyperledger.fabric.sdk.Channel;
import org.hyperledger.fabric.sdk.Peer;
import org.hyperledger.fabric.sdk.exception.InvalidArgumentException;
import org.hyperledger.fabric.sdk.exception.TransactionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.context.config.annotation.RefreshScope;
Expand All @@ -36,18 +38,21 @@ public class FabricEventListener {
@Autowired private ChaincodeEventListener chaincodeEventService;

@EventListener
public void handleEvent(ContextRefreshedEvent event) {
public void handleEvent(ContextRefreshedEvent event)
throws InvalidArgumentException, TransactionException, IOException, ClassNotFoundException {
log.info("Initializing Chaincode/Block Event Listeners..");
startEventListener();
}

@EventListener(RefreshScopeRefreshedEvent.class)
public void onRefresh(RefreshScopeRefreshedEvent event) {
public void onRefresh(RefreshScopeRefreshedEvent event)
throws InvalidArgumentException, TransactionException, IOException, ClassNotFoundException {
log.info("Initializing Chaincode/Block Event Listeners..");
startEventListener();
}

private void startEventListener() {
private void startEventListener()
throws InvalidArgumentException, TransactionException, IOException, ClassNotFoundException {

try {
List<FabricProperties.BlockDetails> blockDetailsList =
Expand Down Expand Up @@ -85,18 +90,6 @@ private void startEventListener() {
fabricProperties.getEvents().getChaincodeDetails();
List<String> chaincodeChannelNames = fabricProperties.getEvents().getChaincode();

/**
* In-order to ensure backward compatiability, registering event-listeners through
* 'chaincodeChannelNames' is preserved until this Listener service fully moves to utilising
* Chaincode & Channel names provided via the 'chaincodeDetails' property. Until that,
* registering events via Channel names provided through 'chaincodeChannelNames' will be used
* if 'chaincodeDetails' is empty. If 'chaincodeDetails' is a non-empty list, then preference
* will be given to register Event-listener via the 'Contract' object and registering events
* through 'chaincodeChannelNames' will be skipped regardless whether it's populated or not.
*
* <p>P.S it is recommended to use 'Contract' object for registering Event-Listeners over
* registering it through 'Channel' Object.
*/
if (!CollectionUtils.isEmpty(chaincodeDetails)) {

for (FabricProperties.ChaincodeDetails chaincodeDetail : chaincodeDetails) {
Expand All @@ -106,26 +99,12 @@ private void startEventListener() {
contract.addContractListener(chaincodeEventService::chaincodeEventListener);
}
} else if (!CollectionUtils.isEmpty(chaincodeChannelNames)) {

for (String channelName : chaincodeChannelNames) {
Network network = gateway.getNetwork(channelName);

if (null != network) {
log.info("Creating event-listener for channel: {}", network);
Channel channel = network.getChannel();
channel.initialize();
channel.registerChaincodeEventListener(
Pattern.compile(".*"),
Pattern.compile(".*"),
(handle, blockEvent, chaincodeEvent) ->
chaincodeEventService.listener(
handle, blockEvent, chaincodeEvent, channel.getName()));
}
}
throw new InvalidArgumentException("Chaincode details are missing in the configuration");
}

} catch (Exception ex) {
log.error("Failed to register Block/Chaincode listener with error {}, ", ex.getMessage(), ex);
throw ex;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package hlf.java.rest.client.metrics;

import hlf.java.rest.client.exception.ErrorCode;
import hlf.java.rest.client.exception.FabricTransactionException;
import hlf.java.rest.client.exception.ServiceException;
import hlf.java.rest.client.exception.UnrecognizedTransactionPayloadException;
import io.micrometer.core.instrument.Counter;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.hyperledger.fabric.gateway.ContractException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
Expand All @@ -26,26 +28,28 @@ public class EmitCustomTransactionListenerMetricsAspect {

@Autowired private Counter inboundTxnContractExceptionCounter;

@Autowired private Counter inboundTxnTimeoutExceptionCounter;

@Around("@annotation(" + ANNOTATION_NAME + ")")
public Object interceptedKafkaMetricsEmissionAdvice(ProceedingJoinPoint proceedingJoinPoint)
throws Throwable {

try {
Object returnValue = proceedingJoinPoint.proceed();
customKafkaSuccessCounter.increment();
return returnValue;
} catch (Throwable e) {

if (e instanceof UnrecognizedTransactionPayloadException) {
invalidInboundTransactionMessageCounter.increment();
throw e;
}

if (e instanceof ContractException) {
} catch (UnrecognizedTransactionPayloadException e) {
invalidInboundTransactionMessageCounter.increment();
inboundTxnProcessingFailureCounter.increment();
throw e;
} catch (FabricTransactionException e) {
inboundTxnProcessingFailureCounter.increment();
if (e.getCode().equals(ErrorCode.HYPERLEDGER_FABRIC_TRANSACTION_CONTRACT_ERROR)) {
inboundTxnContractExceptionCounter.increment();
throw e;
} else if (e.getCode().equals(ErrorCode.HYPERLEDGER_FABRIC_TRANSACTION_TIMEOUT_ERROR)) {
inboundTxnTimeoutExceptionCounter.increment();
}

throw e;
} catch (ServiceException e) {
inboundTxnProcessingFailureCounter.increment();
throw e;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ void publishChaincodeEvents(
String fabricTxId,
String eventName,
String channelName,
String messageKey);
String messageKey,
boolean isTxnValid);

/**
* @param payload String message payload
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,11 @@ public ResponseEntity<ClientResponseModel> replayEvents(
if (Objects.isNull(transactionId)
|| chaincodeEvent.getTxId().equals(transactionId)) {
chaincodeEventListener.listener(
StringUtils.EMPTY, blockInfo, chaincodeEvent, networkName);
StringUtils.EMPTY,
blockInfo,
chaincodeEvent,
networkName,
info.isValid());
} else {
log.info(
"Event TransactionID {} does not match the provided TransactionID filter {}. Skipping event.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public void publishChaincodeEvents(
String fabricTxId,
String eventName,
String channelName,
String messageKey) {
String messageKey,
boolean isTxnValid) {

Optional<FabricProperties.ChaincodeDetails> optionalChaincodeDetails =
fabricProperties.getEvents().getChaincodeDetails().stream()
Expand All @@ -54,12 +55,21 @@ public void publishChaincodeEvents(
fabricTxId,
eventName,
channelName,
messageKey);
messageKey,
isTxnValid);
return;
}

for (String topic : optionalChaincodeDetails.get().getListenerTopics()) {
sendMessage(topic, payload, chaincodeName, fabricTxId, eventName, channelName, messageKey);
sendMessage(
topic,
payload,
chaincodeName,
fabricTxId,
eventName,
channelName,
messageKey,
isTxnValid);
}
}

Expand All @@ -70,7 +80,8 @@ private void sendMessage(
String fabricTxId,
String eventName,
String channelName,
String messageKey) {
String messageKey,
boolean isValidTransaction) {
try {
ProducerRecord<Object, Object> producerRecord =
new ProducerRecord<>(topic, messageKey, payload);
Expand Down Expand Up @@ -98,6 +109,13 @@ private void sendMessage(
FabricClientConstants.FABRIC_EVENT_TYPE,
FabricClientConstants.FABRIC_EVENT_TYPE_CHAINCODE.getBytes()));

producerRecord
.headers()
.add(
new RecordHeader(
FabricClientConstants.FABRIC_TRANSACTION_IS_VALID,
Boolean.toString(isValidTransaction).getBytes()));

log.info("Publishing Chaincode event to outbound topic {}", topic);

ListenableFuture<SendResult<Object, Object>> future =
Expand Down Expand Up @@ -125,7 +143,7 @@ public void onFailure(Throwable ex) {
});

} catch (Exception ex) {
log.error("Error sending message - " + ex.getMessage());
log.error("Error sending message - {}", ex.getMessage());
}
}

Expand Down
Loading