From 7c83424dac0baff75846669d31aa05a04f22dee8 Mon Sep 17 00:00:00 2001 From: Radovan Date: Fri, 15 Mar 2024 11:32:44 +0200 Subject: [PATCH] Make grpc's AsyncBackup truly Async (#720) --- k8s/Dockerfile | 8 ++- k8s/medusa.sh | 5 ++ medusa/service/grpc/client.py | 38 +++++++------- medusa/service/grpc/server.py | 51 ++++++++++++------- .../features/integration_tests.feature | 4 ++ .../features/steps/integration_steps.py | 46 +++++++++++------ 6 files changed, 97 insertions(+), 55 deletions(-) create mode 100755 k8s/medusa.sh diff --git a/k8s/Dockerfile b/k8s/Dockerfile index f596e800..fa67aef1 100644 --- a/k8s/Dockerfile +++ b/k8s/Dockerfile @@ -31,7 +31,7 @@ ENV POETRY_VIRTUALENVS_IN_PROJECT=true RUN python3 -m pip install -U pip && pip3 install --ignore-installed --user poetry # Build medusa itself so we can add the executables in the final image -RUN cd /build && poetry install +RUN cd /build && poetry build && poetry install # Could be python:slim, but we have a .sh entrypoint FROM ubuntu:22.04 @@ -52,7 +52,8 @@ WORKDIR /home/cassandra ENV DEBUG_VERSION 1 ENV DEBUG_SLEEP 0 -ENV PATH=/home/cassandra/.local/bin:/home/cassandra/google-cloud-sdk/bin:$PATH +ENV PATH=/home/cassandra/.local/bin:/home/cassandra/google-cloud-sdk/bin:/home/cassandra/bin:$PATH +ENV PYTHONPATH=/home/cassandra COPY --from=base --chown=cassandra:cassandra /root/.local /home/cassandra/.local @@ -61,6 +62,9 @@ COPY --from=base --chown=cassandra:cassandra /build/pyproject.toml /home/cassand COPY --chown=cassandra:cassandra medusa /home/cassandra/medusa COPY --chown=cassandra:cassandra k8s/docker-entrypoint.sh /home/cassandra +RUN mkdir -p /home/cassandra/bin +COPY --chown=cassandra:cassandra k8s/medusa.sh /home/cassandra/bin/medusa + # Avoid Click locale errors when running medusa directly ENV LC_ALL=C.UTF-8 ENV LANG=C.UTF-8 diff --git a/k8s/medusa.sh b/k8s/medusa.sh new file mode 100755 index 00000000..775fabda --- /dev/null +++ b/k8s/medusa.sh @@ -0,0 +1,5 @@ +#!/home/cassandra/.venv/bin/python 
+import sys +from medusa.medusacli import cli +if __name__ == '__main__': + sys.exit(cli()) diff --git a/medusa/service/grpc/client.py b/medusa/service/grpc/client.py index ed927398..9c7845b3 100644 --- a/medusa/service/grpc/client.py +++ b/medusa/service/grpc/client.py @@ -24,13 +24,13 @@ class Client: def __init__(self, target, channel_options=[]): - self.channel = grpc.insecure_channel(target, options=channel_options) + self.channel = grpc.aio.insecure_channel(target, options=channel_options) - def health_check(self): + async def health_check(self): try: health_stub = health_pb2_grpc.HealthStub(self.channel) request = health_pb2.HealthCheckRequest() - return health_stub.Check(request) + return await health_stub.Check(request) except grpc.RpcError as e: logging.error("Failed health check due to error: {}".format(e)) return None @@ -45,65 +45,65 @@ def create_backup_stub(self, mode): raise RuntimeError("{} is not a recognized backup mode".format(mode)) return backup_mode, stub - def async_backup(self, name, mode): + async def async_backup(self, name, mode): try: backup_mode, stub = self.create_backup_stub(mode=mode) request = medusa_pb2.BackupRequest(name=name, mode=backup_mode) - return stub.AsyncBackup(request) + return await stub.AsyncBackup(request) except grpc.RpcError as e: logging.error("Failed async backup for name: {} and mode: {} due to error: {}".format(name, mode, e)) return None - def backup(self, name, mode): + async def backup(self, name, mode): try: backup_mode, stub = self.create_backup_stub(mode=mode) request = medusa_pb2.BackupRequest(name=name, mode=backup_mode) - return stub.Backup(request) + return await stub.Backup(request) except grpc.RpcError as e: logging.error("Failed sync backup for name: {} and mode: {} due to error: {}".format(name, mode, e)) return None - def delete_backup(self, name): + async def delete_backup(self, name): try: stub = medusa_pb2_grpc.MedusaStub(self.channel) request = medusa_pb2.DeleteBackupRequest(name=name) - 
stub.DeleteBackup(request) + await stub.DeleteBackup(request) except grpc.RpcError as e: logging.error("Failed to delete backup for name: {} due to error: {}".format(name, e)) - def get_backup(self, backup_name): + async def get_backup(self, backup_name): try: stub = medusa_pb2_grpc.MedusaStub(self.channel) request = medusa_pb2.GetBackupRequest(backupName=backup_name) - response = stub.GetBackup(request) + response = await stub.GetBackup(request) return response.backup except grpc.RpcError as e: logging.error("Failed to obtain backup for name: {} due to error: {}".format(backup_name, e)) return None - def get_backups(self): + async def get_backups(self): try: stub = medusa_pb2_grpc.MedusaStub(self.channel) request = medusa_pb2.GetBackupsRequest() - response = stub.GetBackups(request) + response = await stub.GetBackups(request) return response.backups except grpc.RpcError as e: logging.error("Failed to obtain list of backups due to error: {}".format(e)) return None - def get_backup_status(self, name): + async def get_backup_status(self, name): try: stub = medusa_pb2_grpc.MedusaStub(self.channel) request = medusa_pb2.BackupStatusRequest(backupName=name) - resp = stub.BackupStatus(request) + resp = await stub.BackupStatus(request) return resp.status except grpc.RpcError as e: logging.error("Failed to determine backup status for name: {} due to error: {}".format(name, e)) return medusa_pb2.StatusType.UNKNOWN - def backup_exists(self, name): + async def backup_exists(self, name): try: - backups = self.get_backups() + backups = await self.get_backups() for backup in list(backups): if backup.backupName == name: return True @@ -112,11 +112,11 @@ def backup_exists(self, name): logging.error("Failed to determine if backup exists for backup name: {} due to error: {}".format(name, e)) return False - def purge_backups(self): + async def purge_backups(self): try: stub = medusa_pb2_grpc.MedusaStub(self.channel) request = medusa_pb2.PurgeBackupsRequest() - resp = 
stub.PurgeBackups(request) + resp = await stub.PurgeBackups(request) return resp except grpc.RpcError as e: logging.error("Failed to purge backups due to error: {}".format(e)) diff --git a/medusa/service/grpc/server.py b/medusa/service/grpc/server.py index 52c7e166..0238b761 100644 --- a/medusa/service/grpc/server.py +++ b/medusa/service/grpc/server.py @@ -13,10 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio import json import logging import os -import signal import sys from collections import defaultdict from concurrent import futures @@ -26,6 +26,7 @@ import grpc import grpc_health.v1.health +from grpc import aio from grpc_health.v1 import health_pb2_grpc from medusa import backup_node @@ -38,7 +39,6 @@ from medusa.service.grpc import medusa_pb2 from medusa.service.grpc import medusa_pb2_grpc from medusa.storage import Storage -from medusa.storage.abstract_storage import AbstractStorage TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S' BACKUP_MODE_DIFFERENTIAL = "differential" @@ -52,7 +52,7 @@ def __init__(self, config_file_path, testing=False): self.config_file_path = config_file_path self.medusa_config = self.create_config() self.testing = testing - self.grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=[ + self.grpc_server = aio.server(futures.ThreadPoolExecutor(max_workers=10), options=[ ('grpc.max_send_message_length', self.medusa_config.grpc.max_send_message_length), ('grpc.max_receive_message_length', self.medusa_config.grpc.max_receive_message_length) ]) @@ -61,9 +61,9 @@ def __init__(self, config_file_path, testing=False): def shutdown(self, signum, frame): logging.info("Shutting down GRPC server") handle_backup_removal_all() - self.grpc_server.stop(0) + asyncio.get_event_loop().run_until_complete(self.grpc_server.stop(0)) - def serve(self): + async def serve(self): config = self.create_config() self.configure_console_logging() @@ -72,11 +72,14 @@ def 
serve(self): logging.info('Starting server. Listening on port 50051.') self.grpc_server.add_insecure_port('[::]:50051') - self.grpc_server.start() + await self.grpc_server.start() if not self.testing: - signal.signal(signal.SIGTERM, self.shutdown) - self.grpc_server.wait_for_termination() + try: + await self.grpc_server.wait_for_termination() + except asyncio.exceptions.CancelledError: + logging.info("Swallowing asyncio.exceptions.CancelledError. This should get fixed at some point") + handle_backup_removal_all() def create_config(self): config_file = Path(self.config_file_path) @@ -88,6 +91,10 @@ def configure_console_logging(self): root_logger = logging.getLogger('') root_logger.setLevel(logging.DEBUG) + # Clean up handlers on the root_logger, this prevents duplicate log lines + for handler in root_logger.handlers[:]: + root_logger.removeHandler(handler) + log_format = logging.Formatter('[%(asctime)s] %(levelname)s: %(message)s') console_handler = logging.StreamHandler() @@ -108,10 +115,9 @@ def __init__(self, config): self.config = config self.storage_config = config.storage - def AsyncBackup(self, request, context): + async def AsyncBackup(self, request, context): # TODO pass the staggered arg logging.info("Performing ASYNC backup {} (type={})".format(request.name, request.mode)) - loop = AbstractStorage.get_or_create_event_loop() response = medusa_pb2.BackupResponse() mode = BACKUP_MODE_DIFFERENTIAL if medusa_pb2.BackupRequest.Mode.FULL == request.mode: @@ -120,13 +126,16 @@ def AsyncBackup(self, request, context): try: response.backupName = request.name response.status = response.status = medusa_pb2.StatusType.IN_PROGRESS - with ThreadPoolExecutor(max_workers=1, thread_name_prefix=request.name) as executor: - BackupMan.register_backup(request.name, is_async=True) - backup_future = loop.run_in_executor(executor, backup_node.handle_backup, self.config, - request.name, None, - False, mode) - backup_future.add_done_callback(record_backup_info) - 
BackupMan.set_backup_future(request.name, backup_future) + BackupMan.register_backup(request.name, is_async=True) + executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix=request.name) + loop = asyncio.get_running_loop() + backup_future = loop.run_in_executor( + executor, + backup_node.handle_backup, + self.config, request.name, None, False, mode + ) + backup_future.add_done_callback(record_backup_info) + BackupMan.set_backup_future(request.name, backup_future) except Exception as e: @@ -382,11 +391,15 @@ def handle_backup_removal_all(): logging.error("Failed to cleanup all backups") -if __name__ == '__main__': +async def main(): if len(sys.argv) > 2: config_file_path = sys.argv[2] else: config_file_path = "/etc/medusa/medusa.ini" server = Server(config_file_path) - server.serve() + await server.serve() + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/tests/integration/features/integration_tests.feature b/tests/integration/features/integration_tests.feature index c1b25f5e..47a51cfa 100644 --- a/tests/integration/features/integration_tests.feature +++ b/tests/integration/features/integration_tests.feature @@ -926,6 +926,7 @@ Feature: Integration tests When I load 100 rows in the "medusa.test" table When I run a "ccm node1 nodetool -- -Dcom.sun.jndi.rmiURLParsing=legacy flush" command When I perform an async backup over gRPC in "differential" mode of the node named "grpc_backup_23" + Then I wait for the async backup "grpc_backup_23" to finish Then the backup index exists Then I verify over gRPC that the backup "grpc_backup_23" exists and is of type "differential" Then I can see the backup index entry for "grpc_backup_23" @@ -1038,6 +1039,7 @@ Feature: Integration tests When I run a "ccm node1 nodetool -- -Dcom.sun.jndi.rmiURLParsing=legacy flush" command When I run a "ccm node2 nodetool -- -Dcom.sun.jndi.rmiURLParsing=legacy flush" command When I perform an async backup over gRPC in "differential" mode of the node named "grpc_backup_28" + 
Then I wait for the async backup "grpc_backup_28" to finish Then the backup index exists Then I verify over gRPC that the backup "grpc_backup_28" exists and is of type "differential" # The backup status is not actually a SUCCESS because the node2 has not been backed up. @@ -1066,12 +1068,14 @@ Feature: Integration tests When I run a DSE "nodetool flush" command Then I can make a search query against the "medusa"."test" table When I perform an async backup over gRPC in "differential" mode of the node named "backup29-1" + Then I wait for the async backup "backup29-1" to finish Then the backup index exists Then I can see the backup named "backup29-1" when I list the backups And the backup "backup29-1" has server_type "dse" in its metadata Then I verify over gRPC that the backup "backup29-1" exists and is of type "differential" Then I verify over gRPC that the backup "backup29-1" has expected status SUCCESS When I perform an async backup over gRPC in "differential" mode of the node named "backup29-2" + Then I wait for the async backup "backup29-2" to finish Then the backup index exists Then I can see the backup named "backup29-2" when I list the backups And the backup "backup29-2" has server_type "dse" in its metadata diff --git a/tests/integration/features/steps/integration_steps.py b/tests/integration/features/steps/integration_steps.py index 5f1d5965..469bb6af 100644 --- a/tests/integration/features/steps/integration_steps.py +++ b/tests/integration/features/steps/integration_steps.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import asyncio import datetime import glob import json @@ -89,6 +90,11 @@ AWS_CREDENTIALS = "~/.aws/credentials" GCS_CREDENTIALS = "~/medusa_credentials.json" +# hide cassandra driver logs, they are overly verbose and we don't really need them for tests +for logger_name in {'cassandra.io', 'cassandra.pool', 'cassandra.cluster', 'cassandra.connection'}: + logger = logging.getLogger(logger_name) + logger.setLevel(logging.CRITICAL) + def kill_cassandra(): p = subprocess.Popen(["ps", "-Af"], stdout=subprocess.PIPE) @@ -154,7 +160,7 @@ def __init__(self, config): self.config.write(config_file) self.grpc_server = medusa.service.grpc.server.Server(medusa_conf_file, testing=True) - self.grpc_server.serve() + asyncio.get_event_loop().run_until_complete(self.grpc_server.serve()) def destroy(self): self.grpc_server.shutdown(None, None) @@ -698,18 +704,28 @@ def _i_perform_a_backup_of_the_node_named_backupname(context, backup_mode, backu @when(r'I perform a backup over gRPC in "{backup_mode}" mode of the node named "{backup_name}"') def _i_perform_grpc_backup_of_node_named_backupname(context, backup_mode, backup_name): - context.grpc_client.backup(backup_name, backup_mode) + asyncio.get_event_loop().run_until_complete(context.grpc_client.backup(backup_name, backup_mode)) @when(r'I perform an async backup over gRPC in "{backup_mode}" mode of the node named "{backup_name}"') def _i_perform_grpc_async_backup_of_node_named_backupname(context, backup_mode, backup_name): - context.grpc_client.async_backup(backup_name, backup_mode) + asyncio.get_event_loop().run_until_complete(context.grpc_client.async_backup(backup_name, backup_mode)) + + +@then(r'I wait for the async backup "{backup_name}" to finish') +def _i_wait_for_async_backup_to_finish(context, backup_name): + while True: + status = asyncio.get_event_loop().run_until_complete(context.grpc_client.get_backup_status(backup_name)) + if status == medusa_pb2.StatusType.SUCCESS: + break + logging.debug(f'Backup {backup_name} is not 
yet finished, waiting...') + time.sleep(2) @when(r'I perform a backup over gRPC in "{backup_mode}" mode of the node named "{backup_name}" and it fails') def _i_perform_grpc_backup_of_node_named_backupname_fails(context, backup_mode, backup_name): try: - context.grpc_client.backup(backup_name, backup_mode) + asyncio.get_event_loop().run_until_complete(context.grpc_client.backup(backup_name, backup_mode)) raise AssertionError("Backup process should have failed but didn't.") except Exception: # This exception is required to be raised to validate the step @@ -718,7 +734,7 @@ def _i_perform_grpc_backup_of_node_named_backupname_fails(context, backup_mode, @then(r'I verify over gRPC that the backup "{backup_name}" exists and is of type "{backup_type}"') def _i_verify_over_grpc_backup_exists(context, backup_name, backup_type): - backup = context.grpc_client.get_backup(backup_name=backup_name) + backup = asyncio.get_event_loop().run_until_complete(context.grpc_client.get_backup(backup_name=backup_name)) assert backup.backupName == backup_name assert backup.backupType == backup_type assert backup.totalSize > 0 @@ -732,26 +748,26 @@ def _i_sleep_for_seconds(context, num_secs): @then(r'I verify over gRPC that the backup "{backup_name}" has expected status IN_PROGRESS') def _i_verify_over_grpc_backup_has_status_in_progress(context, backup_name): - status = context.grpc_client.get_backup_status(backup_name) + status = asyncio.get_event_loop().run_until_complete(context.grpc_client.get_backup_status(backup_name)) assert status == medusa_pb2.StatusType.IN_PROGRESS @then(r'I verify over gRPC that the backup "{backup_name}" has expected status UNKNOWN') def _i_verify_over_grpc_backup_has_status_unknown(context, backup_name): - status = context.grpc_client.get_backup_status(backup_name) + status = asyncio.get_event_loop().run_until_complete(context.grpc_client.get_backup_status(backup_name)) assert status == medusa_pb2.StatusType.UNKNOWN @then(r'I verify over gRPC that the backup 
"{backup_name}" has expected status SUCCESS') def _i_verify_over_grpc_backup_has_status_success(context, backup_name): - status = context.grpc_client.get_backup_status(backup_name) + status = asyncio.get_event_loop().run_until_complete(context.grpc_client.get_backup_status(backup_name)) logging.info(f'status={status}') assert status == medusa_pb2.StatusType.SUCCESS @then(r'I verify over gRPC that I can see both backups "{backup_name_1}" and "{backup_name_2}"') def _i_verify_over_grpc_that_i_can_see_both_backups(context, backup_name_1, backup_name_2): - backups = context.grpc_client.get_backups() + backups = asyncio.get_event_loop().run_until_complete(context.grpc_client.get_backups()) assert len(backups) == 2 assert backups[0].backupName == backup_name_1 @@ -765,7 +781,7 @@ def _i_verify_over_grpc_that_i_can_see_both_backups(context, backup_name_1, back @then(r'I verify over gRPC that the backup "{backup_name}" has the expected placement information') def _i_verify_over_grpc_backup_has_expected_information(context, backup_name): - backup = context.grpc_client.get_backup(backup_name) + backup = asyncio.get_event_loop().run_until_complete(context.grpc_client.get_backup(backup_name)) assert backup.nodes[0].host == "127.0.0.1" assert backup.nodes[0].datacenter in ["dc1", "datacenter1", "DC1"] assert backup.nodes[0].rack in ["rack1", "r1"] @@ -774,13 +790,13 @@ def _i_verify_over_grpc_backup_has_expected_information(context, backup_name): @then(r'I delete the backup "{backup_name}" over gRPC') def _i_delete_backup_grpc(context, backup_name): - context.grpc_client.delete_backup(backup_name) + asyncio.get_event_loop().run_until_complete(context.grpc_client.delete_backup(backup_name)) @then(r'I delete the backup "{backup_name}" over gRPC and it fails') def _i_delete_backup_grpc_fail(context, backup_name): try: - context.grpc_client.delete_backup(backup_name) + asyncio.get_event_loop().run_until_complete(context.grpc_client.delete_backup(backup_name)) raise 
AssertionError("Backup deletion should have failed but didn't.") except Exception: # This exception is required to be raised to validate the step @@ -789,7 +805,7 @@ def _i_delete_backup_grpc_fail(context, backup_name): @then(r'I verify over gRPC that the backup "{backup_name}" does not exist') def _i_verify_over_grpc_backup_does_not_exist(context, backup_name): - assert not context.grpc_client.backup_exists(backup_name) + assert not asyncio.get_event_loop().run_until_complete(context.grpc_client.backup_exists(backup_name)) @then(r'I verify that backup manager has removed the backup "{backup_name}"') @@ -802,7 +818,7 @@ def _i_verify_backup_manager_removed_backup(context, backup_name): @then(r'the gRPC server is up') def _check_grpc_server_is_up(context): - resp = context.grpc_client.health_check() + resp = asyncio.get_event_loop().run_until_complete(context.grpc_client.health_check()) assert resp.status == 1 @@ -1522,7 +1538,7 @@ def _all_files_of_table_in_backup_were_uploaded_with_key_configured_in_storage_c @when(r'I perform a purge over gRPC') def _i_perform_a_purge_over_grpc_with_a_max_backup_count(context): - context.purge_result = context.grpc_client.purge_backups() + context.purge_result = asyncio.get_event_loop().run_until_complete((context.grpc_client.purge_backups())) @then(r'{nb_purged_backups} backup has been purged')