Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

add maven formatter #136

Merged
merged 1 commit into from
Jun 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
@SpringBootApplication
@Import({RootConfiguration.class})
public class Maestro {
public static void main(String[] args) {
SpringApplication.run(Maestro.class, args);
}
public static void main(String[] args) {
SpringApplication.run(Maestro.class, args);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,10 @@
@ToString
@AllArgsConstructor
class IndexAnalysisMessage {
@NonNull
private String analysisId;
@NonNull
private String studyId;
@NonNull
private String repositoryCode;
@NonNull private String analysisId;
@NonNull private String studyId;
@NonNull private String repositoryCode;

/** if callers set this flag it will do a remove instead of add.*/
private Boolean removeAnalysis = false;
/** if callers set this flag it will do a remove instead of add. */
private Boolean removeAnalysis = false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,9 @@
@ToString
@AllArgsConstructor
class IndexMessage {
private String analysisId;
private String studyId;
@NonNull
private String repositoryCode;
/** if callers set this flag it will do a remove instead of add.*/
private Boolean removeAnalysis = false;
private String analysisId;
private String studyId;
@NonNull private String repositoryCode;
/** if callers set this flag it will do a remove instead of add. */
private Boolean removeAnalysis = false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,45 +19,46 @@

import bio.overture.maestro.domain.api.message.IndexResult;
import io.vavr.Tuple2;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.function.Supplier;

@Slf4j
public class IndexMessagesHelper {
public static <T> void handleIndexRepository(Supplier<Mono<Tuple2<T, IndexResult>>> resultSupplier) {
val result = resultSupplier.get().blockOptional();
val tuple = result.orElseThrow(() -> new RuntimeException("failed to obtain result"));
if (!tuple._2().isSuccessful()) {
log.error("failed to process message : {} successfully", tuple._1());
throw new RuntimeException("failed to process the message");
}
public static <T> void handleIndexRepository(
Supplier<Mono<Tuple2<T, IndexResult>>> resultSupplier) {
val result = resultSupplier.get().blockOptional();
val tuple = result.orElseThrow(() -> new RuntimeException("failed to obtain result"));
if (!tuple._2().isSuccessful()) {
log.error("failed to process message : {} successfully", tuple._1());
throw new RuntimeException("failed to process the message");
}
}

public static <T> void handleIndexResult(Supplier<Flux<Tuple2<T, IndexResult>>> resultSupplier) {
/*
* Why Blocking?
*
* - this is a stream consumer, it's supposed to process one message at a time
* the value of reactive processing diminishes since the queue provides a buffering level,
* without blocking it will async process the messages and if one fails we can
* async add it to a DLQ in the subscriber, However, I opted to use blocking because of the next point.
*
* - spring reactive cloud stream is deprecated in favor of spring cloud functions that support
* stream processing: https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/2.2.0.RELEASE/spring-cloud-stream.html#spring_cloud_function
* so I don't want to use a deprecated library, and if needed we can switch to cloud function in future
* https://stackoverflow.com/questions/53438208/spring-cloud-stream-reactive-how-to-do-the-error-handling-in-case-of-reactive
*/
val result = resultSupplier.get().collectList().blockOptional();
val tupleList = result.orElseThrow(() -> new RuntimeException("failed to obtain result"));
tupleList.forEach(tuple -> {
if (!tuple._2().isSuccessful()) {
log.error("failed to process message : {} successfully", tuple._1());
throw new RuntimeException("failed to process the message");
}
public static <T> void handleIndexResult(Supplier<Flux<Tuple2<T, IndexResult>>> resultSupplier) {
/*
* Why Blocking?
*
* - this is a stream consumer, it's supposed to process one message at a time
* the value of reactive processing diminishes since the queue provides a buffering level,
* without blocking it will async process the messages and if one fails we can
* async add it to a DLQ in the subscriber, However, I opted to use blocking because of the next point.
*
* - spring reactive cloud stream is deprecated in favor of spring cloud functions that support
* stream processing: https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/2.2.0.RELEASE/spring-cloud-stream.html#spring_cloud_function
* so I don't want to use a deprecated library, and if needed we can switch to cloud function in future
* https://stackoverflow.com/questions/53438208/spring-cloud-stream-reactive-how-to-do-the-error-handling-in-case-of-reactive
*/
val result = resultSupplier.get().collectList().blockOptional();
val tupleList = result.orElseThrow(() -> new RuntimeException("failed to obtain result"));
tupleList.forEach(
tuple -> {
if (!tuple._2().isSuccessful()) {
log.error("failed to process message : {} successfully", tuple._1());
throw new RuntimeException("failed to process the message");
}
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,5 @@
@ToString
@AllArgsConstructor
class IndexRepositoryMessage {
@NonNull
private String repositoryCode;
@NonNull private String repositoryCode;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
@ToString
@AllArgsConstructor
class IndexStudyMessage {
@NonNull
private String studyId;
@NonNull
private String repositoryCode;
@NonNull private String studyId;
@NonNull private String repositoryCode;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package bio.overture.maestro.app.infra.adapter.inbound.messaging;

import static bio.overture.maestro.app.infra.adapter.inbound.messaging.IndexMessagesHelper.handleIndexRepository;
import static bio.overture.maestro.app.infra.adapter.inbound.messaging.IndexMessagesHelper.handleIndexResult;

import bio.overture.maestro.domain.api.Indexer;
import bio.overture.maestro.domain.api.exception.FailureData;
import bio.overture.maestro.domain.api.message.*;
Expand All @@ -32,111 +35,118 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import static bio.overture.maestro.app.infra.adapter.inbound.messaging.IndexMessagesHelper.handleIndexRepository;
import static bio.overture.maestro.app.infra.adapter.inbound.messaging.IndexMessagesHelper.handleIndexResult;

@Slf4j
@EnableBinding(Sink.class)
public class IndexingMessagesStreamListener {

private final Indexer indexer;

public IndexingMessagesStreamListener(@NonNull Indexer indexer) {
this.indexer = indexer;
}

@StreamListener(Sink.INPUT)
public void handleAnalysisMessage(@Payload IndexMessage indexMessage) {
if (isAnalysisReq(indexMessage)) {
val indexAnalysisMessage = new IndexAnalysisMessage(indexMessage.getAnalysisId(),
indexMessage.getStudyId(),
indexMessage.getRepositoryCode(),
indexMessage.getRemoveAnalysis());
handleIndexResult(() -> this.indexOrRemoveAnalysis(indexAnalysisMessage));
} else if (isStudyMsg(indexMessage)) {
val indexStudyMessage = new IndexStudyMessage(indexMessage.getStudyId(), indexMessage.getRepositoryCode());
handleIndexResult(() -> this.indexStudy(indexStudyMessage));
} else if (isRepoMsg(indexMessage)) {
val indexRepositoryMessage = new IndexRepositoryMessage(indexMessage.getRepositoryCode());
handleIndexRepository(() -> this.indexRepository(indexRepositoryMessage));
} else {
throw new IllegalArgumentException("invalid message format");
}
}

private boolean isAnalysisReq(IndexMessage indexMessage) {
return !StringUtils.isEmpty(indexMessage.getAnalysisId())
&& !StringUtils.isEmpty(indexMessage.getStudyId())
&& !StringUtils.isEmpty(indexMessage.getRepositoryCode());
}

private boolean isStudyMsg(IndexMessage indexMessage) {
return StringUtils.isEmpty(indexMessage.getAnalysisId())
&& !StringUtils.isEmpty(indexMessage.getStudyId())
&& !StringUtils.isEmpty(indexMessage.getRepositoryCode());
}

private boolean isRepoMsg(IndexMessage indexMessage) {
return StringUtils.isEmpty(indexMessage.getAnalysisId())
&& StringUtils.isEmpty(indexMessage.getStudyId())
&& !StringUtils.isEmpty(indexMessage.getRepositoryCode());
private final Indexer indexer;

public IndexingMessagesStreamListener(@NonNull Indexer indexer) {
this.indexer = indexer;
}

@StreamListener(Sink.INPUT)
public void handleAnalysisMessage(@Payload IndexMessage indexMessage) {
if (isAnalysisReq(indexMessage)) {
val indexAnalysisMessage =
new IndexAnalysisMessage(
indexMessage.getAnalysisId(),
indexMessage.getStudyId(),
indexMessage.getRepositoryCode(),
indexMessage.getRemoveAnalysis());
handleIndexResult(() -> this.indexOrRemoveAnalysis(indexAnalysisMessage));
} else if (isStudyMsg(indexMessage)) {
val indexStudyMessage =
new IndexStudyMessage(indexMessage.getStudyId(), indexMessage.getRepositoryCode());
handleIndexResult(() -> this.indexStudy(indexStudyMessage));
} else if (isRepoMsg(indexMessage)) {
val indexRepositoryMessage = new IndexRepositoryMessage(indexMessage.getRepositoryCode());
handleIndexRepository(() -> this.indexRepository(indexRepositoryMessage));
} else {
throw new IllegalArgumentException("invalid message format");
}

private Flux<Tuple2<IndexAnalysisMessage, IndexResult>> indexOrRemoveAnalysis(IndexAnalysisMessage msg) {
if (msg.getRemoveAnalysis()) {
return Flux.from(removeAnalysis(msg));
} else {
return indexAnalysis(msg);
}
}

private boolean isAnalysisReq(IndexMessage indexMessage) {
return !StringUtils.isEmpty(indexMessage.getAnalysisId())
&& !StringUtils.isEmpty(indexMessage.getStudyId())
&& !StringUtils.isEmpty(indexMessage.getRepositoryCode());
}

private boolean isStudyMsg(IndexMessage indexMessage) {
return StringUtils.isEmpty(indexMessage.getAnalysisId())
&& !StringUtils.isEmpty(indexMessage.getStudyId())
&& !StringUtils.isEmpty(indexMessage.getRepositoryCode());
}

private boolean isRepoMsg(IndexMessage indexMessage) {
return StringUtils.isEmpty(indexMessage.getAnalysisId())
&& StringUtils.isEmpty(indexMessage.getStudyId())
&& !StringUtils.isEmpty(indexMessage.getRepositoryCode());
}

private Flux<Tuple2<IndexAnalysisMessage, IndexResult>> indexOrRemoveAnalysis(
IndexAnalysisMessage msg) {
if (msg.getRemoveAnalysis()) {
return Flux.from(removeAnalysis(msg));
} else {
return indexAnalysis(msg);
}

private Mono<Tuple2<IndexAnalysisMessage, IndexResult>> removeAnalysis(IndexAnalysisMessage msg) {
return indexer.removeAnalysis(RemoveAnalysisCommand.builder()
.analysisIdentifier(AnalysisIdentifier.builder()
.studyId(msg.getStudyId())
.analysisId(msg.getAnalysisId())
.repositoryCode(msg.getRepositoryCode())
.build())
}

private Mono<Tuple2<IndexAnalysisMessage, IndexResult>> removeAnalysis(IndexAnalysisMessage msg) {
return indexer
.removeAnalysis(
RemoveAnalysisCommand.builder()
.analysisIdentifier(
AnalysisIdentifier.builder()
.studyId(msg.getStudyId())
.analysisId(msg.getAnalysisId())
.repositoryCode(msg.getRepositoryCode())
.build())
.build())
.map(out -> new Tuple2<>(msg, out))
.onErrorResume((e) -> catchUnhandledErrors(msg, e));
}
.map(out -> new Tuple2<>(msg, out))
.onErrorResume((e) -> catchUnhandledErrors(msg, e));
}

private Flux<Tuple2<IndexAnalysisMessage, IndexResult>> indexAnalysis(IndexAnalysisMessage msg) {
return indexer
.indexAnalysis(
IndexAnalysisCommand.builder()
.analysisIdentifier(
AnalysisIdentifier.builder()
.studyId(msg.getStudyId())
.analysisId(msg.getAnalysisId())
.repositoryCode(msg.getRepositoryCode())
.build())
.build())
.map(out -> new Tuple2<>(msg, out))
.onErrorResume((e) -> catchUnhandledErrors(msg, e));
}

private Flux<Tuple2<IndexAnalysisMessage, IndexResult>> indexAnalysis(IndexAnalysisMessage msg) {
return indexer.indexAnalysis(IndexAnalysisCommand.builder()
.analysisIdentifier(AnalysisIdentifier.builder()
private Flux<Tuple2<IndexStudyMessage, IndexResult>> indexStudy(IndexStudyMessage msg) {
return indexer
.indexStudy(
IndexStudyCommand.builder()
.studyId(msg.getStudyId())
.analysisId(msg.getAnalysisId())
.repositoryCode(msg.getRepositoryCode())
.build()
).build())
.build())
.map(out -> new Tuple2<>(msg, out));
}

private Mono<Tuple2<IndexRepositoryMessage, IndexResult>> indexRepository(
IndexRepositoryMessage msg) {
return indexer
.indexRepository(
IndexStudyRepositoryCommand.builder().repositoryCode(msg.getRepositoryCode()).build())
.map(out -> new Tuple2<>(msg, out))
.onErrorResume((e) -> catchUnhandledErrors(msg, e));
}

private Flux<Tuple2<IndexStudyMessage, IndexResult>> indexStudy(IndexStudyMessage msg) {
return indexer.indexStudy(IndexStudyCommand.builder()
.studyId(msg.getStudyId())
.repositoryCode(msg.getRepositoryCode())
.build())
.map(out -> new Tuple2<>(msg, out));
}

private Mono<Tuple2<IndexRepositoryMessage, IndexResult>> indexRepository(IndexRepositoryMessage msg) {
return indexer.indexRepository(IndexStudyRepositoryCommand.builder()
.repositoryCode(msg.getRepositoryCode())
.build())
.map(out -> new Tuple2<>(msg, out))
.onErrorResume((e) -> catchUnhandledErrors(msg, e));
}

private <T> Mono<Tuple2<T, IndexResult>> catchUnhandledErrors(T msg, Throwable e) {
log.error("failed processing message: {} ", msg, e);
val indexResult = IndexResult.builder()
.successful(false)
.failureData(FailureData.builder().build())
.build();
return Mono.just(new Tuple2<>(msg, indexResult));
}

}

private <T> Mono<Tuple2<T, IndexResult>> catchUnhandledErrors(T msg, Throwable e) {
log.error("failed processing message: {} ", msg, e);
val indexResult =
IndexResult.builder().successful(false).failureData(FailureData.builder().build()).build();
return Mono.just(new Tuple2<>(msg, indexResult));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@
import bio.overture.maestro.app.infra.adapter.inbound.messaging.song.SongAnalysisStreamListener;
import org.springframework.context.annotation.Import;


@Import({
IndexingMessagesStreamListener.class,
SongAnalysisStreamListener.class,
IndexingMessagesStreamListener.class,
SongAnalysisStreamListener.class,
})
public class MessagingConfig { }
public class MessagingConfig {}
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,8 @@
@ToString
@AllArgsConstructor
class AnalysisMessage {
@NonNull
private final String analysisId;
@NonNull
private final String studyId;
@NonNull
private final String state;
@NonNull
private final String songServerId;
@NonNull private final String analysisId;
@NonNull private final String studyId;
@NonNull private final String state;
@NonNull private final String songServerId;
}
Loading