Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

#1882 fixed ResolvedPolicyCacheLoader loading policy imports transitively #1883

Merged
merged 2 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.Predicate;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -81,14 +80,14 @@ final class EnforcementFlow {
Source.single(Entry.nonexistent());
private final Logger log = LoggerFactory.getLogger(getClass());
private final CachingSignalEnrichmentFacade thingsFacade;
private final Cache<PolicyId, Entry<Pair<Policy, Set<PolicyTag>>>> policyEnforcerCache;
private final Cache<PolicyIdResolvingImports, Entry<Pair<Policy, Set<PolicyTag>>>> policyEnforcerCache;
private final Duration cacheRetryDelay;
private final SearchUpdateObserver searchUpdateObserver;
private final int maxArraySize;

private EnforcementFlow(final ActorSystem actorSystem,
final ActorRef thingsShardRegion,
final Cache<PolicyId, Entry<Pair<Policy, Set<PolicyTag>>>> policyEnforcerCache,
final Cache<PolicyIdResolvingImports, Entry<Pair<Policy, Set<PolicyTag>>>> policyEnforcerCache,
final AskWithRetryConfig askWithRetryConfig,
final StreamCacheConfig thingCacheConfig,
final Executor thingCacheDispatcher) {
Expand Down Expand Up @@ -127,11 +126,11 @@ public static EnforcementFlow of(final ActorSystem actorSystem,

final PolicyCacheLoader policyCacheLoader =
PolicyCacheLoader.getNewInstance(askWithRetryConfig, scheduler, policiesShardRegion);
final CompletableFuture<Cache<PolicyId, Entry<Pair<Policy, Set<PolicyTag>>>>> cacheFuture =
final CompletableFuture<Cache<PolicyIdResolvingImports, Entry<Pair<Policy, Set<PolicyTag>>>>> cacheFuture =
new CompletableFuture<>();
final ResolvedPolicyCacheLoader resolvedPolicyCacheLoader =
new ResolvedPolicyCacheLoader(policyCacheLoader, cacheFuture);
final Cache<PolicyId, Entry<Pair<Policy, Set<PolicyTag>>>> policyEnforcerCache =
final Cache<PolicyIdResolvingImports, Entry<Pair<Policy, Set<PolicyTag>>>> policyEnforcerCache =
CacheFactory.createCache(resolvedPolicyCacheLoader, policyCacheConfig,
"things-search_enforcementflow_enforcer_cache_policy", policyCacheDispatcher);
cacheFuture.complete(policyEnforcerCache);
Expand Down Expand Up @@ -332,18 +331,17 @@ private Source<Entry<Pair<Policy, Set<PolicyTag>>>, NotUsed> readCachedEnforcer(

final Source<Entry<Pair<Policy, Set<PolicyTag>>>, ?> lazySource = Source.lazySource(() -> {
final CompletionStage<Source<Entry<Pair<Policy, Set<PolicyTag>>>, NotUsed>> enforcerFuture =
policyEnforcerCache.get(policyId)
policyEnforcerCache.get(new PolicyIdResolvingImports(policyId, true))
.thenApply(optionalEnforcerEntry -> {
if (shouldReloadCache(optionalEnforcerEntry.orElse(null), metadata, iteration)) {
// invalid entry; invalidate and retry after delay
policyEnforcerCache.invalidate(policyId);
policyEnforcerCache.invalidate(new PolicyIdResolvingImports(policyId, true));

// only invalidate causing policy tag once, e.g. when a massively imported policy is changed:
metadata.getCausingPolicyTag()
.filter(Predicate.not(tag -> policyId.equals(tag.getEntityId())))
.ifPresent(causingPolicyTag -> {
final boolean invalidated = policyEnforcerCache.invalidateConditionally(
causingPolicyTag.getEntityId(),
new PolicyIdResolvingImports(causingPolicyTag.getEntityId(), false),
entry -> entry.exists() &&
entry.getRevision() < causingPolicyTag.getRevision()
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright (c) 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.thingsearch.service.persistence.write.streaming;

import org.eclipse.ditto.policies.model.PolicyId;

/**
* Package private cache key for loading policies via {@link ResolvedPolicyCacheLoader} into a policy cache.
*/
record PolicyIdResolvingImports(PolicyId policyId, boolean resolveImports) {}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;

import org.apache.pekko.japi.Pair;
Expand All @@ -29,59 +30,70 @@

import com.github.benmanes.caffeine.cache.AsyncCacheLoader;

final class ResolvedPolicyCacheLoader implements AsyncCacheLoader<PolicyId, Entry<Pair<Policy, Set<PolicyTag>>>> {
final class ResolvedPolicyCacheLoader
implements AsyncCacheLoader<PolicyIdResolvingImports, Entry<Pair<Policy, Set<PolicyTag>>>> {

private final PolicyCacheLoader policyCacheLoader;
private final CompletableFuture<Cache<PolicyId, Entry<Pair<Policy, Set<PolicyTag>>>>> cacheFuture;
private final CompletableFuture<Cache<PolicyIdResolvingImports, Entry<Pair<Policy, Set<PolicyTag>>>>> cacheFuture;

ResolvedPolicyCacheLoader(final PolicyCacheLoader policyCacheLoader,
final CompletableFuture<Cache<PolicyId, Entry<Pair<Policy, Set<PolicyTag>>>>> cacheFuture) {
final CompletableFuture<Cache<PolicyIdResolvingImports, Entry<Pair<Policy, Set<PolicyTag>>>>> cacheFuture) {
this.policyCacheLoader = policyCacheLoader;
this.cacheFuture = cacheFuture;
}

@Override
public CompletableFuture<? extends Entry<Pair<Policy, Set<PolicyTag>>>> asyncLoad(final PolicyId policyId,
public CompletableFuture<? extends Entry<Pair<Policy, Set<PolicyTag>>>> asyncLoad(
final PolicyIdResolvingImports policyIdResolvingImports,
final Executor executor) {

return policyCacheLoader.asyncLoad(policyId, executor)
return policyCacheLoader.asyncLoad(policyIdResolvingImports.policyId(), executor)
.thenCompose(policyEntry -> {
if (policyEntry.exists()) {
final Policy policy = policyEntry.getValueOrThrow();
final long revision = policy.getRevision().map(PolicyRevision::toLong)
.orElseThrow(
() -> new IllegalStateException("Bad SudoRetrievePolicyResponse: no revision"));
final Set<PolicyTag> referencedPolicies = new HashSet<>();
return policy.withResolvedImports(
importedPolicyId -> cacheFuture
.thenCompose(cache -> cache.get(importedPolicyId))
.thenApply(entry -> entry.flatMap(Entry::get))
.thenApply(optionalReferencedPolicy -> {
if (optionalReferencedPolicy.isPresent()) {
final Policy referencedPolicy =
optionalReferencedPolicy.get().first();
final Optional<PolicyRevision> revision =
referencedPolicy.getRevision();
final Optional<PolicyId> entityId =
referencedPolicy.getEntityId();
if (revision.isPresent() && entityId.isPresent()) {
referencedPolicies.add(
PolicyTag.of(entityId.get(),
revision.get().toLong())
);
}
}
return optionalReferencedPolicy.map(Pair::first);
}))
.thenApply(resolvedPolicy -> {
final long revision = policy.getRevision().map(PolicyRevision::toLong)
.orElseThrow(
() -> new IllegalStateException(
"Bad SudoRetrievePolicyResponse: no revision"));
return Entry.of(revision, new Pair<>(resolvedPolicy, referencedPolicies));
});

if (policyIdResolvingImports.resolveImports()) {
return cacheFuture.thenCompose(cache ->
resolvePolicyImports(cache, policy, referencedPolicies)
)
.thenApply(resolvedPolicy ->
Entry.of(revision, new Pair<>(resolvedPolicy, referencedPolicies))
);
} else {
return CompletableFuture.completedFuture(
Entry.of(revision, new Pair<>(policy, referencedPolicies))
);
}
} else {
return CompletableFuture.completedFuture(Entry.nonexistent());
}
});
}

private static CompletionStage<Policy> resolvePolicyImports(
final Cache<PolicyIdResolvingImports, Entry<Pair<Policy, Set<PolicyTag>>>> cache,
final Policy policy,
final Set<PolicyTag> referencedPolicies) {

return policy.withResolvedImports(importedPolicyId ->
cache.get(new PolicyIdResolvingImports(importedPolicyId, false)) // don't transitively resolve imports, only 1 "level"
.thenApply(entry -> entry.flatMap(Entry::get))
.thenApply(optionalReferencedPolicy -> {
if (optionalReferencedPolicy.isPresent()) {
final Policy referencedPolicy = optionalReferencedPolicy.get().first();
final Optional<PolicyRevision> refRevision = referencedPolicy.getRevision();
final Optional<PolicyId> entityId = referencedPolicy.getEntityId();
if (refRevision.isPresent() && entityId.isPresent()) {
referencedPolicies.add(PolicyTag.of(entityId.get(), refRevision.get().toLong()));
}
}
return optionalReferencedPolicy.map(Pair::first);
})
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ public void importedPolicyIsOnlyLoadedOnceWhenTwoDifferentPoliciesImportFromItAn
final long policy2Rev1 = 3L;
final ThingId thingId1 = ThingId.of("thing:id-1");
final ThingId thingId2 = ThingId.of("thing:id-2");
final PolicyId motherPolicyId = PolicyId.of("policy:mother"); // mother should never be loaded
final PolicyId importedPolicyId = PolicyId.of("policy:imported");
final PolicyId importingPolicy1Id = PolicyId.of("policy:importing-1");
final PolicyId importingPolicy2Id = PolicyId.of("policy:importing-2");
Expand Down Expand Up @@ -264,9 +265,13 @@ public void importedPolicyIsOnlyLoadedOnceWhenTwoDifferentPoliciesImportFromItAn

final SudoRetrievePolicy sudoRetrieveImportedPolicy = policiesProbe.expectMsgClass(SudoRetrievePolicy.class);
assertThat(sudoRetrieveImportedPolicy.getEntityId().toString()).isEqualTo(importedPolicyId.toString());
final var importedPolicy = Policy.newBuilder(importedPolicyId).setRevision(importedPolicyRev1).build();
final var importedPolicy = Policy.newBuilder(importedPolicyId)
.setPolicyImport(PolicyImport.newInstance(motherPolicyId, null))
.setRevision(importedPolicyRev1).build();
policiesProbe.reply(SudoRetrievePolicyResponse.of(importedPolicyId, importedPolicy, DittoHeaders.empty()));

policiesProbe.expectNoMessage();

final AbstractWriteModel writeModel1 = sinkProbe.expectNext().get(0);
assertThat(writeModel1).isInstanceOf(ThingWriteModel.class);
final var document1 = JsonObject.of(((ThingWriteModel) writeModel1).getThingDocument().toJson());
Expand Down Expand Up @@ -333,7 +338,9 @@ public void importedPolicyIsOnlyLoadedOnceWhenTwoDifferentPoliciesImportFromItAn

final SudoRetrievePolicy sudoRetrieveImportedPolicy_2 = policiesProbe.expectMsgClass(SudoRetrievePolicy.class);
assertThat(sudoRetrieveImportedPolicy_2.getEntityId().toString()).isEqualTo(importedPolicyId.toString());
final var importedPolicy_2 = Policy.newBuilder(importedPolicyId).setRevision(importedPolicyRev2).build();
final var importedPolicy_2 = Policy.newBuilder(importedPolicyId)
.setPolicyImport(PolicyImport.newInstance(motherPolicyId, null))
.setRevision(importedPolicyRev2).build();
policiesProbe.reply(SudoRetrievePolicyResponse.of(importedPolicyId, importedPolicy_2, DittoHeaders.empty()));

// THEN: write model contains up-to-date policy revisions.
Expand Down
Loading