Skip to content

Commit

Permalink
Added tests for scavenging
Browse files Browse the repository at this point in the history
  • Loading branch information
oskardudycz committed Jul 6, 2022
1 parent bf33239 commit 1d0248b
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@

import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.CrudRepository;
import org.springframework.data.repository.query.Param;
import org.springframework.stereotype.Repository;

import java.time.OffsetDateTime;
import java.util.List;

@Repository
public interface ResourceReservationRepository extends CrudRepository<ResourceReservation, String> {
@Query("select r from ResourceReservation r where r.lockedTill < ?1")
List<ResourceReservation> getTimedOut(OffsetDateTime dateTime);
@Query(value = "select * from resource_reservation r where r.locked_till < :dateTime AND r.status = 'Pending'", nativeQuery = true)
List<ResourceReservation> getTimedOut(@Param(value = "dateTime") OffsetDateTime dateTime);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.eventdriven.uniqueness.core.resourcereservation.jpa;

import io.eventdriven.uniqueness.core.resourcereservation.Hash;
import io.eventdriven.uniqueness.core.resourcereservation.ResourceReservationHandler;

import java.time.OffsetDateTime;
Expand All @@ -22,7 +21,8 @@ public void scavengeTimedOut(OffsetDateTime dateTime) {
var timedOutReservations = repository.getTimedOut(dateTime);

for (var reservation: timedOutReservations) {
resourceReservationHandler.release(Hash.hash(reservation.getResourceKey()).toString());
resourceReservationHandler.release(reservation.getResourceKey());
repository.delete(reservation);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
public class UserEmailRegistrationFallbackTests {
@Test
public void reservationHappyPath_confirmsReservation() throws InterruptedException {
// Given
var reservationResult = (EventStore.AppendResult.Success) eventStore.append(
reservationStreamId,
new ResourceReservationInitiated(resourceKey, OffsetDateTime.now(), reservationLockDuration)
Expand All @@ -51,6 +52,7 @@ public void reservationHappyPath_confirmsReservation() throws InterruptedExcepti
new ResourceReservationConfirmed(resourceKey, OffsetDateTime.now())
);

// When
SyncProcessor.runSync((ack) ->
subscribeToAll(eventStoreDBClient, (subscription, resolvedEvent) -> {
var position = resolvedEvent.getOriginalEvent().getPosition();
Expand All @@ -69,20 +71,24 @@ public void reservationHappyPath_confirmsReservation() throws InterruptedExcepti
})
);

// Then
var resourceReservation = repository.findById(resourceKey).orElse(null);

assertNotNull(resourceReservation);
assertEquals(resourceKey, resourceReservation.getResourceKey());
assertEquals(ResourceReservation.Status.Confirmed, resourceReservation.getStatus());
assertNotNull(resourceReservation.getReservedAt());
}

@Test
public void reservationInitiated_storesPendingLookup() throws InterruptedException {
// Given
var reservationResult = (EventStore.AppendResult.Success) eventStore.append(
reservationStreamId,
new ResourceReservationInitiated(resourceKey, OffsetDateTime.now(), reservationLockDuration)
);

// When
SyncProcessor.runSync((ack) ->
subscribeToAll(eventStoreDBClient, (subscription, resolvedEvent) -> {
var position = resolvedEvent.getOriginalEvent().getPosition();
Expand All @@ -94,6 +100,7 @@ public void reservationInitiated_storesPendingLookup() throws InterruptedExcepti
})
);

// Then
var resourceReservation = repository.findById(resourceKey).orElse(null);

assertNotNull(resourceReservation);
Expand All @@ -103,6 +110,7 @@ public void reservationInitiated_storesPendingLookup() throws InterruptedExcepti

@Test
public void emailReservationInitiatedAndUserRegistered_eventuallyConfirmsReservation() throws InterruptedException {
// Given
var reservationResult = (EventStore.AppendResult.Success) eventStore.append(
reservationStreamId,
new ResourceReservationInitiated(resourceKey, OffsetDateTime.now(), reservationLockDuration)
Expand All @@ -112,6 +120,7 @@ public void emailReservationInitiatedAndUserRegistered_eventuallyConfirmsReserva
new UserRegistered(userId, email, OffsetDateTime.now())
);

// When
SyncProcessor.runSync((ack) ->
subscribeToAll(eventStoreDBClient, (subscription, resolvedEvent) -> {
var position = resolvedEvent.getOriginalEvent().getPosition();
Expand All @@ -130,16 +139,19 @@ public void emailReservationInitiatedAndUserRegistered_eventuallyConfirmsReserva
})
);

// Then
var resourceReservation = repository.findById(resourceKey).orElse(null);

assertNotNull(resourceReservation);
assertEquals(resourceKey, resourceReservation.getResourceKey());
assertEquals(ResourceReservation.Status.Confirmed, resourceReservation.getStatus());
assertNotNull(resourceReservation.getReservedAt());
}


@Test
public void releasedReservation_removesLookup() throws InterruptedException {
// Given
var reservationResult = (EventStore.AppendResult.Success) eventStore.append(
reservationStreamId,
new ResourceReservationInitiated(resourceKey, OffsetDateTime.now(), reservationLockDuration)
Expand All @@ -150,6 +162,7 @@ public void releasedReservation_removesLookup() throws InterruptedException {
new ResourceReservationReleaseInitiated(resourceKey, OffsetDateTime.now())
);

// When
SyncProcessor.runSync((ack) ->
subscribeToAll(eventStoreDBClient, (subscription, resolvedEvent) -> {
var position = resolvedEvent.getOriginalEvent().getPosition();
Expand All @@ -161,6 +174,7 @@ public void releasedReservation_removesLookup() throws InterruptedException {
})
);

// Then
var resourceReservation = repository.findById(resourceKey).orElse(null);

assertNull(resourceReservation);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package io.eventdriven.uniqueness.users;

import com.eventstore.dbclient.EventStoreDBClient;
import com.eventstore.dbclient.EventStoreDBClientSettings;
import com.eventstore.dbclient.EventStoreDBConnectionString;
import com.eventstore.dbclient.ParseError;
import io.eventdriven.uniqueness.core.esdb.EventStore;
import io.eventdriven.uniqueness.core.resourcereservation.Hash;
import io.eventdriven.uniqueness.core.resourcereservation.esdb.ESDBResourceReservationHandler;
import io.eventdriven.uniqueness.core.resourcereservation.jpa.ResourceReservation;
import io.eventdriven.uniqueness.core.resourcereservation.jpa.ResourceReservationRepository;
import io.eventdriven.uniqueness.core.resourcereservation.jpa.ResourceReservationScavenging;
import io.eventdriven.uniqueness.core.retries.NulloRetryPolicy;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.jdbc.AutoConfigureTestDatabase;
import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.UUID;


import static io.eventdriven.uniqueness.core.resourcereservation.esdb.ResourceReservationEvent.*;
import static org.junit.jupiter.api.Assertions.*;

@DataJpaTest
@AutoConfigureTestDatabase(replace = AutoConfigureTestDatabase.Replace.NONE)
@RunWith(SpringRunner.class)
public class UserEmailRegistrationScavengingTests {

@Test
public void scavenging_cleansReservationAndLookup() {
// Given
for (var reservation : toScavenge) {
var resourceKey = reservation.getResourceKey();

repository.save(reservation);
eventStore.append(getReservationStreamId(resourceKey),
new ResourceReservationInitiated(resourceKey, reservation.getInitiatedAt(), Duration.ofMinutes(1))
);
}

for (var reservation : toKeep) {
var resourceKey = reservation.getResourceKey();

repository.save(reservation);

eventStore.append(getReservationStreamId(resourceKey),
new ResourceReservationInitiated(resourceKey, reservation.getInitiatedAt(), Duration.ofMinutes(1)),
new ResourceReservationConfirmed(resourceKey, reservation.getReservedAt())
);
}

// When
resourceReservationScavenging.scavengeTimedOut(OffsetDateTime.now());

// Then
for (var reservation : toScavenge) {
var resourceKey = reservation.getResourceKey();

assertFalse(repository.existsById(resourceKey));
assertInstanceOf(EventStore.ReadResult.StreamDoesNotExist.class, eventStore.read(getReservationStreamId(resourceKey)));
}

for (var reservation : toKeep) {
var resourceKey = reservation.getResourceKey();

assertTrue(repository.existsById(resourceKey));
var result = eventStore.read(getReservationStreamId(resourceKey));
assertInstanceOf(EventStore.ReadResult.Success.class, result);
assertEquals(2, ((EventStore.ReadResult.Success)result).events().length);
}
}
@Autowired
private ResourceReservationRepository repository;
private EventStore eventStore;

private ResourceReservationScavenging resourceReservationScavenging;

private String reservationStreamId;

private final ResourceReservation[] toScavenge = new ResourceReservation[]{
new ResourceReservation(
getRandomResourceKey(),
OffsetDateTime.now().minus(Duration.ofDays(1)),
ResourceReservation.Status.Pending,
OffsetDateTime.now().minus(Duration.ofDays(1)).minus(Duration.ofMinutes(10)),
null
),
new ResourceReservation(
getRandomResourceKey(),
OffsetDateTime.now().minus(Duration.ofMinutes(1)),
ResourceReservation.Status.Pending,
OffsetDateTime.now().minus(Duration.ofMinutes(1)).minus(Duration.ofSeconds(10)),
OffsetDateTime.now()
),
};

private final ResourceReservation[] toKeep = new ResourceReservation[]{
new ResourceReservation(
getRandomResourceKey(),
OffsetDateTime.now().minus(Duration.ofDays(2)),
ResourceReservation.Status.Confirmed,
OffsetDateTime.now().minus(Duration.ofDays(22)).minus(Duration.ofMinutes(10)),
null
),
new ResourceReservation(
getRandomResourceKey(),
OffsetDateTime.now().minus(Duration.ofMinutes(2)),
ResourceReservation.Status.Confirmed,
OffsetDateTime.now().minus(Duration.ofMinutes(2)).minus(Duration.ofSeconds(10)),
OffsetDateTime.now()
),
};

@BeforeEach
void beforeEach() throws ParseError {
EventStoreDBClientSettings settings = EventStoreDBConnectionString.parse("esdb://localhost:2113?tls=false");
EventStoreDBClient eventStoreDBClient = EventStoreDBClient.create(settings);
eventStore = new EventStore(eventStoreDBClient);
var resourceReservationHandler = new ESDBResourceReservationHandler(Duration.ofMinutes(10), new NulloRetryPolicy(), eventStore);

resourceReservationScavenging = new ResourceReservationScavenging(repository, resourceReservationHandler);
}

private String getRandomResourceKey() {
var email = "%s@email.com".formatted(UUID.randomUUID().toString().replace("-", ""));
return Hash.hash(email).toString();
}

private String getReservationStreamId(String resourceKey) {
return "reservation-%s".formatted(resourceKey);
}
}

0 comments on commit 1d0248b

Please # to comment.