Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[CORE-653] Lock around restore call to reduce risk of conflict. #124

Merged
merged 3 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import io.telicent.jena.abac.labels.LabelsStore;
import io.telicent.jena.abac.labels.LabelsStoreRocksDB;
import io.telicent.jena.graphql.utils.ExcludeFromJacocoGeneratedReport;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.apache.jena.atlas.lib.DateTimeUtils;
import org.apache.jena.fuseki.mgt.Backup;
import org.apache.jena.fuseki.server.DataAccessPoint;
Expand All @@ -37,13 +39,16 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.zip.GZIPInputStream;

import static io.telicent.backup.utils.BackupUtils.*;
import static org.apache.jena.riot.Lang.NQUADS;

public class DatasetBackupService {

private final ReentrantLock lock;

private final DataAccessPointRegistry dapRegistry;

final static ConcurrentHashMap<String, TriConsumer<DataAccessPoint, String, ObjectNode>> backupConsumerMap = new ConcurrentHashMap<>();
Expand All @@ -53,6 +58,39 @@ public DatasetBackupService(DataAccessPointRegistry dapRegistry) {
this.dapRegistry = dapRegistry;
registerMethods("tdb", this::backupTDB, this::restoreTDB);
registerMethods("labels", this::backupLabelStore, this::restoreLabelStore);
lock = new ReentrantLock();
}

/**
* Ensures that only one operation is processed at a time
* @param request incoming request
* @param response outgoing response
* @param backup flag indicating backup or restore
*/
public void process(HttpServletRequest request, HttpServletResponse response, boolean backup) {
// Try to acquire the lock without blocking
ObjectNode resultNode = MAPPER.createObjectNode();
if (!lock.tryLock()) {
response.setStatus(HttpServletResponse.SC_CONFLICT);
resultNode.put("error", "Another conflicting operation is already in progress. Please try again later.");
processResponse(response, resultNode);
} else {
try {
String id = request.getPathInfo();
resultNode.put("id", id);
resultNode.put("date", DateTimeUtils.nowAsString("yyyy-MM-dd_HH-mm-ss"));
if (backup) {
resultNode.set("backup", backupDataset(id));
} else {
resultNode.set("restore", restoreDatasets(id));
}
processResponse(response, resultNode);
} catch (Exception exception) {
handleError(response, resultNode, exception);
} finally {
lock.unlock();
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,10 @@

package io.telicent.backup.servlets;

import com.fasterxml.jackson.databind.node.ObjectNode;
import io.telicent.backup.services.DatasetBackupService;
import jakarta.servlet.http.HttpServlet;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.apache.jena.atlas.lib.DateTimeUtils;

import static io.telicent.backup.utils.BackupUtils.*;

/**
* Servlet class responsible for the creation of backups.
Expand All @@ -37,15 +33,6 @@ public BackupServlet(DatasetBackupService backupService) {

@Override
protected void doPost(HttpServletRequest request, HttpServletResponse response) {
ObjectNode resultNode = MAPPER.createObjectNode();
try {
String datasetName = request.getPathInfo();
resultNode.put("dataset", datasetName);
resultNode.put("date", DateTimeUtils.nowAsString("yyyy-MM-dd_HH-mm-ss"));
resultNode.set("backup", backupService.backupDataset(datasetName));
processResponse(response, resultNode);
} catch (Exception exception) {
handleError(response, resultNode, exception);
}
backupService.process(request, response, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,22 @@

package io.telicent.backup.servlets;

import com.fasterxml.jackson.databind.node.ObjectNode;
import io.telicent.backup.services.DatasetBackupService;
import jakarta.servlet.http.HttpServlet;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.apache.jena.atlas.lib.DateTimeUtils;

import static io.telicent.backup.utils.BackupUtils.*;

/**
* Servlet class responsible for the loading of given backup.
* Servlet class responsible for the loading of a given backup.
*/
public class RestoreServlet extends HttpServlet {
private final DatasetBackupService backupService;

public RestoreServlet(DatasetBackupService backupService) {
this.backupService = backupService;
}

@Override
protected void doPost(HttpServletRequest request, HttpServletResponse response) {
ObjectNode resultNode = MAPPER.createObjectNode();
try {
String restoreId = request.getPathInfo();
resultNode.put("restore-id", restoreId);
resultNode.put("date", DateTimeUtils.nowAsString("yyyy-MM-dd_HH-mm-ss"));
resultNode.set("restore", backupService.restoreDatasets(restoreId));
processResponse(response, resultNode);
} catch (Exception exception) {
handleError(response, resultNode, exception);
}
backupService.process(request, response,false);
}
}
31 changes: 28 additions & 3 deletions scg-system/src/test/java/io/telicent/backup/TestBackupData.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,17 @@
import static org.apache.jena.graph.Graph.emptyGraph;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;

public class TestBackupData {

private static FusekiServer server;

private FMod_BackupData testModule;

private static DatasetBackupService mockService = mock(DatasetBackupService.class);

@BeforeEach
public void createAndSetupServerDetails() throws Exception {
LibTestsSCG.setupAuthentication();
Expand All @@ -68,6 +72,7 @@ void clearDown() throws Exception {
}
LibTestsSCG.teardownAuthentication();
Configurator.reset();
reset(mockService);
}

private FusekiModules generateModulesAndReplaceWithTestModule() {
Expand Down Expand Up @@ -223,15 +228,23 @@ public void test_restoreBackup_error() {
* @param response generated response
*/
private void debug(HttpResponse<InputStream> response) {
System.out.println(convertToJSON(response));
}

/**
* Obtain the JSON String from the HTTP Response
* @param response the response returned
* @return a JSON string
*/
private String convertToJSON(HttpResponse<InputStream> response) {
try {
InputStream inputStream = response.body();
InputStreamReader reader = new InputStreamReader(inputStream);
ObjectMapper mapper = new ObjectMapper();
Object jsonObject = mapper.readValue(reader, Object.class);
String jsonString = mapper.writeValueAsString(jsonObject);
System.out.println(jsonString);
return mapper.writeValueAsString(jsonObject);
}catch (IOException e) {
System.out.println(e.getMessage());
return e.getMessage();
}
}

Expand All @@ -258,4 +271,16 @@ DatasetBackupService getBackupService(DataAccessPointRegistry dapRegistry) {
return null;
}
}

/**
* Extension of the Backup Module for testing purposes.
* Allows the underlying service to be mocked
*/
public static class FMod_BackupData_Mock extends FMod_BackupData {

@Override
DatasetBackupService getBackupService(DataAccessPointRegistry dapRegistry) {
return mockService;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import io.telicent.jena.abac.ABAC;
import io.telicent.jena.abac.core.DatasetGraphABAC;
import io.telicent.jena.abac.labels.LabelsStoreRocksDB;
import jakarta.servlet.ServletOutputStream;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.apache.jena.fuseki.server.DataAccessPoint;
import org.apache.jena.fuseki.server.DataAccessPointRegistry;
import org.apache.jena.fuseki.server.DataService;
Expand All @@ -35,14 +38,16 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static io.telicent.backup.services.DatasetBackupService_Test.*;
import static io.telicent.backup.utils.BackupUtils.MAPPER;
import static java.util.Collections.emptyList;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;

public class TestDatasetBackupService {

Expand Down Expand Up @@ -1020,6 +1025,34 @@ public void test_restoreLabelStore_wrongPath() {
assertTrue(RESULT_NODE.get("reason").asText().startsWith("Restore directory not found: "));
}

@Test
public void test_process_multipleCalls() throws InterruptedException, IOException {
HttpServletRequest request = mock(HttpServletRequest.class);
HttpServletResponse response = mock(HttpServletResponse.class);
ServletOutputStream outputStream = mock(ServletOutputStream.class);
when(response.getOutputStream()).thenReturn(outputStream);
doAnswer(invocation -> {
Thread.sleep(50); // Simulate operation
return "dataset";
}).when(request).getPathInfo();

// Use ExecutorService to run multiple threads
try (ExecutorService executorService = Executors.newFixedThreadPool(2)) {

// Submit 5 concurrent tasks simulating individual requests
for (int i = 0; i < 2; i++) {
final boolean flag = (i == 0);
executorService.submit(() -> {
cut.process(request, response, flag);
});
}
// Wait for threads to complete
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.SECONDS);
}
verify(response, times(1)).setStatus(409);
}

public void doNothing(DataAccessPoint dataAccessPoint, String path, ObjectNode resultNode) {
resultNode.put("success", true);
}
Expand Down
Loading