Skip to content

Commit

Permalink
[OPIK-457] Fix Redis lock keys leak (#730)
Browse files Browse the repository at this point in the history
  • Loading branch information
thiagohora authored Nov 27, 2024
1 parent 67d2f57 commit 26aaa23
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 51 deletions.
1 change: 1 addition & 0 deletions apps/opik-backend/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ health:

distributedLock:
lockTimeoutMS: ${DISTRIBUTED_LOCK_TIME_OUT:-500}
ttlInSeconds: ${DISTRIBUTED_LOCK_TTL_IN_SEC:-5}

redis:
singleNodeUrl: ${REDIS_URL:-}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,20 @@ public class DistributedLockConfig {

@Valid
@JsonProperty
@NotNull private int lockTimeoutMS;
@NotNull private int lockTimeoutMS; // lease time in milliseconds

/**
*
* @param ttlInSeconds
*
* This value has to be considerably higher than the lockTimeoutMS value, as it has to guarantee that the last
* thread to join the queue to acquire the lock will have enough time to execute the action. Then, the lock will be deleted from redis after the @ttlInSeconds.
* <br>
* This is needed as redisson by default doesn't delete the lock from redis after the lease time expires, it just releases the lock. The expiration time will be reset every time the lock is acquired.
*
* */
@Valid
@JsonProperty
@NotNull private int ttlInSeconds; // time to live in seconds

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@
import org.redisson.api.RPermitExpirableSemaphoreReactive;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.api.options.CommonOptions;
import org.redisson.client.RedisException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

@RequiredArgsConstructor
@Slf4j
Expand All @@ -21,31 +24,54 @@ class RedissonLockService implements LockService {
private final @NonNull RedissonReactiveClient redisClient;
private final @NonNull DistributedLockConfig distributedLockConfig;

private record LockInstance(RPermitExpirableSemaphoreReactive semaphore, String locked) {

public void release() {
semaphore.release(locked)
.subscribe(
__ -> log.debug("Lock {} released successfully", locked),
__ -> log.warn("Lock {} already released", locked));
}

}

@Override
public <T> Mono<T> executeWithLock(@NonNull Lock lock, @NonNull Mono<T> action) {

RPermitExpirableSemaphoreReactive semaphore = getSemaphore(lock, distributedLockConfig.getLockTimeoutMS());
RPermitExpirableSemaphoreReactive semaphore = getSemaphore(lock);

log.debug("Trying to lock with {}", lock);

return semaphore
.trySetPermits(1)
.then(Mono.defer(semaphore::acquire))
.flatMap(locked -> runAction(lock, action, locked)
return acquireLock(semaphore)
.flatMap(lockInstance -> runAction(lock, action, lockInstance.locked())
.subscribeOn(Schedulers.boundedElastic())
.doFinally(signalType -> {
semaphore.release(locked).subscribe();
lockInstance.release();
log.debug("Lock {} released", lock);
}));
}

private RPermitExpirableSemaphoreReactive getSemaphore(Lock lock, int lockTimeoutMS) {
private RPermitExpirableSemaphoreReactive getSemaphore(Lock lock) {
return redisClient.getPermitExpirableSemaphore(
CommonOptions
.name(lock.key())
.timeout(Duration.ofMillis(lockTimeoutMS))
.timeout(Duration.ofMillis(distributedLockConfig.getLockTimeoutMS()))
.retryInterval(Duration.ofMillis(10))
.retryAttempts(lockTimeoutMS / 10));
.retryAttempts(distributedLockConfig.getLockTimeoutMS() / 10));
}

private Mono<LockInstance> acquireLock(RPermitExpirableSemaphoreReactive semaphore) {
return Mono.defer(() -> acquire(semaphore))
.retryWhen(Retry.max(3).filter(RedisException.class::isInstance));
}

private Mono<LockInstance> acquire(RPermitExpirableSemaphoreReactive semaphore) {
return semaphore
.setPermits(1)
.then(Mono.defer(
() -> semaphore.acquire(distributedLockConfig.getLockTimeoutMS(), TimeUnit.MILLISECONDS)))
.flatMap(locked -> semaphore.expire(Duration.ofSeconds(distributedLockConfig.getTtlInSeconds()))
.thenReturn(new LockInstance(semaphore, locked)));
}

private <T> Mono<T> runAction(Lock lock, Mono<T> action, String locked) {
Expand All @@ -59,15 +85,16 @@ private <T> Mono<T> runAction(Lock lock, Mono<T> action, String locked) {

@Override
public <T> Flux<T> executeWithLock(@NonNull Lock lock, @NonNull Flux<T> stream) {
RPermitExpirableSemaphoreReactive semaphore = getSemaphore(lock, distributedLockConfig.getLockTimeoutMS());

return semaphore
.trySetPermits(1)
.then(Mono.defer(semaphore::acquire))
.flatMapMany(locked -> stream(lock, stream, locked)
RPermitExpirableSemaphoreReactive semaphore = getSemaphore(lock);

log.debug("Trying to lock with {}", lock);

return acquireLock(semaphore)
.flatMapMany(lockInstance -> stream(lock, stream, lockInstance.locked())
.subscribeOn(Schedulers.boundedElastic())
.doFinally(signalType -> {
semaphore.release(locked).subscribe();
lockInstance.release();
log.debug("Lock {} released", lock);
}));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
@UtilityClass
public class TestDropwizardAppExtensionUtils {

public record CustomConfig(String key, String value) {
}

@Builder
public record AppContextConfig(
String jdbcUrl,
Expand All @@ -42,7 +45,8 @@ public record AppContextConfig(
String usageReportUrl,
String metadataVersion,
EventBus mockEventBus,
boolean corsEnabled) {
boolean corsEnabled,
List<CustomConfig> customConfigs) {
}

public static TestDropwizardAppExtension newTestDropwizardAppExtension(String jdbcUrl,
Expand Down Expand Up @@ -86,36 +90,36 @@ public static TestDropwizardAppExtension newTestDropwizardAppExtension(

public static TestDropwizardAppExtension newTestDropwizardAppExtension(AppContextConfig appContextConfig) {

var list = new ArrayList<String>();
list.add("database.url: " + appContextConfig.jdbcUrl());
var configs = new ArrayList<String>();
configs.add("database.url: " + appContextConfig.jdbcUrl());

if (appContextConfig.jdbcUserName() != null) {
list.add("database.user: " + appContextConfig.jdbcUserName());
configs.add("database.user: " + appContextConfig.jdbcUserName());
}

if (appContextConfig.jdbcDriverClass() != null) {
list.add("database.driverClass: " + appContextConfig.jdbcDriverClass());
configs.add("database.driverClass: " + appContextConfig.jdbcDriverClass());
}

if (appContextConfig.awsJdbcDriverPlugins() != null) {
list.add("database.properties.wrapperPlugins: " + appContextConfig.awsJdbcDriverPlugins());
configs.add("database.properties.wrapperPlugins: " + appContextConfig.awsJdbcDriverPlugins());
}

if (appContextConfig.databaseAnalyticsFactory() != null) {
list.add("databaseAnalytics.port: " + appContextConfig.databaseAnalyticsFactory().getPort());
list.add("databaseAnalytics.username: " + appContextConfig.databaseAnalyticsFactory().getUsername());
list.add("databaseAnalytics.password: " + appContextConfig.databaseAnalyticsFactory().getPassword());
configs.add("databaseAnalytics.port: " + appContextConfig.databaseAnalyticsFactory().getPort());
configs.add("databaseAnalytics.username: " + appContextConfig.databaseAnalyticsFactory().getUsername());
configs.add("databaseAnalytics.password: " + appContextConfig.databaseAnalyticsFactory().getPassword());
}

if (appContextConfig.runtimeInfo() != null) {
list.add("authentication.enabled: true");
list.add("authentication.sdk.url: "
configs.add("authentication.enabled: true");
configs.add("authentication.sdk.url: "
+ "%s/opik/auth".formatted(appContextConfig.runtimeInfo().getHttpsBaseUrl()));
list.add("authentication.ui.url: "
configs.add("authentication.ui.url: "
+ "%s/opik/auth-session".formatted(appContextConfig.runtimeInfo().getHttpsBaseUrl()));

if (appContextConfig.cacheTtlInSeconds() != null) {
list.add("authentication.apiKeyResolutionCacheTTLInSec: " + appContextConfig.cacheTtlInSeconds());
configs.add("authentication.apiKeyResolutionCacheTTLInSec: " + appContextConfig.cacheTtlInSeconds());
}
}

Expand Down Expand Up @@ -146,46 +150,56 @@ public void run(GuiceyEnvironment environment) {
};

if (appContextConfig.redisUrl() != null) {
list.add("redis.singleNodeUrl: %s".formatted(appContextConfig.redisUrl()));
list.add("redis.sentinelMode: false");
list.add("redis.lockTimeout: 500");
configs.add("redis.singleNodeUrl: %s".formatted(appContextConfig.redisUrl()));
configs.add("redis.sentinelMode: false");
configs.add("redis.lockTimeout: 500");
}

if (appContextConfig.rateLimitEnabled()) {
list.add("rateLimit.enabled: true");
list.add("rateLimit.generalLimit.limit: %d".formatted(appContextConfig.limit()));
list.add("rateLimit.generalLimit.durationInSeconds: %d"
configs.add("rateLimit.enabled: true");
configs.add("rateLimit.generalLimit.limit: %d".formatted(appContextConfig.limit()));
configs.add("rateLimit.generalLimit.durationInSeconds: %d"
.formatted(appContextConfig.limitDurationInSeconds()));

if (appContextConfig.customLimits() != null) {
appContextConfig.customLimits()
.forEach((bucket, limitConfig) -> {
list.add("rateLimit.customLimits.%s.limit: %d".formatted(bucket, limitConfig.limit()));
list.add("rateLimit.customLimits.%s.durationInSeconds: %d".formatted(bucket,
configs.add("rateLimit.customLimits.%s.limit: %d".formatted(bucket, limitConfig.limit()));
configs.add("rateLimit.customLimits.%s.durationInSeconds: %d".formatted(bucket,
limitConfig.durationInSeconds()));
});
}
}

if (appContextConfig.metadataVersion() != null) {
list.add("metadata.version: %s".formatted(appContextConfig.metadataVersion()));
configs.add("metadata.version: %s".formatted(appContextConfig.metadataVersion()));
}

if (appContextConfig.usageReportEnabled()) {
list.add("usageReport.enabled: %s".formatted(true));
configs.add("usageReport.enabled: %s".formatted(true));

if (appContextConfig.usageReportUrl() != null) {
list.add("usageReport.url: %s".formatted(appContextConfig.usageReportUrl()));
configs.add("usageReport.url: %s".formatted(appContextConfig.usageReportUrl()));
}
}

if (appContextConfig.corsEnabled) {
list.add("cors.enabled: true");
configs.add("cors.enabled: true");
}

if (CollectionUtils.isNotEmpty(appContextConfig.customConfigs())) {
appContextConfig
.customConfigs()
.stream()
.filter(customConfig -> configs.stream().noneMatch(s -> s.contains(customConfig.key())))
.forEach(customConfig -> {
configs.add("%s: %s".formatted(customConfig.key(), customConfig.value()));
});
}

return TestDropwizardAppExtension.forApp(OpikApplication.class)
.config("src/test/resources/config-test.yml")
.configOverrides(list.toArray(new String[0]))
.configOverrides(configs.toArray(new String[0]))
.randomPorts()
.hooks(hook)
.create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,35 @@
import com.comet.opik.api.resources.utils.ClickHouseContainerUtils;
import com.comet.opik.api.resources.utils.MySQLContainerUtils;
import com.comet.opik.api.resources.utils.RedisContainerUtils;
import com.comet.opik.api.resources.utils.TestDropwizardAppExtensionUtils;
import com.comet.opik.infrastructure.lock.LockService;
import com.redis.testcontainers.RedisContainer;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.redisson.api.RedissonReactiveClient;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.lifecycle.Startables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import ru.vyarus.dropwizard.guice.test.jupiter.ext.TestDropwizardAppExtension;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import static com.comet.opik.api.resources.utils.TestDropwizardAppExtensionUtils.AppContextConfig;
import static com.comet.opik.api.resources.utils.TestDropwizardAppExtensionUtils.CustomConfig;
import static com.comet.opik.api.resources.utils.TestDropwizardAppExtensionUtils.newTestDropwizardAppExtension;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

@Testcontainers(parallel = true)
@Slf4j
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class RedissonLockServiceIntegrationTest {

Expand All @@ -39,15 +45,18 @@ class RedissonLockServiceIntegrationTest {
private static final TestDropwizardAppExtension app;

static {
MYSQL.start();
CLICKHOUSE.start();
REDIS.start();

Startables.deepStart(REDIS, MYSQL, CLICKHOUSE).join();
var databaseAnalyticsFactory = ClickHouseContainerUtils.newDatabaseAnalyticsFactory(CLICKHOUSE,
ClickHouseContainerUtils.DATABASE_NAME);

app = TestDropwizardAppExtensionUtils.newTestDropwizardAppExtension(MYSQL.getJdbcUrl(),
databaseAnalyticsFactory, null, REDIS.getRedisURI());
app = newTestDropwizardAppExtension(
AppContextConfig.builder()
.jdbcUrl(MYSQL.getJdbcUrl())
.databaseAnalyticsFactory(databaseAnalyticsFactory)
.redisUrl(REDIS.getRedisURI())
.customConfigs(List.of(new CustomConfig("distributedLock.lockTimeoutMS", "100"),
new CustomConfig("distributedLock.ttlInSeconds", "1")))
.build());
}

@Test
Expand Down Expand Up @@ -107,4 +116,31 @@ void testExecuteWithLock_AddIfAbsent_Flux(LockService lockService) {
assertTrue(sharedList.contains("C"));
}

@Test
void testExecuteWithLock_LockShouldHaveBeenEvicted(LockService lockService, RedissonReactiveClient redisClient) {
LockService.Lock lock = new LockService.Lock(UUID.randomUUID(), "test-lock");
List<String> sharedList = new ArrayList<>();

lockService.executeWithLock(lock, Mono.delay(Duration.ofMillis(100)).then(Mono.fromCallable(() -> {
sharedList.add("A");
return true;
}))).block();

Mono.delay(Duration.ofMillis(1500)).block();

assertFalse(redisClient.getBucket(lock.key()).isExists().block());

lockService.executeWithLock(lock, Mono.delay(Duration.ofMillis(100)).then(Mono.fromCallable(() -> {
sharedList.add("B");
return true;
}))).block();

assertTrue(sharedList.contains("A"));
assertTrue(sharedList.contains("B"));

Mono.delay(Duration.ofSeconds(1)).block();

assertFalse(redisClient.getBucket(lock.key()).isExists().block());
}

}
1 change: 1 addition & 0 deletions apps/opik-backend/src/test/resources/config-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ health:

distributedLock:
lockTimeout: 500
ttlInSeconds: 1

redis:
singleNodeUrl:
Expand Down

0 comments on commit 26aaa23

Please # to comment.