Skip to content

Commit

Permalink
Gh-3350 Fed store fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
p29876 committed Jan 27, 2025
1 parent 9a0de1b commit bd362db
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 40 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 Crown Copyright
* Copyright 2020-2025 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,14 +16,16 @@

package uk.gov.gchq.gaffer.access.predicate.user;

import java.io.Serializable;

import uk.gov.gchq.gaffer.user.User;
import uk.gov.gchq.koryphe.Since;
import uk.gov.gchq.koryphe.Summary;
import uk.gov.gchq.koryphe.predicate.KoryphePredicate;

@Since("1.13.1")
@Summary("Predicate which never allows user access")
public class NoAccessUserPredicate extends KoryphePredicate<User> {
public class NoAccessUserPredicate extends KoryphePredicate<User> implements Serializable {
@Override
public boolean test(final User user) {
return false;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 Crown Copyright
* Copyright 2020-2025 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -13,16 +13,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package uk.gov.gchq.gaffer.access.predicate.user;

import java.io.Serializable;

import uk.gov.gchq.gaffer.user.User;
import uk.gov.gchq.koryphe.Since;
import uk.gov.gchq.koryphe.Summary;
import uk.gov.gchq.koryphe.predicate.KoryphePredicate;

@Since("1.13.1")
@Summary("A predicate which always allows a user access")
public class UnrestrictedAccessUserPredicate extends KoryphePredicate<User> {
public class UnrestrictedAccessUserPredicate extends KoryphePredicate<User> implements Serializable {
@Override
public boolean test(final User user) {
return true;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2024 Crown Copyright
* Copyright 2024-2025 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -91,10 +91,9 @@ public static OperationChain getValidOperationForGraph(final Operation operation
&& ((OperationView) operation).getView().hasGroups()) {

// Update the view for the graph
((OperationView) operation).setView(
getValidViewForGraph(((OperationView) operation).getView(), graphSerialisable));

updatedOperations.add(operation);
OperationView fixedOp = (OperationView) operation.shallowClone();
fixedOp.setView(getValidViewForGraph(fixedOp.getView(), graphSerialisable));
updatedOperations.add((Operation) fixedOp);

// Recursively go into operation chains to make sure everything is fixed
} else if (operation instanceof OperationChain) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2024 Crown Copyright
* Copyright 2024-2025 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,12 +32,10 @@
import uk.gov.gchq.gaffer.store.Store;
import uk.gov.gchq.gaffer.store.operation.handler.OperationHandler;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
* Main default handler for federated operations. Handles delegation to selected
Expand Down Expand Up @@ -97,10 +95,15 @@ public class FederatedOperationHandler<P extends Operation> implements Operation
*/
public static final String OPT_FIX_OP_LIMIT = "federated.fixOperationLimit";

/**
* Default depth limit for fixing an operation chain.
*/
public static final int DFLT_FIX_OP_LIMIT = 5;

@Override
public Object doOperation(final P operation, final Context context, final Store store) throws OperationException {
LOGGER.debug("Running operation: {}", operation);
final int fixLimit = Integer.parseInt(operation.getOption(OPT_FIX_OP_LIMIT, "5"));
final int fixLimit = Integer.parseInt(operation.getOption(OPT_FIX_OP_LIMIT, String.valueOf(DFLT_FIX_OP_LIMIT)));

// If the operation has output wrap and return using sub class handler
if (operation instanceof Output) {
Expand Down Expand Up @@ -153,40 +156,43 @@ public Object doOperation(final P operation, final Context context, final Store
*/
protected List<GraphSerialisable> getGraphsToExecuteOn(final Operation operation, final Context context,
final FederatedStore store) throws OperationException {
// Use default graph IDs as fallback
List<String> graphIds = store.getDefaultGraphIds();
List<GraphSerialisable> graphsToExecute = new LinkedList<>();
// If user specified graph IDs for this chain parse as comma separated list
if (operation.containsOption(OPT_GRAPH_IDS)) {
graphIds = Arrays.asList(operation.getOption(OPT_GRAPH_IDS).split(","));
} else if (operation.containsOption(OPT_SHORT_GRAPH_IDS)) {
graphIds = Arrays.asList(operation.getOption(OPT_SHORT_GRAPH_IDS).split(","));
}
List<String> specifiedGraphIds = new ArrayList<>();
List<GraphSerialisable> graphsToExecute = new ArrayList<>();

// If user specified graph IDs for this chain parse as comma separated list
if (operation.containsOption(OPT_SHORT_GRAPH_IDS)) {
specifiedGraphIds.addAll(Arrays.asList(operation.getOption(OPT_SHORT_GRAPH_IDS).split(",")));
// Check legacy option
} else if (operation.containsOption(OPT_GRAPH_IDS)) {
specifiedGraphIds.addAll(Arrays.asList(operation.getOption(OPT_GRAPH_IDS).split(",")));
// If a user has specified to just exclude some graphs then run all but them
if (operation.containsOption(OPT_EXCLUDE_GRAPH_IDS)) {
// Get all the IDs
graphIds = StreamSupport.stream(store.getAllGraphsAndAccess().spliterator(), false)
.map(Pair::getLeft)
.map(GraphSerialisable::getGraphId)
.collect(Collectors.toList());

} else if (operation.containsOption(OPT_EXCLUDE_GRAPH_IDS)) {
store.getAllGraphsAndAccess().forEach(pair -> specifiedGraphIds.add(pair.getLeft().getGraphId()));
// Exclude the ones the user has specified
Arrays.asList(operation.getOption(OPT_EXCLUDE_GRAPH_IDS).split(",")).forEach(graphIds::remove);
Arrays.asList(operation.getOption(OPT_EXCLUDE_GRAPH_IDS).split(",")).forEach(specifiedGraphIds::remove);
}

// Use default graph IDs as a fallback
if (specifiedGraphIds.isEmpty()) {
specifiedGraphIds.addAll(store.getDefaultGraphIds());
}

try {
// Get the corresponding graph serialisable
for (final String id : graphIds) {
LOGGER.debug("Will execute on Graph: {}", id);
// Get the corresponding graph serialisables
for (final String id : specifiedGraphIds) {
try {
Pair<GraphSerialisable, GraphAccess> pair = store.getGraphAccessPair(id);

// Check the user has access to the graph
if (pair.getRight().hasReadAccess(context.getUser(), store.getProperties().getAdminAuth())) {
LOGGER.debug("User has access, will execute on Graph: '{}'", id);
// Create a new graph object from the serialised info
graphsToExecute.add(pair.getLeft());
} else {
LOGGER.warn("User does not have access, to Graph: '{}' it will be skipped", id);
}
} catch (final CacheOperationException e) {
throw new OperationException("Failed to get Graph from cache: '" + id + "'", e);
}
} catch (final CacheOperationException e) {
throw new OperationException("Failed to get Graphs from cache", e);
}

// Keep graphs sorted so results returned are predictable between runs
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2024 Crown Copyright
* Copyright 2024-2025 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,7 +20,9 @@
import org.slf4j.LoggerFactory;

import uk.gov.gchq.gaffer.federated.simple.FederatedStore;
import uk.gov.gchq.gaffer.federated.simple.FederatedUtils;
import uk.gov.gchq.gaffer.graph.GraphSerialisable;
import uk.gov.gchq.gaffer.operation.OperationChain;
import uk.gov.gchq.gaffer.operation.OperationException;
import uk.gov.gchq.gaffer.operation.io.Output;
import uk.gov.gchq.gaffer.store.Context;
Expand All @@ -36,9 +38,10 @@
*/
public class SeparateOutputHandler<P extends Output<O>, O> extends FederatedOperationHandler<P> {
private static final Logger LOGGER = LoggerFactory.getLogger(SeparateOutputHandler.class);

@Override
public Map<String, O> doOperation(final P operation, final Context context, final Store store) throws OperationException {
final int fixLimit = Integer.parseInt(operation.getOption(OPT_FIX_OP_LIMIT, String.valueOf(DFLT_FIX_OP_LIMIT)));
List<GraphSerialisable> graphsToExecute = this.getGraphsToExecuteOn(operation, context, (FederatedStore) store);

if (graphsToExecute.isEmpty()) {
Expand All @@ -50,7 +53,8 @@ public Map<String, O> doOperation(final P operation, final Context context, fina
Map<String, O> results = new HashMap<>();
for (final GraphSerialisable gs : graphsToExecute) {
try {
results.put(gs.getGraphId(), gs.getGraph().execute(operation, context.getUser()));
OperationChain<O> fixedChain = FederatedUtils.getValidOperationForGraph(operation, gs, 0, fixLimit);
results.put(gs.getGraphId(), gs.getGraph().execute(fixedChain, context.getUser()));
} catch (final OperationException | UnsupportedOperationException e) {
// Optionally skip this error if user has specified to do so
LOGGER.error("Operation failed on graph: {}", gs.getGraphId());
Expand Down

0 comments on commit bd362db

Please # to comment.