Skip to content

Commit

Permalink
Make grpc's AsyncBackup truly Async (#720)
Browse files Browse the repository at this point in the history
  • Loading branch information
rzvoncek authored Mar 15, 2024
1 parent 36cc3fd commit 7c83424
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 55 deletions.
8 changes: 6 additions & 2 deletions k8s/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ ENV POETRY_VIRTUALENVS_IN_PROJECT=true
RUN python3 -m pip install -U pip && pip3 install --ignore-installed --user poetry

# Build medusa itself so we can add the executables in the final image
RUN cd /build && poetry install
RUN cd /build && poetry build && poetry install

# Could be python:slim, but we have a .sh entrypoint
FROM ubuntu:22.04
Expand All @@ -52,7 +52,8 @@ WORKDIR /home/cassandra

ENV DEBUG_VERSION 1
ENV DEBUG_SLEEP 0
ENV PATH=/home/cassandra/.local/bin:/home/cassandra/google-cloud-sdk/bin:$PATH
ENV PATH=/home/cassandra/.local/bin:/home/cassandra/google-cloud-sdk/bin:/home/cassandra/bin:$PATH
ENV PYTHONPATH=/home/cassandra

COPY --from=base --chown=cassandra:cassandra /root/.local /home/cassandra/.local

Expand All @@ -61,6 +62,9 @@ COPY --from=base --chown=cassandra:cassandra /build/pyproject.toml /home/cassand
COPY --chown=cassandra:cassandra medusa /home/cassandra/medusa
COPY --chown=cassandra:cassandra k8s/docker-entrypoint.sh /home/cassandra

RUN mkdir -p /home/cassandra/bin
COPY --chown=cassandra:cassandra k8s/medusa.sh /home/cassandra/bin/medusa

# Avoid Click locale errors when running medusa directly
ENV LC_ALL=C.UTF-8
ENV LANG=C.UTF-8
Expand Down
5 changes: 5 additions & 0 deletions k8s/medusa.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/home/cassandra/.venv/bin/python
# Entry-point shim: lets a `medusa` executable on PATH invoke the CLI
# defined in medusa.medusacli using the project's virtualenv interpreter.
import sys
from medusa.medusacli import cli

if __name__ == '__main__':
    # SystemExit carries the CLI's return code back to the shell,
    # exactly as sys.exit(cli()) would.
    raise SystemExit(cli())
38 changes: 19 additions & 19 deletions medusa/service/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@

class Client:
def __init__(self, target, channel_options=None):
    """Create an asyncio gRPC client for the Medusa service.

    Args:
        target: "host:port" address of the Medusa gRPC server.
        channel_options: optional list of (key, value) gRPC channel
            option tuples; defaults to no extra options.
    """
    # Use None instead of a mutable [] default argument (shared across
    # calls); normalize to an empty list before handing it to grpc.
    options = channel_options if channel_options is not None else []
    self.channel = grpc.aio.insecure_channel(target, options=options)

async def health_check(self):
    """Run the standard gRPC health check against the server.

    Returns the HealthCheckResponse, or None if the RPC fails.
    """
    try:
        stub = health_pb2_grpc.HealthStub(self.channel)
        return await stub.Check(health_pb2.HealthCheckRequest())
    except grpc.RpcError as e:
        logging.error("Failed health check due to error: {}".format(e))
        return None
Expand All @@ -45,65 +45,65 @@ def create_backup_stub(self, mode):
raise RuntimeError("{} is not a recognized backup mode".format(mode))
return backup_mode, stub

async def async_backup(self, name, mode):
    """Start a backup via the AsyncBackup RPC.

    Returns the server's response, or None if the RPC fails.
    """
    try:
        backup_mode, stub = self.create_backup_stub(mode=mode)
        req = medusa_pb2.BackupRequest(name=name, mode=backup_mode)
        return await stub.AsyncBackup(req)
    except grpc.RpcError as e:
        logging.error("Failed async backup for name: {} and mode: {} due to error: {}".format(name, mode, e))
        return None

async def backup(self, name, mode):
    """Run a backup via the Backup RPC.

    Returns the server's response, or None if the RPC fails.
    """
    try:
        backup_mode, stub = self.create_backup_stub(mode=mode)
        req = medusa_pb2.BackupRequest(name=name, mode=backup_mode)
        return await stub.Backup(req)
    except grpc.RpcError as e:
        logging.error("Failed sync backup for name: {} and mode: {} due to error: {}".format(name, mode, e))
        return None

async def delete_backup(self, name):
    """Delete the named backup; RPC errors are logged and swallowed."""
    try:
        stub = medusa_pb2_grpc.MedusaStub(self.channel)
        req = medusa_pb2.DeleteBackupRequest(name=name)
        await stub.DeleteBackup(req)
    except grpc.RpcError as e:
        logging.error("Failed to delete backup for name: {} due to error: {}".format(name, e))

