Skip to content

Commit

Permalink
Added setup for fallback tests for Uniqueness lookup
Browse files Browse the repository at this point in the history
  • Loading branch information
oskardudycz committed Jul 6, 2022
1 parent 62a0888 commit 355c3a8
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 14 deletions.
1 change: 1 addition & 0 deletions samples/uniqueness/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ dependencies {
testImplementation 'org.junit.jupiter:junit-jupiter-engine:5.8.2'
testImplementation 'org.junit.platform:junit-platform-launcher:1.8.2'
testImplementation 'com.fasterxml.jackson.core:jackson-databind:2.13.3'
testImplementation 'org.springframework.boot:spring-boot-starter-test:2.7.0'
}

configurations {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package io.eventdriven.uniqueness;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public AppendResult append(String streamId, Object... events) {
eventsToAppend.iterator()
).get();

return new AppendResult.Success(result.getNextExpectedRevision());
return new AppendResult.Success(result.getNextExpectedRevision(), result.getLogPosition());
} catch (InterruptedException | ExecutionException e) {
if (e.getCause() instanceof WrongExpectedVersionException wrongExpectedVersionException) {
return new AppendResult.StreamAlreadyExists(wrongExpectedVersionException.getActualVersion());
Expand All @@ -55,7 +55,7 @@ public AppendResult append(String streamId, StreamRevision expectedRevision, Obj
eventsToAppend.iterator()
).get();

return new AppendResult.Success(result.getNextExpectedRevision());
return new AppendResult.Success(result.getNextExpectedRevision(), result.getLogPosition());
} catch (InterruptedException | ExecutionException e) {
if (e.getCause() instanceof WrongExpectedVersionException wrongExpectedVersionException) {
return new AppendResult.Conflict(expectedRevision, wrongExpectedVersionException.getActualVersion());
Expand Down Expand Up @@ -109,7 +109,7 @@ public AppendResult setStreamMaxAge(String streamId, Duration duration) {
metadata
).get();

return new AppendResult.Success(result.getNextExpectedRevision());
return new AppendResult.Success(result.getNextExpectedRevision(), result.getLogPosition());
} catch (InterruptedException | ExecutionException e) {
if (e.getCause() instanceof WrongExpectedVersionException wrongExpectedVersionException) {
return new AppendResult.StreamAlreadyExists(wrongExpectedVersionException.getActualVersion());
Expand Down Expand Up @@ -147,7 +147,7 @@ default Boolean succeeded() {

sealed public interface AppendResult {
record Success(
StreamRevision nextExpectedRevision) implements AppendResult {
StreamRevision nextExpectedRevision, Position logPosition) implements AppendResult {
}

record StreamAlreadyExists(StreamRevision actual) implements AppendResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,14 @@
import java.util.function.BiConsumer;

public final class ESDBSubscription {
public static void subscribeToAll(
public static Subscription subscribeToAll(
EventStoreDBClient eventStore,
BiConsumer<Subscription, ResolvedEvent> handle
) {
return subscribeToAll(eventStore, SubscribeToAllOptions.get(), handle);
}

public static Subscription subscribeToAll(
EventStoreDBClient eventStore,
SubscribeToAllOptions options,
BiConsumer<Subscription, ResolvedEvent> handle
Expand All @@ -15,7 +22,7 @@ public static void subscribeToAll(
// Note this is a pretty naive version of subscription handling.
// It doesn't have error handling, retries and resubscribes.
// For the full solution, check main samples.
eventStore.subscribeToAll(new SubscriptionListener() {
return eventStore.subscribeToAll(new SubscriptionListener() {
@Override
public void onEvent(Subscription subscription, ResolvedEvent resolvedEvent) {
handle.accept(subscription, resolvedEvent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@

import com.fasterxml.jackson.annotation.JsonFormat;

import javax.persistence.Column;
import javax.persistence.EnumType;
import javax.persistence.Enumerated;
import javax.persistence.Id;
import javax.persistence.*;
import java.time.OffsetDateTime;

@Entity
public class ResourceReservation {
public enum Status {
Pending,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public UserEmailReservationEventHandler(ResourceReservationRepository repository

public void handle(Object event) {
switch (event) {
case UserRegistered initiated -> handle(initiated);
case UserRegistered registered -> handle(registered);
case UserEmailChanged emailChanged -> handle(emailChanged);
default -> {
}
Expand All @@ -26,6 +26,7 @@ private void handle(UserRegistered userRegistered) {
.ifPresent(resource -> {
if (!resource.isConfirmed()) {
resource.confirm(userRegistered.registeredAt());
repository.save(resource);
}
});
}
Expand All @@ -35,6 +36,7 @@ private void handle(UserEmailChanged emailChanged) {
.ifPresent(resource -> {
if (!resource.isConfirmed()) {
resource.confirm(emailChanged.changedAt());
repository.save(resource);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,174 @@
import com.eventstore.dbclient.EventStoreDBClientSettings;
import com.eventstore.dbclient.EventStoreDBConnectionString;
import com.eventstore.dbclient.ParseError;
import io.eventdriven.uniqueness.core.async.SyncProcessor;
import io.eventdriven.uniqueness.core.esdb.EventStore;
import io.eventdriven.uniqueness.core.resourcereservation.Hash;
import io.eventdriven.uniqueness.core.resourcereservation.ResourceReservationEventHandler;
import io.eventdriven.uniqueness.core.resourcereservation.esdb.ESDBResourceReservationHandler;
import io.eventdriven.uniqueness.core.resourcereservation.jpa.ResourceReservation;
import io.eventdriven.uniqueness.core.resourcereservation.jpa.ResourceReservationRepository;
import io.eventdriven.uniqueness.core.retries.NulloRetryPolicy;
import io.eventdriven.uniqueness.users.reservation.UserEmailReservationEventHandler;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase;
import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.UUID;

import static io.eventdriven.uniqueness.core.esdb.subscriptions.ESDBSubscription.subscribeToAll;
import static io.eventdriven.uniqueness.core.resourcereservation.esdb.ResourceReservationEvent.ResourceReservationConfirmed;
import static io.eventdriven.uniqueness.core.resourcereservation.esdb.ResourceReservationEvent.ResourceReservationInitiated;
import static io.eventdriven.uniqueness.core.serialization.EventSerializer.deserialize;
import static io.eventdriven.uniqueness.users.UserEvent.UserRegistered;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

@DataJpaTest
@AutoConfigureTestDatabase(replace = AutoConfigureTestDatabase.Replace.NONE)
@RunWith(SpringRunner.class)
public class UserEmailRegistrationFallbackTests {
private EventStoreDBClient eventStore;
@Test
public void reservationHappyPath_confirmsReservation() throws InterruptedException {
var reservationResult = (EventStore.AppendResult.Success) eventStore.append(
reservationStreamId,
new ResourceReservationInitiated(resourceKey, OffsetDateTime.now(), reservationLockDuration)
);
var userRegistrationResult = (EventStore.AppendResult.Success) eventStore.append(
userStreamId,
new UserRegistered(userId, email, OffsetDateTime.now())
);
var reservationConfirmedResult = (EventStore.AppendResult.Success) eventStore.append(
reservationStreamId,
reservationResult.nextExpectedRevision(),
new ResourceReservationConfirmed(resourceKey, OffsetDateTime.now())
);

SyncProcessor.runSync((ack) ->
subscribeToAll(eventStoreDBClient, (subscription, resolvedEvent) -> {
var position = resolvedEvent.getOriginalEvent().getPosition();
if (position.equals(reservationResult.logPosition())) {
var event = deserialize(resolvedEvent).orElseThrow();

reservationEventHandler.handle(event);
}
if (position.equals(userRegistrationResult.logPosition()) || position.equals(reservationConfirmedResult.logPosition())) {
var event = deserialize(resolvedEvent).orElseThrow();

emailReservationEventHandler.handle(event);
ack.accept(true);
subscription.stop();
}
})
);

var resourceReservation = repository.findById(resourceKey).orElse(null);

assertNotNull(resourceReservation);
assertEquals(resourceKey, resourceReservation.getResourceKey());
assertEquals(ResourceReservation.Status.Confirmed, resourceReservation.getStatus());
}

@Test
public void reservationInitiated_storesPendingLookup() throws InterruptedException {
var reservationResult = (EventStore.AppendResult.Success) eventStore.append(
reservationStreamId,
new ResourceReservationInitiated(resourceKey, OffsetDateTime.now(), reservationLockDuration)
);

SyncProcessor.runSync((ack) ->
subscribeToAll(eventStoreDBClient, (subscription, resolvedEvent) -> {
var position = resolvedEvent.getOriginalEvent().getPosition();
if (position.equals(reservationResult.logPosition())) {
var event = deserialize(resolvedEvent).orElseThrow();

reservationEventHandler.handle(event);
}
})
);

var resourceReservation = repository.findById(resourceKey).orElse(null);

assertNotNull(resourceReservation);
assertEquals(resourceKey, resourceReservation.getResourceKey());
assertEquals(ResourceReservation.Status.Pending, resourceReservation.getStatus());
}

@Test
public void emailReservationInitiatedAndUserRegistered_eventuallyConfirmsReservation() throws InterruptedException {
var reservationResult = (EventStore.AppendResult.Success) eventStore.append(
reservationStreamId,
new ResourceReservationInitiated(resourceKey, OffsetDateTime.now(), reservationLockDuration)
);
var userRegistrationResult = (EventStore.AppendResult.Success) eventStore.append(
userStreamId,
new UserRegistered(userId, email, OffsetDateTime.now())
);

SyncProcessor.runSync((ack) ->
subscribeToAll(eventStoreDBClient, (subscription, resolvedEvent) -> {
var position = resolvedEvent.getOriginalEvent().getPosition();
if (position.equals(reservationResult.logPosition())) {
var event = deserialize(resolvedEvent).orElseThrow();

reservationEventHandler.handle(event);
}
if (position.equals(userRegistrationResult.logPosition())) {
var event = deserialize(resolvedEvent).orElseThrow();

emailReservationEventHandler.handle(event);
ack.accept(true);
subscription.stop();
}
})
);

var resourceReservation = repository.findById(resourceKey).orElse(null);

assertNotNull(resourceReservation);
assertEquals(resourceKey, resourceReservation.getResourceKey());
assertEquals(ResourceReservation.Status.Confirmed, resourceReservation.getStatus());
}

@PersistenceContext
private EntityManager entityManager;
@Autowired
private ResourceReservationRepository repository;
private EventStoreDBClient eventStoreDBClient;
private EventStore eventStore;
private ResourceReservationEventHandler reservationEventHandler;
private UserEmailReservationEventHandler emailReservationEventHandler;

private final Duration reservationLockDuration = Duration.ofMinutes(15);
private UUID userId;
private String email;
private String resourceKey;
private String userStreamId;
private String reservationStreamId;

@BeforeEach
void beforeEach() throws ParseError {
EventStoreDBClientSettings settings = EventStoreDBConnectionString.parse("esdb://localhost:2113?tls=false");
this.eventStore = EventStoreDBClient.create(settings);
eventStoreDBClient = EventStoreDBClient.create(settings);
eventStore = new EventStore(eventStoreDBClient);
var resourceReservationCommandHandler = new ESDBResourceReservationHandler(reservationLockDuration, new NulloRetryPolicy(), eventStore);

reservationEventHandler = new ResourceReservationEventHandler(repository, resourceReservationCommandHandler);
emailReservationEventHandler = new UserEmailReservationEventHandler(repository);

userId = UUID.randomUUID();
email = "%s@email.com".formatted(UUID.randomUUID().toString().replace("-", ""));
userStreamId = "user-%s".formatted(userId);

resourceKey = Hash.hash(email).toString();
reservationStreamId = "reservation-%s".formatted(resourceKey);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void forNotUsedEmail_ReservationSucceeds(){

}

private Duration reservationLockDuration = Duration.ofMinutes(15);
private final Duration reservationLockDuration = Duration.ofMinutes(15);
private EventStore eventStore;
private UserCommandHandler userCommandHandler;

Expand Down

0 comments on commit 355c3a8

Please # to comment.