Make grpc's AsyncBackup truly Async #720

Merged · 1 commit · Mar 15, 2024
8 changes: 6 additions & 2 deletions k8s/Dockerfile
@@ -31,7 +31,7 @@ ENV POETRY_VIRTUALENVS_IN_PROJECT=true
RUN python3 -m pip install -U pip && pip3 install --ignore-installed --user poetry

# Build medusa itself so we can add the executables in the final image
RUN cd /build && poetry install
RUN cd /build && poetry build && poetry install

# Could be python:slim, but we have a .sh entrypoint
FROM ubuntu:22.04
@@ -52,7 +52,8 @@ WORKDIR /home/cassandra

ENV DEBUG_VERSION 1
ENV DEBUG_SLEEP 0
ENV PATH=/home/cassandra/.local/bin:/home/cassandra/google-cloud-sdk/bin:$PATH
ENV PATH=/home/cassandra/.local/bin:/home/cassandra/google-cloud-sdk/bin:/home/cassandra/bin:$PATH
ENV PYTHONPATH=/home/cassandra

COPY --from=base --chown=cassandra:cassandra /root/.local /home/cassandra/.local

@@ -61,6 +62,9 @@ COPY --from=base --chown=cassandra:cassandra /build/pyproject.toml /home/cassand
COPY --chown=cassandra:cassandra medusa /home/cassandra/medusa
COPY --chown=cassandra:cassandra k8s/docker-entrypoint.sh /home/cassandra

RUN mkdir -p /home/cassandra/bin
COPY --chown=cassandra:cassandra k8s/medusa.sh /home/cassandra/bin/medusa

# Avoid Click locale errors when running medusa directly
ENV LC_ALL=C.UTF-8
ENV LANG=C.UTF-8
5 changes: 5 additions & 0 deletions k8s/medusa.sh
@@ -0,0 +1,5 @@
#!/home/cassandra/.venv/bin/python
import sys
from medusa.medusacli import cli
if __name__ == '__main__':
sys.exit(cli())
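
The wrapper script above simply hands control to Medusa's Click entry point (`medusa.medusacli.cli`), so installing it on the PATH as `medusa` gives the image a regular `medusa` command backed by the in-image virtualenv. As a quick, hypothetical smoke test (not part of this PR), the same entry point can be exercised in-process with Click's test runner:

```python
# Hypothetical smoke test: invoke the same Click entry point the wrapper script calls.
from click.testing import CliRunner

from medusa.medusacli import cli

runner = CliRunner()
result = runner.invoke(cli, ["--help"])  # roughly equivalent to `medusa --help` via the wrapper
assert result.exit_code == 0
print(result.output.splitlines()[0])
```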
38 changes: 19 additions & 19 deletions medusa/service/grpc/client.py
@@ -24,13 +24,13 @@

class Client:
def __init__(self, target, channel_options=[]):
self.channel = grpc.insecure_channel(target, options=channel_options)
self.channel = grpc.aio.insecure_channel(target, options=channel_options)

def health_check(self):
async def health_check(self):
try:
health_stub = health_pb2_grpc.HealthStub(self.channel)
request = health_pb2.HealthCheckRequest()
return health_stub.Check(request)
return await health_stub.Check(request)
except grpc.RpcError as e:
logging.error("Failed health check due to error: {}".format(e))
return None
@@ -45,65 +45,65 @@ def create_backup_stub(self, mode):
raise RuntimeError("{} is not a recognized backup mode".format(mode))
return backup_mode, stub

def async_backup(self, name, mode):
async def async_backup(self, name, mode):
try:
backup_mode, stub = self.create_backup_stub(mode=mode)
request = medusa_pb2.BackupRequest(name=name, mode=backup_mode)
return stub.AsyncBackup(request)
return await stub.AsyncBackup(request)
except grpc.RpcError as e:
logging.error("Failed async backup for name: {} and mode: {} due to error: {}".format(name, mode, e))
return None

def backup(self, name, mode):
async def backup(self, name, mode):
try:
backup_mode, stub = self.create_backup_stub(mode=mode)
request = medusa_pb2.BackupRequest(name=name, mode=backup_mode)
return stub.Backup(request)
return await stub.Backup(request)
except grpc.RpcError as e:
logging.error("Failed sync backup for name: {} and mode: {} due to error: {}".format(name, mode, e))
return None

def delete_backup(self, name):
async def delete_backup(self, name):
try:
stub = medusa_pb2_grpc.MedusaStub(self.channel)
request = medusa_pb2.DeleteBackupRequest(name=name)
stub.DeleteBackup(request)
await stub.DeleteBackup(request)
except grpc.RpcError as e:
logging.error("Failed to delete backup for name: {} due to error: {}".format(name, e))

def get_backup(self, backup_name):
async def get_backup(self, backup_name):
try:
stub = medusa_pb2_grpc.MedusaStub(self.channel)
request = medusa_pb2.GetBackupRequest(backupName=backup_name)
response = stub.GetBackup(request)
response = await stub.GetBackup(request)
return response.backup
except grpc.RpcError as e:
logging.error("Failed to obtain backup for name: {} due to error: {}".format(backup_name, e))
return None

def get_backups(self):
async def get_backups(self):
try:
stub = medusa_pb2_grpc.MedusaStub(self.channel)
request = medusa_pb2.GetBackupsRequest()
response = stub.GetBackups(request)
response = await stub.GetBackups(request)
return response.backups
except grpc.RpcError as e:
logging.error("Failed to obtain list of backups due to error: {}".format(e))
return None

def get_backup_status(self, name):
async def get_backup_status(self, name):
try:
stub = medusa_pb2_grpc.MedusaStub(self.channel)
request = medusa_pb2.BackupStatusRequest(backupName=name)
resp = stub.BackupStatus(request)
resp = await stub.BackupStatus(request)
return resp.status
except grpc.RpcError as e:
logging.error("Failed to determine backup status for name: {} due to error: {}".format(name, e))
return medusa_pb2.StatusType.UNKNOWN

def backup_exists(self, name):
async def backup_exists(self, name):
try:
backups = self.get_backups()
backups = await self.get_backups()
for backup in list(backups):
if backup.backupName == name:
return True
@@ -112,11 +112,11 @@ def backup_exists(self, name):
logging.error("Failed to determine if backup exists for backup name: {} due to error: {}".format(name, e))
return False

def purge_backups(self):
async def purge_backups(self):
try:
stub = medusa_pb2_grpc.MedusaStub(self.channel)
request = medusa_pb2.PurgeBackupsRequest()
resp = stub.PurgeBackups(request)
resp = await stub.PurgeBackups(request)
return resp
except grpc.RpcError as e:
logging.error("Failed to purge backups due to error: {}".format(e))
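
With the client now built on `grpc.aio`, every call returns a coroutine and must be awaited from a running event loop. A minimal usage sketch (illustrative only; the address, backup name, and mode are assumptions, not values taken from this PR):

```python
import asyncio

from medusa.service.grpc.client import Client


async def main():
    # Create the client inside the running loop so the grpc.aio channel is bound to it.
    client = Client("127.0.0.1:50051")
    response = await client.async_backup("backup1", "differential")
    if response is not None:
        # Poll the server for the status of the backup we just kicked off.
        status = await client.get_backup_status("backup1")
        print(status)


asyncio.run(main())
```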
51 changes: 32 additions & 19 deletions medusa/service/grpc/server.py
@@ -13,10 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import json
import logging
import os
import signal
import sys
from collections import defaultdict
from concurrent import futures
@@ -26,6 +26,7 @@

import grpc
import grpc_health.v1.health
from grpc import aio
from grpc_health.v1 import health_pb2_grpc

from medusa import backup_node
@@ -38,7 +39,6 @@
from medusa.service.grpc import medusa_pb2
from medusa.service.grpc import medusa_pb2_grpc
from medusa.storage import Storage
from medusa.storage.abstract_storage import AbstractStorage

TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'
BACKUP_MODE_DIFFERENTIAL = "differential"
@@ -52,7 +52,7 @@
self.config_file_path = config_file_path
self.medusa_config = self.create_config()
self.testing = testing
self.grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=[
self.grpc_server = aio.server(futures.ThreadPoolExecutor(max_workers=10), options=[
('grpc.max_send_message_length', self.medusa_config.grpc.max_send_message_length),
('grpc.max_receive_message_length', self.medusa_config.grpc.max_receive_message_length)
])
@@ -61,9 +61,9 @@
def shutdown(self, signum, frame):
logging.info("Shutting down GRPC server")
handle_backup_removal_all()
self.grpc_server.stop(0)
asyncio.get_event_loop().run_until_complete(self.grpc_server.stop(0))

def serve(self):
async def serve(self):
config = self.create_config()
self.configure_console_logging()

@@ -72,11 +72,14 @@

logging.info('Starting server. Listening on port 50051.')
self.grpc_server.add_insecure_port('[::]:50051')
self.grpc_server.start()
await self.grpc_server.start()

if not self.testing:
signal.signal(signal.SIGTERM, self.shutdown)
self.grpc_server.wait_for_termination()
try:
await self.grpc_server.wait_for_termination()
except asyncio.exceptions.CancelledError:
logging.info("Swallowing asyncio.exceptions.CancelledError. This should get fixed at some point")
handle_backup_removal_all()

[Codecov warning: added lines medusa/service/grpc/server.py#L78-L82 were not covered by tests]

def create_config(self):
config_file = Path(self.config_file_path)
@@ -88,6 +91,10 @@
root_logger = logging.getLogger('')
root_logger.setLevel(logging.DEBUG)

# Clean up handlers on the root_logger, this prevents duplicate log lines
for handler in root_logger.handlers[:]:
root_logger.removeHandler(handler)

log_format = logging.Formatter('[%(asctime)s] %(levelname)s: %(message)s')

console_handler = logging.StreamHandler()
@@ -108,10 +115,9 @@
self.config = config
self.storage_config = config.storage

def AsyncBackup(self, request, context):
async def AsyncBackup(self, request, context):
# TODO pass the staggered arg
logging.info("Performing ASYNC backup {} (type={})".format(request.name, request.mode))
loop = AbstractStorage.get_or_create_event_loop()
response = medusa_pb2.BackupResponse()
mode = BACKUP_MODE_DIFFERENTIAL
if medusa_pb2.BackupRequest.Mode.FULL == request.mode:
@@ -120,13 +126,16 @@
try:
response.backupName = request.name
response.status = response.status = medusa_pb2.StatusType.IN_PROGRESS
with ThreadPoolExecutor(max_workers=1, thread_name_prefix=request.name) as executor:
BackupMan.register_backup(request.name, is_async=True)
backup_future = loop.run_in_executor(executor, backup_node.handle_backup, self.config,
request.name, None,
False, mode)
backup_future.add_done_callback(record_backup_info)
BackupMan.set_backup_future(request.name, backup_future)
BackupMan.register_backup(request.name, is_async=True)
executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix=request.name)
loop = asyncio.get_running_loop()
backup_future = loop.run_in_executor(
executor,
backup_node.handle_backup,
self.config, request.name, None, False, mode
)
backup_future.add_done_callback(record_backup_info)
BackupMan.set_backup_future(request.name, backup_future)

except Exception as e:

@@ -382,11 +391,15 @@
logging.error("Failed to cleanup all backups")


if __name__ == '__main__':
async def main():
if len(sys.argv) > 2:
config_file_path = sys.argv[2]
else:
config_file_path = "/etc/medusa/medusa.ini"

server = Server(config_file_path)
server.serve()
await server.serve()

[Codecov warning: added line medusa/service/grpc/server.py#L401 was not covered by tests]


if __name__ == '__main__':
asyncio.run(main())
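
The reworked `AsyncBackup` handler now answers `IN_PROGRESS` right away: it registers the backup, pushes the blocking `backup_node.handle_backup` call onto a `ThreadPoolExecutor` via `loop.run_in_executor`, and attaches `record_backup_info` as a completion callback instead of blocking until the backup ends. A stripped-down, standalone sketch of that pattern (illustrative only; `blocking_backup` stands in for the real backup call):

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_backup(name):
    time.sleep(2)  # stand-in for backup_node.handle_backup
    return name


def on_done(future):
    # Mirrors the record_backup_info / BackupMan bookkeeping done in the real handler.
    print("backup finished:", future.result())


async def handler(name):
    executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix=name)
    loop = asyncio.get_running_loop()
    backup_future = loop.run_in_executor(executor, blocking_backup, name)
    backup_future.add_done_callback(on_done)
    return "IN_PROGRESS"  # the caller gets a response before the backup completes


async def main():
    print(await handler("backup1"))
    await asyncio.sleep(3)  # keep the loop alive long enough for the callback to fire


asyncio.run(main())
```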
4 changes: 4 additions & 0 deletions tests/integration/features/integration_tests.feature
@@ -926,6 +926,7 @@ Feature: Integration tests
When I load 100 rows in the "medusa.test" table
When I run a "ccm node1 nodetool -- -Dcom.sun.jndi.rmiURLParsing=legacy flush" command
When I perform an async backup over gRPC in "differential" mode of the node named "grpc_backup_23"
Then I wait for the async backup "grpc_backup_23" to finish
Then the backup index exists
Then I verify over gRPC that the backup "grpc_backup_23" exists and is of type "differential"
Then I can see the backup index entry for "grpc_backup_23"
@@ -1038,6 +1039,7 @@ Feature: Integration tests
When I run a "ccm node1 nodetool -- -Dcom.sun.jndi.rmiURLParsing=legacy flush" command
When I run a "ccm node2 nodetool -- -Dcom.sun.jndi.rmiURLParsing=legacy flush" command
When I perform an async backup over gRPC in "differential" mode of the node named "grpc_backup_28"
Then I wait for the async backup "grpc_backup_28" to finish
Then the backup index exists
Then I verify over gRPC that the backup "grpc_backup_28" exists and is of type "differential"
# The backup status is not actually a SUCCESS because the node2 has not been backed up.
@@ -1066,12 +1068,14 @@ Feature: Integration tests
When I run a DSE "nodetool flush" command
Then I can make a search query against the "medusa"."test" table
When I perform an async backup over gRPC in "differential" mode of the node named "backup29-1"
Then I wait for the async backup "backup29-1" to finish
Then the backup index exists
Then I can see the backup named "backup29-1" when I list the backups
And the backup "backup29-1" has server_type "dse" in its metadata
Then I verify over gRPC that the backup "backup29-1" exists and is of type "differential"
Then I verify over gRPC that the backup "backup29-1" has expected status SUCCESS
When I perform an async backup over gRPC in "differential" mode of the node named "backup29-2"
Then I wait for the async backup "backup29-2" to finish
Then the backup index exists
Then I can see the backup named "backup29-2" when I list the backups
And the backup "backup29-2" has server_type "dse" in its metadata
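
The new "I wait for the async backup ... to finish" steps account for the backup now completing in the background rather than before the gRPC call returns. A hypothetical sketch of what such a step could look like (the actual step definition is not part of this diff; the behave decorator, address, timeout, and polling interval are assumptions):

```python
# Hypothetical behave-style step definition; simplified to only check for SUCCESS.
import asyncio

from behave import then

from medusa.service.grpc import medusa_pb2
from medusa.service.grpc.client import Client


@then('I wait for the async backup "{backup_name}" to finish')
def wait_for_async_backup(context, backup_name):
    async def _wait():
        client = Client("127.0.0.1:50051")
        for _ in range(120):  # poll for up to ~2 minutes
            status = await client.get_backup_status(backup_name)
            if status == medusa_pb2.StatusType.SUCCESS:
                return True
            await asyncio.sleep(1)
        return False

    assert asyncio.run(_wait()), f"backup {backup_name} did not finish in time"
```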