async def get_backup(self, backup_name):
    """Fetch a single backup record by name.

    Returns the backup message from the response, or None on RPC failure.
    """
    try:
        stub = medusa_pb2_grpc.MedusaStub(self.channel)
        reply = await stub.GetBackup(medusa_pb2.GetBackupRequest(backupName=backup_name))
        return reply.backup
    except grpc.RpcError as e:
        logging.error("Failed to obtain backup for name: {} due to error: {}".format(backup_name, e))
        return None

async def get_backups(self):
    """List all backups known to the server.

    Returns the backups field of the response, or None on RPC failure.
    """
    try:
        stub = medusa_pb2_grpc.MedusaStub(self.channel)
        reply = await stub.GetBackups(medusa_pb2.GetBackupsRequest())
        return reply.backups
    except grpc.RpcError as e:
        logging.error("Failed to obtain list of backups due to error: {}".format(e))
        return None

async def get_backup_status(self, name):
    """Query the status of the named backup.

    Returns the status from the response, or StatusType.UNKNOWN if the
    RPC fails.
    """
    try:
        stub = medusa_pb2_grpc.MedusaStub(self.channel)
        reply = await stub.BackupStatus(medusa_pb2.BackupStatusRequest(backupName=name))
        return reply.status
    except grpc.RpcError as e:
        logging.error("Failed to determine backup status for name: {} due to error: {}".format(name, e))
        return medusa_pb2.StatusType.UNKNOWN

def backup_exists(self, name):
async def backup_exists(self, name):
try:
backups = self.get_backups()
backups = await self.get_backups()
for backup in list(backups):
if backup.backupName == name:
return True
Expand All @@ -112,11 +112,11 @@ def backup_exists(self, name):
logging.error("Failed to determine if backup exists for backup name: {} due to error: {}".format(name, e))
return False

def purge_backups(self):
async def purge_backups(self):
try:
stub = medusa_pb2_grpc.MedusaStub(self.channel)
request = medusa_pb2.PurgeBackupsRequest()
resp = stub.PurgeBackups(request)
resp = await stub.PurgeBackups(request)
return resp
except grpc.RpcError as e:
logging.error("Failed to purge backups due to error: {}".format(e))
Expand Down
51 changes: 32 additions & 19 deletions medusa/service/grpc/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import json
import logging
import os
import signal
import sys
from collections import defaultdict
from concurrent import futures
Expand All @@ -26,6 +26,7 @@

import grpc
import grpc_health.v1.health
from grpc import aio
from grpc_health.v1 import health_pb2_grpc

from medusa import backup_node
Expand All @@ -38,7 +39,6 @@
from medusa.service.grpc import medusa_pb2
from medusa.service.grpc import medusa_pb2_grpc
from medusa.storage import Storage
from medusa.storage.abstract_storage import AbstractStorage

TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'
BACKUP_MODE_DIFFERENTIAL = "differential"
Expand All @@ -52,7 +52,7 @@ def __init__(self, config_file_path, testing=False):
self.config_file_path = config_file_path
self.medusa_config = self.create_config()
self.testing = testing
self.grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=[
self.grpc_server = aio.server(futures.ThreadPoolExecutor(max_workers=10), options=[
('grpc.max_send_message_length', self.medusa_config.grpc.max_send_message_length),
('grpc.max_receive_message_length', self.medusa_config.grpc.max_receive_message_length)
])
Expand All @@ -61,9 +61,9 @@ def __init__(self, config_file_path, testing=False):
def shutdown(self, signum, frame):
    """Signal handler: clean up tracked backups and stop the gRPC server.

    Registered for SIGTERM, so it fires while the asyncio loop that is
    awaiting wait_for_termination() is still running.
    """
    logging.info("Shutting down GRPC server")
    handle_backup_removal_all()
    # The aio server's stop() is a coroutine. Calling run_until_complete()
    # on the already-running loop (we are inside a signal handler of that
    # loop's thread) raises "This event loop is already running", so
    # schedule the stop as a task instead; wait_for_termination() in
    # serve() returns once it completes.
    loop = asyncio.get_event_loop()
    loop.create_task(self.grpc_server.stop(0))

def serve(self):
async def serve(self):
config = self.create_config()
self.configure_console_logging()

Expand All @@ -72,11 +72,14 @@ def serve(self):

logging.info('Starting server. Listening on port 50051.')
self.grpc_server.add_insecure_port('[::]:50051')
self.grpc_server.start()
await self.grpc_server.start()

if not self.testing:
signal.signal(signal.SIGTERM, self.shutdown)
self.grpc_server.wait_for_termination()
try:
await self.grpc_server.wait_for_termination()
except asyncio.exceptions.CancelledError:
logging.info("Swallowing asyncio.exceptions.CancelledError. This should get fixed at some point")
handle_backup_removal_all()

def create_config(self):
config_file = Path(self.config_file_path)
Expand All @@ -88,6 +91,10 @@ def configure_console_logging(self):
root_logger = logging.getLogger('')
root_logger.setLevel(logging.DEBUG)

# Clean up handlers on the root_logger, this prevents duplicate log lines
for handler in root_logger.handlers[:]:
root_logger.removeHandler(handler)

log_format = logging.Formatter('[%(asctime)s] %(levelname)s: %(message)s')

console_handler = logging.StreamHandler()
Expand All @@ -108,10 +115,9 @@ def __init__(self, config):
self.config = config
self.storage_config = config.storage

def AsyncBackup(self, request, context):
async def AsyncBackup(self, request, context):
# TODO pass the staggered arg
logging.info("Performing ASYNC backup {} (type={})".format(request.name, request.mode))
loop = AbstractStorage.get_or_create_event_loop()
response = medusa_pb2.BackupResponse()
mode = BACKUP_MODE_DIFFERENTIAL
if medusa_pb2.BackupRequest.Mode.FULL == request.mode:
Expand All @@ -120,13 +126,16 @@ def AsyncBackup(self, request, context):
try:
response.backupName = request.name
response.status = response.status = medusa_pb2.StatusType.IN_PROGRESS
with ThreadPoolExecutor(max_workers=1, thread_name_prefix=request.name) as executor:
BackupMan.register_backup(request.name, is_async=True)
backup_future = loop.run_in_executor(executor, backup_node.handle_backup, self.config,
request.name, None,
False, mode)
backup_future.add_done_callback(record_backup_info)
BackupMan.set_backup_future(request.name, backup_future)
BackupMan.register_backup(request.name, is_async=True)
executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix=request.name)
loop = asyncio.get_running_loop()
backup_future = loop.run_in_executor(
executor,
backup_node.handle_backup,
self.config, request.name, None, False, mode
)
backup_future.add_done_callback(record_backup_info)
BackupMan.set_backup_future(request.name, backup_future)

