Skip to content

Commit

Permalink
expose errors
Browse files Browse the repository at this point in the history
  • Loading branch information
yeikel committed Dec 12, 2024
1 parent 318bcc9 commit 25bf514
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 30 deletions.
57 changes: 29 additions & 28 deletions .dev/dev_arm64.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# This is a compose file designed for arm64/Apple Silicon systems
# To adapt this to x86 please find and replace ".arm64" with empty
# To adapt this to x86 please find and replace "" with empty

# ARM64 supported images for kafka can be found here
# https://hub.docker.com/r/confluentinc/cp-kafka/tags?page=1&name=arm64
Expand All @@ -9,36 +9,37 @@ name: "kafbat-ui-dev"

services:

kafbat-ui:
container_name: kafbat-ui
image: ghcr.io/kafbat/kafka-ui:latest
ports:
- 8080:8080
depends_on:
- kafka0
- schema-registry0
- kafka-connect0
- ksqldb0
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092
KAFKA_CLUSTERS_0_METRICS_PORT: 9997
KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry0:8085
KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first
KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083
KAFKA_CLUSTERS_0_KSQLDBSERVER: http://ksqldb0:8088
DYNAMIC_CONFIG_ENABLED: 'true'
KAFKA_CLUSTERS_0_AUDIT_TOPICAUDITENABLED: 'true'
KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED: 'true'
#kafbat-ui:
# container_name: kafbat-ui
# image: ghcr.io/kafbat/kafka-ui:latest
# ports:
# - 8080:8080
# depends_on:
# - kafka0
# - schema-registry0
# - kafka-connect0
# - ksqldb0
# environment:
# KAFKA_CLUSTERS_0_NAME: local
# KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092
# KAFKA_CLUSTERS_0_METRICS_PORT: 9997
# KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry0:8085
# KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first
# KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083
# KAFKA_CLUSTERS_0_KSQLDBSERVER: http://ksqldb0:8088
# DYNAMIC_CONFIG_ENABLED: 'true'
# KAFKA_CLUSTERS_0_AUDIT_TOPICAUDITENABLED: 'true'
# KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED: 'true'

kafka0:
image: confluentinc/cp-kafka:7.6.0.arm64
image: confluentinc/cp-kafka:7.6.0
user: "0:0"
hostname: kafka0
container_name: kafka0
ports:
- 9092:9092
- 9997:9997
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
Expand All @@ -55,12 +56,12 @@ services:
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
KAFKA_JMX_PORT: 9997
# KAFKA_JMX_HOSTNAME: localhost # uncomment this line and comment the next one if running with kafka-ui as a jar
KAFKA_JMX_HOSTNAME: localhost # uncomment this line and comment the next one if running with kafka-ui as a jar
KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9997
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

schema-registry0:
image: confluentinc/cp-schema-registry:7.6.0.arm64
image: confluentinc/cp-schema-registry:7.6.0
ports:
- 8085:8085
depends_on:
Expand All @@ -76,7 +77,7 @@ services:
SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas

kafka-connect0:
image: confluentinc/cp-kafka-connect:7.6.0.arm64
image: confluentinc/cp-kafka-connect:7.6.0
ports:
- 8083:8083
depends_on:
Expand All @@ -101,7 +102,7 @@ services:
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components,/usr/local/share/kafka/plugins,/usr/share/filestream-connectors"

ksqldb0:
image: confluentinc/cp-ksqldb-server:7.6.0.arm64
image: confluentinc/cp-ksqldb-server:7.6.0
depends_on:
- kafka0
- kafka-connect0
Expand All @@ -119,7 +120,7 @@ services:
KSQL_CACHE_MAX_BYTES_BUFFERING: 0

kafka-init-topics:
image: confluentinc/cp-kafka:7.6.0.arm64
image: confluentinc/cp-kafka:7.6.0
volumes:
- ../documentation/compose/data/message.json:/data/message.json
depends_on:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.kafbat.ui.client;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.kafbat.ui.config.ClustersProperties;
import io.kafbat.ui.connect.ApiClient;
import io.kafbat.ui.connect.api.KafkaConnectClientApi;
Expand All @@ -17,6 +19,7 @@
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
Expand Down Expand Up @@ -56,10 +59,21 @@ private static <T> Flux<T> withRetryOnConflict(Flux<T> publisher) {
return publisher.retryWhen(conflictCodeRetry());
}

// Adapted from https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ErrorMessage.java#L35
private record ErrorMessage(@JsonProperty("message") String message) {
}

private static <T> Mono<T> withBadRequestErrorHandling(Mono<T> publisher) {
return publisher
.onErrorResume(WebClientResponseException.BadRequest.class, e ->
Mono.error(new ValidationException("Invalid configuration")))
.onErrorResume(WebClientResponseException.BadRequest.class, e -> {
final var errorMessage = e.getResponseBodyAs(ErrorMessage.class);

if (errorMessage != null && errorMessage.message() != null) {
return Mono.error(new ValidationException(errorMessage.message()));
}

return Mono.error(new ValidationException("Invalid configuration"));
})
.onErrorResume(WebClientResponseException.InternalServerError.class, e ->
Mono.error(new ValidationException("Invalid configuration")));
}
Expand Down

0 comments on commit 25bf514

Please # to comment.