Skip to content

Commit

Permalink
[OPIK-474] Fix Flaky test
Browse files Browse the repository at this point in the history
  • Loading branch information
thiagohora committed Nov 27, 2024
1 parent 26aaa23 commit 7496bff
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
@Slf4j
class RedissonLockService implements LockService {

private static final String LOCK_ACQUIRED = "Lock '{}' acquired";
private static final String LOCK_RELEASED = "Lock '{}' released";
private static final String TRYING_TO_LOCK_WITH = "Trying to lock with '{}'";

private final @NonNull RedissonReactiveClient redisClient;
private final @NonNull DistributedLockConfig distributedLockConfig;

Expand All @@ -29,8 +33,8 @@ private record LockInstance(RPermitExpirableSemaphoreReactive semaphore, String
public void release() {
semaphore.release(locked)
.subscribe(
__ -> log.debug("Lock {} released successfully", locked),
__ -> log.warn("Lock {} already released", locked));
__ -> log.debug("Lock '{}' released successfully", locked),
__ -> log.warn("Lock '{}' already released", locked));
}

}
Expand All @@ -40,14 +44,14 @@ public <T> Mono<T> executeWithLock(@NonNull Lock lock, @NonNull Mono<T> action)

RPermitExpirableSemaphoreReactive semaphore = getSemaphore(lock);

log.debug("Trying to lock with {}", lock);
log.debug(TRYING_TO_LOCK_WITH, lock);

return acquireLock(semaphore)
.flatMap(lockInstance -> runAction(lock, action, lockInstance.locked())
.subscribeOn(Schedulers.boundedElastic())
.doFinally(signalType -> {
lockInstance.release();
log.debug("Lock {} released", lock);
log.debug(LOCK_RELEASED, lock);
}));
}

Expand Down Expand Up @@ -76,7 +80,7 @@ private Mono<LockInstance> acquire(RPermitExpirableSemaphoreReactive semaphore)

private <T> Mono<T> runAction(Lock lock, Mono<T> action, String locked) {
if (locked != null) {
log.debug("Lock {} acquired", lock);
log.debug(LOCK_ACQUIRED, lock);
return action;
}

Expand All @@ -88,20 +92,20 @@ public <T> Flux<T> executeWithLock(@NonNull Lock lock, @NonNull Flux<T> stream)

RPermitExpirableSemaphoreReactive semaphore = getSemaphore(lock);

log.debug("Trying to lock with {}", lock);
log.debug(TRYING_TO_LOCK_WITH, lock);

return acquireLock(semaphore)
.flatMapMany(lockInstance -> stream(lock, stream, lockInstance.locked())
.subscribeOn(Schedulers.boundedElastic())
.doFinally(signalType -> {
lockInstance.release();
log.debug("Lock {} released", lock);
log.debug(LOCK_RELEASED, lock);
}));
}

private <T> Flux<T> stream(Lock lock, Flux<T> action, String locked) {
if (locked != null) {
log.debug("Lock {} acquired", lock);
log.debug(LOCK_ACQUIRED, lock);
return action;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,10 @@ private static int customComparator(BigDecimal v1, BigDecimal v2) {
return 0;
}

// If not equal within tolerance, perform standard comparison
return strippedV1.compareTo(strippedV2);
/*
* For p50, p90, p99, the calculation is not accurate, so we need to compare the integer part of the number
* */
return strippedV1.toBigInteger().compareTo(strippedV2.toBigInteger());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import static com.comet.opik.api.resources.utils.TestDropwizardAppExtensionUtils.AppContextConfig;
import static com.comet.opik.api.resources.utils.TestDropwizardAppExtensionUtils.CustomConfig;
import static com.comet.opik.api.resources.utils.TestDropwizardAppExtensionUtils.newTestDropwizardAppExtension;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

@Slf4j
Expand Down Expand Up @@ -128,7 +128,9 @@ void testExecuteWithLock_LockShouldHaveBeenEvicted(LockService lockService, Redi

Mono.delay(Duration.ofMillis(1500)).block();

assertFalse(redisClient.getBucket(lock.key()).isExists().block());
StepVerifier.create(redisClient.getBucket(lock.key()).isExists())
.assertNext(data -> assertThat(data).isFalse())
.verifyComplete();

lockService.executeWithLock(lock, Mono.delay(Duration.ofMillis(100)).then(Mono.fromCallable(() -> {
sharedList.add("B");
Expand All @@ -140,7 +142,9 @@ void testExecuteWithLock_LockShouldHaveBeenEvicted(LockService lockService, Redi

Mono.delay(Duration.ofSeconds(1)).block();

assertFalse(redisClient.getBucket(lock.key()).isExists().block());
StepVerifier.create(redisClient.getBucket(lock.key()).isExists())
.assertNext(data -> assertThat(data).isFalse())
.verifyComplete();
}

}

0 comments on commit 7496bff

Please # to comment.