except Exception as e:

Expand Down Expand Up @@ -382,11 +391,15 @@ def handle_backup_removal_all():
logging.error("Failed to cleanup all backups")


async def main():
    """Resolve the config path from argv and run the gRPC server."""
    # Second positional argument (argv[2]) overrides the default location.
    config_file_path = sys.argv[2] if len(sys.argv) > 2 else "/etc/medusa/medusa.ini"
    server = Server(config_file_path)
    await server.serve()


if __name__ == '__main__':
    asyncio.run(main())
4 changes: 4 additions & 0 deletions tests/integration/features/integration_tests.feature
Original file line number Diff line number Diff line change
Expand Up @@ -926,6 +926,7 @@ Feature: Integration tests
When I load 100 rows in the "medusa.test" table
When I run a "ccm node1 nodetool -- -Dcom.sun.jndi.rmiURLParsing=legacy flush" command
When I perform an async backup over gRPC in "differential" mode of the node named "grpc_backup_23"
Then I wait for the async backup "grpc_backup_23" to finish
Then the backup index exists
Then I verify over gRPC that the backup "grpc_backup_23" exists and is of type "differential"
Then I can see the backup index entry for "grpc_backup_23"
Expand Down Expand Up @@ -1038,6 +1039,7 @@ Feature: Integration tests
When I run a "ccm node1 nodetool -- -Dcom.sun.jndi.rmiURLParsing=legacy flush" command
When I run a "ccm node2 nodetool -- -Dcom.sun.jndi.rmiURLParsing=legacy flush" command
When I perform an async backup over gRPC in "differential" mode of the node named "grpc_backup_28"
Then I wait for the async backup "grpc_backup_28" to finish
Then the backup index exists
Then I verify over gRPC that the backup "grpc_backup_28" exists and is of type "differential"
# The backup status is not actually a SUCCESS because the node2 has not been backed up.
Expand Down Expand Up @@ -1066,12 +1068,14 @@ Feature: Integration tests
When I run a DSE "nodetool flush" command
Then I can make a search query against the "medusa"."test" table
When I perform an async backup over gRPC in "differential" mode of the node named "backup29-1"
Then I wait for the async backup "backup29-1" to finish
Then the backup index exists
Then I can see the backup named "backup29-1" when I list the backups
And the backup "backup29-1" has server_type "dse" in its metadata
Then I verify over gRPC that the backup "backup29-1" exists and is of type "differential"
Then I verify over gRPC that the backup "backup29-1" has expected status SUCCESS
When I perform an async backup over gRPC in "differential" mode of the node named "backup29-2"
Then I wait for the async backup "backup29-2" to finish
Then the backup index exists
Then I can see the backup named "backup29-2" when I list the backups
And the backup "backup29-2" has server_type "dse" in its metadata
Expand Down
Loading

0 comments on commit 7c83424

Please sign in to comment.