Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ensure custom endpoint monitor obeys refresh rate #1175

Merged
merged 6 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import software.amazon.jdbc.PluginService;
import software.amazon.jdbc.util.CacheMap;
import software.amazon.jdbc.util.Messages;
import software.amazon.jdbc.util.RdsUtils;
import software.amazon.jdbc.util.StringUtils;
import software.amazon.jdbc.util.telemetry.TelemetryCounter;
import software.amazon.jdbc.util.telemetry.TelemetryFactory;
Expand All @@ -55,7 +54,6 @@ public class CustomEndpointMonitorImpl implements CustomEndpointMonitor {
protected static final long CUSTOM_ENDPOINT_INFO_EXPIRATION_NANO = TimeUnit.MINUTES.toNanos(5);

protected final AtomicBoolean stop = new AtomicBoolean(false);
protected final RdsUtils rdsUtils = new RdsUtils();
protected final RdsClient rdsClient;
protected final HostSpec customEndpointHostSpec;
protected final String endpointIdentifier;
Expand Down Expand Up @@ -151,7 +149,7 @@ public void run() {
CustomEndpointInfo cachedEndpointInfo = customEndpointInfoCache.get(this.customEndpointHostSpec.getHost());
if (cachedEndpointInfo != null && cachedEndpointInfo.equals(endpointInfo)) {
long elapsedTime = System.nanoTime() - start;
long sleepDuration = Math.min(0, this.refreshRateNano - elapsedTime);
long sleepDuration = Math.max(0, this.refreshRateNano - elapsedTime);
TimeUnit.NANOSECONDS.sleep(sleepDuration);
continue;
}
Expand All @@ -175,7 +173,7 @@ public void run() {
this.infoChangedCounter.inc();

long elapsedTime = System.nanoTime() - start;
long sleepDuration = Math.min(0, this.refreshRateNano - elapsedTime);
long sleepDuration = Math.max(0, this.refreshRateNano - elapsedTime);
TimeUnit.NANOSECONDS.sleep(sleepDuration);
} catch (InterruptedException e) {
throw e;
Expand All @@ -188,13 +186,18 @@ public void run() {
}
}
} catch (InterruptedException e) {
LOGGER.info(Messages.get("CustomEndpointMonitorImpl.interrupted", new Object[]{ this.customEndpointHostSpec }));
LOGGER.info(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Log level fine is probably better in this case.

Messages.get(
"CustomEndpointMonitorImpl.interrupted",
new Object[]{ this.customEndpointHostSpec.getHost() }));
Thread.currentThread().interrupt();
} finally {
customEndpointInfoCache.remove(this.customEndpointHostSpec.getHost());
this.rdsClient.close();
LOGGER.fine(
Messages.get("CustomEndpointMonitorImpl.stoppedMonitor", new Object[]{ this.customEndpointHostSpec }));
Messages.get(
"CustomEndpointMonitorImpl.stoppedMonitor",
new Object[]{ this.customEndpointHostSpec.getHost() }));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,8 +429,7 @@ private void switchToReaderConnection(final List<HostSpec> hosts)

if (this.readerHostSpec != null && !hosts.contains(this.readerHostSpec)) {
// The old reader cannot be used anymore because it is no longer in the list of allowed hosts.
this.readerConnection = null;
this.readerHostSpec = null;
closeConnectionIfIdle(this.readerConnection);
}

this.inReadWriteSplit = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,13 @@ public Region getRegion(Properties props, String propKey) {
* Determines the AWS region from the given region string.
*
* @param regionString The connection properties for the connection being established.
* @return The AWS region of the given region string.
* @return The AWS region of the given region string, or null if the given string was null or empty.
*/
public Region getRegionFromRegionString(String regionString) {
if (StringUtils.isNullOrEmpty(regionString)) {
return null;
}

final Region region = Region.of(regionString);
if (!Region.regions().contains(region)) {
throw new RuntimeException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,8 @@
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
Expand Down Expand Up @@ -80,139 +78,115 @@
@Order(16)
public class CustomEndpointTest {
private static final Logger LOGGER = Logger.getLogger(CustomEndpointTest.class.getName());
protected static final String oneInstanceEndpointId = "test-endpoint-1-" + UUID.randomUUID();
protected static final String twoInstanceEndpointId = "test-endpoint-2-" + UUID.randomUUID();
protected static final Map<String, DBClusterEndpoint> endpoints = new HashMap<String, DBClusterEndpoint>() {{
put(oneInstanceEndpointId, null);
put(twoInstanceEndpointId, null);
}};
protected static final String endpointId = "test-endpoint-1-" + UUID.randomUUID();
protected static DBClusterEndpoint endpointInfo;

protected static final AuroraTestUtility auroraUtil = AuroraTestUtility.getUtility();
protected static final boolean reuseExistingEndpoints = false;
protected static final boolean reuseExistingEndpoint = false;

protected String currentWriter;

@BeforeAll
public static void createEndpoints() {
public static void setupEndpoint() {
TestEnvironmentInfo envInfo = TestEnvironment.getCurrent().getInfo();
String clusterId = envInfo.getAuroraClusterName();
String region = envInfo.getRegion();

try (RdsClient client = RdsClient.builder().region(Region.of(region)).build()) {
if (reuseExistingEndpoints) {
waitUntilEndpointsAvailable(client, clusterId);
if (reuseExistingEndpoint) {
waitUntilEndpointAvailable(client);
return;
}

List<TestInstanceInfo> instances = envInfo.getDatabaseInfo().getInstances();
createEndpoint(client, clusterId, oneInstanceEndpointId, instances.subList(0, 1));
createEndpoint(client, clusterId, twoInstanceEndpointId, instances.subList(0, 2));
waitUntilEndpointsAvailable(client, clusterId);
createEndpoint(client, clusterId, instances.subList(0, 1));
waitUntilEndpointAvailable(client);
}
}

private static void deleteEndpoints(RdsClient client) {
for (String endpointId : endpoints.keySet()) {
try {
client.deleteDBClusterEndpoint((builder) -> builder.dbClusterEndpointIdentifier(endpointId));
} catch (DbClusterEndpointNotFoundException e) {
// Custom endpoint already does not exist - do nothing.
}
}

waitUntilEndpointsDeleted(client);
}

private static void waitUntilEndpointsDeleted(RdsClient client) {
Copy link
Contributor Author

@aaron-congo aaron-congo Nov 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This used to be called after deleting the test custom endpoint after all tests in the file had been completed. I removed it to save some time (note that the custom endpoint still gets deleted, we are just no longer waiting for it to fully complete the deletion). Each custom endpoint will have a unique ID, so even though other tests could start running while the custom endpoint is still being deleted, no other tests should require it to be fully deleted before running. Let me know if you think I should add it back

String clusterId = TestEnvironment.getCurrent().getInfo().getAuroraClusterName();
long deleteTimeoutNano = System.nanoTime() + TimeUnit.MINUTES.toNanos(5);
boolean allEndpointsDeleted = false;

while (!allEndpointsDeleted && System.nanoTime() < deleteTimeoutNano) {
Filter customEndpointFilter =
Filter.builder().name("db-cluster-endpoint-type").values("custom").build();
DescribeDbClusterEndpointsResponse endpointsResponse = client.describeDBClusterEndpoints(
(builder) ->
builder.dbClusterIdentifier(clusterId).filters(customEndpointFilter));
List<String> responseIDs = endpointsResponse.dbClusterEndpoints().stream()
.map(DBClusterEndpoint::dbClusterEndpointIdentifier).collect(Collectors.toList());

allEndpointsDeleted = endpoints.keySet().stream().noneMatch(responseIDs::contains);
}

if (!allEndpointsDeleted) {
throw new RuntimeException(
"The test setup step timed out while attempting to delete pre-existing test custom endpoints.");
}
}

private static void createEndpoint(
RdsClient client, String clusterId, String endpointId, List<TestInstanceInfo> instances) {
private static void createEndpoint(RdsClient client, String clusterId, List<TestInstanceInfo> instances) {
List<String> instanceIDs = instances.stream().map(TestInstanceInfo::getInstanceId).collect(Collectors.toList());
client.createDBClusterEndpoint((builder) ->
builder.dbClusterEndpointIdentifier(endpointId)
builder.dbClusterEndpointIdentifier(CustomEndpointTest.endpointId)
.dbClusterIdentifier(clusterId)
.endpointType("ANY")
.staticMembers(instanceIDs));
}

public static void waitUntilEndpointsAvailable(RdsClient client, String clusterId) {
public static void waitUntilEndpointAvailable(RdsClient client) {
long timeoutEndNano = System.nanoTime() + TimeUnit.MINUTES.toNanos(5);
boolean allEndpointsAvailable = false;
boolean available = false;

while (!allEndpointsAvailable && System.nanoTime() < timeoutEndNano) {
while (System.nanoTime() < timeoutEndNano) {
Filter customEndpointFilter =
Filter.builder().name("db-cluster-endpoint-type").values("custom").build();
DescribeDbClusterEndpointsResponse endpointsResponse = client.describeDBClusterEndpoints(
(builder) ->
builder.dbClusterIdentifier(clusterId).filters(customEndpointFilter));
builder.dbClusterEndpointIdentifier(endpointId).filters(customEndpointFilter));
List<DBClusterEndpoint> responseEndpoints = endpointsResponse.dbClusterEndpoints();

int numAvailableEndpoints = 0;
for (int i = 0; i < responseEndpoints.size() && numAvailableEndpoints < endpoints.size(); i++) {
DBClusterEndpoint endpoint = responseEndpoints.get(i);
String endpointId = endpoint.dbClusterEndpointIdentifier();
if (endpoints.containsKey(endpointId)) {
endpoints.put(endpointId, endpoint);
if ("available".equals(endpoint.status())) {
numAvailableEndpoints++;
}
if (responseEndpoints.size() != 1) {
try {
// Endpoint needs more time to get created
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

allEndpointsAvailable = numAvailableEndpoints == endpoints.size();
DBClusterEndpoint responseEndpoint = responseEndpoints.get(0);
endpointInfo = responseEndpoint;
available = "available".equals(responseEndpoint.status());
if (available) {
break;
}

try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

if (!allEndpointsAvailable) {
if (!available) {
throw new RuntimeException(
"The test setup step timed out while waiting for the new custom endpoints to become available.");
"The test setup step timed out while waiting for the custom endpoint to become available: '"
+ endpointId + "'.");
}
}

public static void waitUntilEndpointHasCorrectState(RdsClient client, String endpointId, List<String> membersList) {
public static void waitUntilEndpointHasMembers(RdsClient client, String endpointId, List<String> membersList) {
long start = System.nanoTime();

// Convert to set for later comparison.
Set<String> members = new HashSet<>(membersList);
long timeoutEndNano = System.nanoTime() + TimeUnit.MINUTES.toNanos(20);
boolean hasCorrectState = false;
while (!hasCorrectState && System.nanoTime() < timeoutEndNano) {
while (System.nanoTime() < timeoutEndNano) {
DescribeDbClusterEndpointsResponse response = client.describeDBClusterEndpoints(
(builder) ->
builder.dbClusterEndpointIdentifier(endpointId));
if (response.dbClusterEndpoints().size() != 1) {
fail("Unexpected number of endpoints returned while waiting for custom endpoint to have the specified list of "
+ "members. Expected 1, got " + response.dbClusterEndpoints().size());
+ "members. Expected 1, got " + response.dbClusterEndpoints().size() + ".");
}

DBClusterEndpoint endpoint = response.dbClusterEndpoints().get(0);
// Compare sets to ignore order when checking for members equality.
Set<String> responseMembers = new HashSet<>(endpoint.staticMembers());
hasCorrectState = responseMembers.equals(members) && "available".equals(endpoint.status());
if (hasCorrectState) {
break;
}

try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

if (!hasCorrectState) {
fail("Timed out while waiting for the custom endpoint to stabilize");
fail("Timed out while waiting for the custom endpoint to stabilize: '" + endpointId + "'.");
}

LOGGER.fine("waitUntilEndpointHasCorrectState took "
Expand All @@ -228,13 +202,21 @@ public void identifyWriter() {

@AfterAll
public static void cleanup() {
if (reuseExistingEndpoints) {
if (reuseExistingEndpoint) {
return;
}

String region = TestEnvironment.getCurrent().getInfo().getRegion();
try (RdsClient client = RdsClient.builder().region(Region.of(region)).build()) {
deleteEndpoints(client);
deleteEndpoint(client);
}
}

/**
 * Deletes the custom endpoint created for this test run.
 *
 * <p>Tolerates the endpoint already being gone: a
 * {@code DbClusterEndpointNotFoundException} from the delete call is
 * deliberately swallowed so cleanup is idempotent.
 *
 * @param client the RDS client used to issue the delete request
 */
private static void deleteEndpoint(RdsClient client) {
try {
client.deleteDBClusterEndpoint((builder) -> builder.dbClusterEndpointIdentifier(endpointId));
} catch (DbClusterEndpointNotFoundException e) {
// Custom endpoint already does not exist - do nothing.
}
}

Expand All @@ -248,17 +230,15 @@ protected Properties initDefaultProps() {

@TestTemplate
public void testCustomEndpointFailover() throws SQLException, InterruptedException {
// The single-instance endpoint will be used for this test.
final DBClusterEndpoint endpoint = endpoints.get(oneInstanceEndpointId);
final TestDatabaseInfo dbInfo = TestEnvironment.getCurrent().getInfo().getDatabaseInfo();
final int port = dbInfo.getClusterEndpointPort();
final Properties props = initDefaultProps();
props.setProperty("failoverMode", "reader-or-writer");

try (final Connection conn = DriverManager.getConnection(
ConnectionStringHelper.getWrapperUrl(endpoint.endpoint(), port, dbInfo.getDefaultDbName()),
ConnectionStringHelper.getWrapperUrl(endpointInfo.endpoint(), port, dbInfo.getDefaultDbName()),
props)) {
List<String> endpointMembers = endpoint.staticMembers();
List<String> endpointMembers = endpointInfo.staticMembers();
String instanceId = auroraUtil.queryInstanceId(conn);
assertTrue(endpointMembers.contains(instanceId));

Expand All @@ -278,8 +258,6 @@ public void testCustomEndpointFailover() throws SQLException, InterruptedExcepti

@TestTemplate
public void testCustomEndpointReadWriteSplitting_withCustomEndpointChanges() throws SQLException {
// The one-instance custom endpoint will be used for this test.
final DBClusterEndpoint testEndpoint = endpoints.get(oneInstanceEndpointId);
TestEnvironmentInfo envInfo = TestEnvironment.getCurrent().getInfo();
final TestDatabaseInfo dbInfo = envInfo.getDatabaseInfo();
final int port = dbInfo.getClusterEndpointPort();
Expand All @@ -290,10 +268,10 @@ public void testCustomEndpointReadWriteSplitting_withCustomEndpointChanges() thr

try (final Connection conn =
DriverManager.getConnection(
ConnectionStringHelper.getWrapperUrl(testEndpoint.endpoint(), port, dbInfo.getDefaultDbName()),
ConnectionStringHelper.getWrapperUrl(endpointInfo.endpoint(), port, dbInfo.getDefaultDbName()),
props);
final RdsClient client = RdsClient.builder().region(Region.of(envInfo.getRegion())).build()) {
List<String> endpointMembers = testEndpoint.staticMembers();
List<String> endpointMembers = endpointInfo.staticMembers();
String instanceId1 = auroraUtil.queryInstanceId(conn);
assertTrue(endpointMembers.contains(instanceId1));

Expand Down Expand Up @@ -322,10 +300,10 @@ public void testCustomEndpointReadWriteSplitting_withCustomEndpointChanges() thr

client.modifyDBClusterEndpoint(
builder ->
builder.dbClusterEndpointIdentifier(oneInstanceEndpointId).staticMembers(instanceId1, newMember));
builder.dbClusterEndpointIdentifier(endpointId).staticMembers(instanceId1, newMember));

try {
waitUntilEndpointHasCorrectState(client, oneInstanceEndpointId, Arrays.asList(instanceId1, newMember));
waitUntilEndpointHasMembers(client, endpointId, Arrays.asList(instanceId1, newMember));

// We should now be able to switch to newMember.
assertDoesNotThrow(() -> conn.setReadOnly(newReadOnlyValue));
Expand All @@ -337,8 +315,8 @@ public void testCustomEndpointReadWriteSplitting_withCustomEndpointChanges() thr
} finally {
client.modifyDBClusterEndpoint(
builder ->
builder.dbClusterEndpointIdentifier(oneInstanceEndpointId).staticMembers(instanceId1));
waitUntilEndpointHasCorrectState(client, oneInstanceEndpointId, Collections.singletonList(instanceId1));
builder.dbClusterEndpointIdentifier(endpointId).staticMembers(instanceId1));
waitUntilEndpointHasMembers(client, endpointId, Collections.singletonList(instanceId1));
}

// We should not be able to switch again because newMember was removed from the custom endpoint.
Expand Down
Loading