From 5d5c8cc6886055aecd74ef968d109018558f7774 Mon Sep 17 00:00:00 2001 From: Paul Gallagher Date: Tue, 17 Dec 2024 17:31:34 +0000 Subject: [PATCH 1/3] [CORE-653] Lock around restore call to reduce risk of conflict. --- .../backup/servlets/RestoreServlet.java | 17 +++- .../io/telicent/backup/TestBackupData.java | 80 ++++++++++++++++++- 2 files changed, 93 insertions(+), 4 deletions(-) diff --git a/scg-system/src/main/java/io/telicent/backup/servlets/RestoreServlet.java b/scg-system/src/main/java/io/telicent/backup/servlets/RestoreServlet.java index e9137bb..995e638 100644 --- a/scg-system/src/main/java/io/telicent/backup/servlets/RestoreServlet.java +++ b/scg-system/src/main/java/io/telicent/backup/servlets/RestoreServlet.java @@ -23,13 +23,16 @@ import jakarta.servlet.http.HttpServletResponse; import org.apache.jena.atlas.lib.DateTimeUtils; +import java.util.concurrent.locks.ReentrantLock; + import static io.telicent.backup.utils.BackupUtils.*; /** - * Servlet class responsible for the loading of given backup. + * Servlet class responsible for the loading of a given backup. */ public class RestoreServlet extends HttpServlet { private final DatasetBackupService backupService; + private final ReentrantLock lock = new ReentrantLock(); public RestoreServlet(DatasetBackupService backupService) { this.backupService = backupService; @@ -38,6 +41,15 @@ public RestoreServlet(DatasetBackupService backupService) { @Override protected void doPost(HttpServletRequest request, HttpServletResponse response) { ObjectNode resultNode = MAPPER.createObjectNode(); + + // Try to acquire the lock without blocking + if (!lock.tryLock()) { + response.setStatus(429); // HTTP 429: Too Many Requests (doesn't yet exist in HttpServletResponse). + resultNode.put("error", "Another restore operation is already in progress. Please try again later."); + processResponse(response, resultNode); + return; + } + try { String restoreId = request.getPathInfo(); resultNode.put("restore-id", restoreId); @@ -46,6 +58,9 @@ protected void doPost(HttpServletRequest request, HttpServletResponse response) processResponse(response, resultNode); } catch (Exception exception) { handleError(response, resultNode, exception); + } finally { + // Ensure the lock is released + lock.unlock(); } } } diff --git a/scg-system/src/test/java/io/telicent/backup/TestBackupData.java b/scg-system/src/test/java/io/telicent/backup/TestBackupData.java index 781fb8e..bbf6146 100644 --- a/scg-system/src/test/java/io/telicent/backup/TestBackupData.java +++ b/scg-system/src/test/java/io/telicent/backup/TestBackupData.java @@ -38,12 +38,14 @@ import java.net.http.HttpResponse; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.*; import static io.telicent.TestJwtServletAuth.makeAuthGETCallWithPath; import static io.telicent.TestJwtServletAuth.makeAuthPOSTCallWithPath; import static org.apache.jena.graph.Graph.emptyGraph; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.mockito.Mockito.*; public class TestBackupData { @@ -51,6 +53,8 @@ public class TestBackupData { private FMod_BackupData testModule; + private static DatasetBackupService mockService = mock(DatasetBackupService.class); + @BeforeEach public void createAndSetupServerDetails() throws Exception { LibTestsSCG.setupAuthentication(); @@ -68,6 +72,7 @@ void clearDown() throws Exception { } LibTestsSCG.teardownAuthentication(); Configurator.reset(); + reset(mockService); } private FusekiModules generateModulesAndReplaceWithTestModule() { @@ -218,20 +223,77 @@ public void test_restoreBackup_error() { assertEquals(500, createBackupResponse.statusCode()); } + @Test + public void test_restoreBackup_rejectCallIfAlreadyUnderway() throws InterruptedException, ExecutionException { + // given + testModule = new FMod_BackupData_Mock(); + server = buildServer("--port=0", "--empty"); + + doAnswer(invocation -> { + Thread.sleep(50); // Simulate operation + return new ObjectMapper().createObjectNode().put("status", "success"); + }).when(mockService).restoreDatasets(anyString()); + + List>> futures = new ArrayList<>(); + // Use ExecutorService to run multiple threads + try (ExecutorService executorService = Executors.newFixedThreadPool(2)) { + + // Submit 5 concurrent tasks simulating individual requests + for (int i = 0; i < 2; i++) { + futures.add(executorService.submit(() -> { + try { + HttpResponse response = makeAuthPOSTCallWithPath(server, "$/backups/restore", "test"); + debug(response); + return response; + } catch (Exception e) { + return null; + } + })); + } + // Wait for threads to complete + executorService.shutdown(); + executorService.awaitTermination(1, TimeUnit.SECONDS); + } + // then + // Analyze results + int successCount = 0; + int tooManyRequestsCount = 0; + for (Future> future : futures) { + HttpResponse result = future.get(); // Get the response + if (429 == result.statusCode()) { + tooManyRequestsCount++; + } else if (200 == result.statusCode()) { + successCount++; + } + } + // Verify that only one request succeeded + assertEquals(1, successCount, "Exactly one request should succeed"); + assertEquals(1, tooManyRequestsCount, "Exactly two requests should be rejected with 'Too Many Requests'"); + } + + /** * Debugging method for outputting response to std:out * @param response generated response */ private void debug(HttpResponse response) { + System.out.println(convertToJSON(response)); + } + + /** + * Obtain the JSON String from the HTTP Response + * @param response the response returned + * @return a JSON string + */ + private String convertToJSON(HttpResponse response) { try { InputStream inputStream = response.body(); InputStreamReader reader = new InputStreamReader(inputStream); ObjectMapper mapper = new ObjectMapper(); Object jsonObject = mapper.readValue(reader, Object.class); - String jsonString = mapper.writeValueAsString(jsonObject); - System.out.println(jsonString); + return mapper.writeValueAsString(jsonObject); }catch (IOException e) { - System.out.println(e.getMessage()); + return e.getMessage(); } } @@ -258,4 +320,16 @@ DatasetBackupService getBackupService(DataAccessPointRegistry dapRegistry) { return null; } } + + /** + * Extension of the Backup Module for testing purposes. + * Allows the underlying service to be mocked + */ + public static class FMod_BackupData_Mock extends FMod_BackupData { + + @Override + DatasetBackupService getBackupService(DataAccessPointRegistry dapRegistry) { + return mockService; + } + } } \ No newline at end of file From 95144004187d9ec8727660bd8d78144fc71920ba Mon Sep 17 00:00:00 2001 From: Paul Gallagher Date: Wed, 18 Dec 2024 11:45:43 +0000 Subject: [PATCH 2/3] [CORE-641] PR Feedback. Refactoring locking into Back Up Service and covering both backup/restore operations. --- .../backup/services/DatasetBackupService.java | 38 +++++++++++++ .../backup/servlets/BackupServlet.java | 15 +----- .../backup/servlets/RestoreServlet.java | 31 +---------- .../io/telicent/backup/TestBackupData.java | 53 +------------------ .../services/TestDatasetBackupService.java | 37 ++++++++++++- 5 files changed, 77 insertions(+), 97 deletions(-) diff --git a/scg-system/src/main/java/io/telicent/backup/services/DatasetBackupService.java b/scg-system/src/main/java/io/telicent/backup/services/DatasetBackupService.java index 3a12570..a310342 100644 --- a/scg-system/src/main/java/io/telicent/backup/services/DatasetBackupService.java +++ b/scg-system/src/main/java/io/telicent/backup/services/DatasetBackupService.java @@ -22,6 +22,8 @@ import io.telicent.jena.abac.labels.LabelsStore; import io.telicent.jena.abac.labels.LabelsStoreRocksDB; import io.telicent.jena.graphql.utils.ExcludeFromJacocoGeneratedReport; +import jakarta.servlet.http.HttpServletRequest; +import jakarta.servlet.http.HttpServletResponse; import org.apache.jena.atlas.lib.DateTimeUtils; import org.apache.jena.fuseki.mgt.Backup; import org.apache.jena.fuseki.server.DataAccessPoint; @@ -37,6 +39,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; import java.util.zip.GZIPInputStream; import static io.telicent.backup.utils.BackupUtils.*; @@ -44,6 +47,8 @@ public class DatasetBackupService { + private final ReentrantLock lock; + private final DataAccessPointRegistry dapRegistry; final static ConcurrentHashMap> backupConsumerMap = new ConcurrentHashMap<>(); @@ -53,6 +58,39 @@ public DatasetBackupService(DataAccessPointRegistry dapRegistry) { this.dapRegistry = dapRegistry; registerMethods("tdb", this::backupTDB, this::restoreTDB); registerMethods("labels", this::backupLabelStore, this::restoreLabelStore); + lock = new ReentrantLock(); + } + + /** + * Ensures that only oine operation is processed at a time + * @param request incoming request + * @param response outgoing response + * @param backup flag indicating backup or restore + */ + public void process(HttpServletRequest request, HttpServletResponse response, boolean backup) { + // Try to acquire the lock without blocking + ObjectNode resultNode = MAPPER.createObjectNode(); + if (!lock.tryLock()) { + response.setStatus(HttpServletResponse.SC_CONFLICT); + resultNode.put("error", "Another conflicting operation is already in progress. Please try again later."); + processResponse(response, resultNode); + } else { + try { + String id = request.getPathInfo(); + resultNode.put("id", id); + resultNode.put("date", DateTimeUtils.nowAsString("yyyy-MM-dd_HH-mm-ss")); + if (backup) { + resultNode.set("backup", backupDataset(id)); + } else { + resultNode.set("restore", restoreDatasets(id)); + } + processResponse(response, resultNode); + } catch (Exception exception) { + handleError(response, resultNode, exception); + } finally { + lock.unlock(); + } + } } /** diff --git a/scg-system/src/main/java/io/telicent/backup/servlets/BackupServlet.java b/scg-system/src/main/java/io/telicent/backup/servlets/BackupServlet.java index 1b33617..a1bd6d3 100644 --- a/scg-system/src/main/java/io/telicent/backup/servlets/BackupServlet.java +++ b/scg-system/src/main/java/io/telicent/backup/servlets/BackupServlet.java @@ -16,14 +16,10 @@ package io.telicent.backup.servlets; -import com.fasterxml.jackson.databind.node.ObjectNode; import io.telicent.backup.services.DatasetBackupService; import jakarta.servlet.http.HttpServlet; import jakarta.servlet.http.HttpServletRequest; import jakarta.servlet.http.HttpServletResponse; -import org.apache.jena.atlas.lib.DateTimeUtils; - -import static io.telicent.backup.utils.BackupUtils.*; /** * Servlet class responsible for the creation of backups. @@ -37,15 +33,6 @@ public BackupServlet(DatasetBackupService backupService) { @Override protected void doPost(HttpServletRequest request, HttpServletResponse response) { - ObjectNode resultNode = MAPPER.createObjectNode(); - try { - String datasetName = request.getPathInfo(); - resultNode.put("dataset", datasetName); - resultNode.put("date", DateTimeUtils.nowAsString("yyyy-MM-dd_HH-mm-ss")); - resultNode.set("backup", backupService.backupDataset(datasetName)); - processResponse(response, resultNode); - } catch (Exception exception) { - handleError(response, resultNode, exception); - } + backupService.process(request, response, true); } } diff --git a/scg-system/src/main/java/io/telicent/backup/servlets/RestoreServlet.java b/scg-system/src/main/java/io/telicent/backup/servlets/RestoreServlet.java index 995e638..325c3d8 100644 --- a/scg-system/src/main/java/io/telicent/backup/servlets/RestoreServlet.java +++ b/scg-system/src/main/java/io/telicent/backup/servlets/RestoreServlet.java @@ -16,51 +16,22 @@ package io.telicent.backup.servlets; -import com.fasterxml.jackson.databind.node.ObjectNode; import io.telicent.backup.services.DatasetBackupService; import jakarta.servlet.http.HttpServlet; import jakarta.servlet.http.HttpServletRequest; import jakarta.servlet.http.HttpServletResponse; -import org.apache.jena.atlas.lib.DateTimeUtils; - -import java.util.concurrent.locks.ReentrantLock; - -import static io.telicent.backup.utils.BackupUtils.*; /** * Servlet class responsible for the loading of a given backup. */ public class RestoreServlet extends HttpServlet { private final DatasetBackupService backupService; - private final ReentrantLock lock = new ReentrantLock(); - public RestoreServlet(DatasetBackupService backupService) { this.backupService = backupService; } @Override protected void doPost(HttpServletRequest request, HttpServletResponse response) { - ObjectNode resultNode = MAPPER.createObjectNode(); - - // Try to acquire the lock without blocking - if (!lock.tryLock()) { - response.setStatus(429); // HTTP 429: Too Many Requests (doesn't yet exist in HttpServletResponse). - resultNode.put("error", "Another restore operation is already in progress. Please try again later."); - processResponse(response, resultNode); - return; - } - - try { - String restoreId = request.getPathInfo(); - resultNode.put("restore-id", restoreId); - resultNode.put("date", DateTimeUtils.nowAsString("yyyy-MM-dd_HH-mm-ss")); - resultNode.set("restore", backupService.restoreDatasets(restoreId)); - processResponse(response, resultNode); - } catch (Exception exception) { - handleError(response, resultNode, exception); - } finally { - // Ensure the lock is released - lock.unlock(); - } + backupService.process(request, response,false); } } diff --git a/scg-system/src/test/java/io/telicent/backup/TestBackupData.java b/scg-system/src/test/java/io/telicent/backup/TestBackupData.java index bbf6146..f46235d 100644 --- a/scg-system/src/test/java/io/telicent/backup/TestBackupData.java +++ b/scg-system/src/test/java/io/telicent/backup/TestBackupData.java @@ -38,14 +38,14 @@ import java.net.http.HttpResponse; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.*; import static io.telicent.TestJwtServletAuth.makeAuthGETCallWithPath; import static io.telicent.TestJwtServletAuth.makeAuthPOSTCallWithPath; import static org.apache.jena.graph.Graph.emptyGraph; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; public class TestBackupData { @@ -223,55 +223,6 @@ public void test_restoreBackup_error() { assertEquals(500, createBackupResponse.statusCode()); } - @Test - public void test_restoreBackup_rejectCallIfAlreadyUnderway() throws InterruptedException, ExecutionException { - // given - testModule = new FMod_BackupData_Mock(); - server = buildServer("--port=0", "--empty"); - - doAnswer(invocation -> { - Thread.sleep(50); // Simulate operation - return new ObjectMapper().createObjectNode().put("status", "success"); - }).when(mockService).restoreDatasets(anyString()); - - List>> futures = new ArrayList<>(); - // Use ExecutorService to run multiple threads - try (ExecutorService executorService = Executors.newFixedThreadPool(2)) { - - // Submit 5 concurrent tasks simulating individual requests - for (int i = 0; i < 2; i++) { - futures.add(executorService.submit(() -> { - try { - HttpResponse response = makeAuthPOSTCallWithPath(server, "$/backups/restore", "test"); - debug(response); - return response; - } catch (Exception e) { - return null; - } - })); - } - // Wait for threads to complete - executorService.shutdown(); - executorService.awaitTermination(1, TimeUnit.SECONDS); - } - // then - // Analyze results - int successCount = 0; - int tooManyRequestsCount = 0; - for (Future> future : futures) { - HttpResponse result = future.get(); // Get the response - if (429 == result.statusCode()) { - tooManyRequestsCount++; - } else if (200 == result.statusCode()) { - successCount++; - } - } - // Verify that only one request succeeded - assertEquals(1, successCount, "Exactly one request should succeed"); - assertEquals(1, tooManyRequestsCount, "Exactly two requests should be rejected with 'Too Many Requests'"); - } - - /** * Debugging method for outputting response to std:out * @param response generated response diff --git a/scg-system/src/test/java/io/telicent/backup/services/TestDatasetBackupService.java b/scg-system/src/test/java/io/telicent/backup/services/TestDatasetBackupService.java index 654ab73..d17d0f5 100644 --- a/scg-system/src/test/java/io/telicent/backup/services/TestDatasetBackupService.java +++ b/scg-system/src/test/java/io/telicent/backup/services/TestDatasetBackupService.java @@ -22,6 +22,9 @@ import io.telicent.jena.abac.ABAC; import io.telicent.jena.abac.core.DatasetGraphABAC; import io.telicent.jena.abac.labels.LabelsStoreRocksDB; +import jakarta.servlet.ServletOutputStream; +import jakarta.servlet.http.HttpServletRequest; +import jakarta.servlet.http.HttpServletResponse; import org.apache.jena.fuseki.server.DataAccessPoint; import org.apache.jena.fuseki.server.DataAccessPointRegistry; import org.apache.jena.fuseki.server.DataService; @@ -35,14 +38,16 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import static io.telicent.backup.services.DatasetBackupService_Test.*; import static io.telicent.backup.utils.BackupUtils.MAPPER; import static java.util.Collections.emptyList; import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; public class TestDatasetBackupService { @@ -1020,6 +1025,34 @@ public void test_restoreLabelStore_wrongPath() { assertTrue(RESULT_NODE.get("reason").asText().startsWith("Restore directory not found: ")); } + @Test + public void test_process_multipleCalls() throws InterruptedException, IOException { + HttpServletRequest request = mock(HttpServletRequest.class); + HttpServletResponse response = mock(HttpServletResponse.class); + ServletOutputStream outputStream = mock(ServletOutputStream.class); + when(response.getOutputStream()).thenReturn(outputStream); + doAnswer(invocation -> { + Thread.sleep(50); // Simulate operation + return "dataset"; + }).when(request).getPathInfo(); + + // Use ExecutorService to run multiple threads + try (ExecutorService executorService = Executors.newFixedThreadPool(2)) { + + // Submit 5 concurrent tasks simulating individual requests + for (int i = 0; i < 2; i++) { + final boolean flag = (i == 0); + executorService.submit(() -> { + cut.process(request, response, flag); + }); + } + // Wait for threads to complete + executorService.shutdown(); + executorService.awaitTermination(1, TimeUnit.SECONDS); + } + verify(response, times(1)).setStatus(409); + } + public void doNothing(DataAccessPoint dataAccessPoint, String path, ObjectNode resultNode) { resultNode.put("success", true); } From 73e5885ab7d2aa06dd1e5040fc3c719d76522488 Mon Sep 17 00:00:00 2001 From: Paul Gallagher <132362215+TelicentPaul@users.noreply.github.com> Date: Wed, 18 Dec 2024 13:47:57 +0000 Subject: [PATCH 3/3] Update scg-system/src/main/java/io/telicent/backup/services/DatasetBackupService.java Co-authored-by: Rob Vesse --- .../java/io/telicent/backup/services/DatasetBackupService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scg-system/src/main/java/io/telicent/backup/services/DatasetBackupService.java b/scg-system/src/main/java/io/telicent/backup/services/DatasetBackupService.java index a310342..218f009 100644 --- a/scg-system/src/main/java/io/telicent/backup/services/DatasetBackupService.java +++ b/scg-system/src/main/java/io/telicent/backup/services/DatasetBackupService.java @@ -62,7 +62,7 @@ public DatasetBackupService(DataAccessPointRegistry dapRegistry) { } /** - * Ensures that only oine operation is processed at a time + * Ensures that only one operation is processed at a time * @param request incoming request * @param response outgoing response * @param backup flag indicating backup or restore