Skip to content

Commit

Permalink
Expose sum of backup object count and size via gRPC methods (#686)
Browse files Browse the repository at this point in the history
* Expose sum of backup object count and size via gRPC methods

* Fix various issues with ITs and grpc

* Use k8ssandra-operator main branch in ITs

* Simplify gRPC way of working out backup statuses

* Use k8ssandra-operator compatible with changes in this PR

* Build the k8s cluster with the checked out k8ssandra operator

* Do not return a data structure if we modify it

* Properly read k8s enablement setting

* Switch ITs to use k8ssandra-operator main
  • Loading branch information
rzvoncek authored Nov 30, 2023
1 parent 4908194 commit 4ddaeeb
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 70 deletions.
6 changes: 2 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -343,9 +343,7 @@ jobs:
- uses: actions/checkout@v3
with:
repository: k8ssandra/k8ssandra-operator
# Modify this line to use a different branch when making changes to the operator affecting Medusa
# Change to main once the operator changes are merged
ref: add-medusa-non-regression-test
ref: main
path: k8ssandra-operator
- name: Set up Go
uses: actions/setup-go@v3
Expand Down Expand Up @@ -377,7 +375,7 @@ jobs:
- name: Setup kind cluster
working-directory: k8ssandra-operator
run: |
make single-create
make single-up
- name: Run e2e test ( ${{ matrix.e2e_test }} )
working-directory: k8ssandra-operator
run: |
Expand Down
9 changes: 9 additions & 0 deletions medusa/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,15 @@ def parse_config(args, config_file):
if value is not None
}})

# the k8s mode and grpc server overlap in a keyword 'enabled'
# so we need to reconcile them explicitly
k8s_enabled = evaluate_boolean(config['kubernetes']['enabled'])
if args.get('k8s_enabled', 'False') == 'True' or k8s_enabled:
config.set('kubernetes', 'enabled', 'True')
grpc_enabled = evaluate_boolean(config['grpc']['enabled'])
if args.get('grpc_enabled', "False") == 'True' or grpc_enabled:
config.set('grpc', 'enabled', 'True')

if evaluate_boolean(config['kubernetes']['enabled']):
if evaluate_boolean(config['cassandra']['use_sudo']):
logging.warning('Forcing use_sudo to False because Kubernetes mode is enabled')
Expand Down
8 changes: 5 additions & 3 deletions medusa/service/grpc/medusa.proto
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ message BackupRequest {

message BackupResponse {
string backupName = 1;
StatusType status = 2;
StatusType status = 2;
}

message BackupStatusRequest {
Expand Down Expand Up @@ -82,8 +82,10 @@ message BackupSummary {
int32 totalNodes = 4;
int32 finishedNodes = 5;
repeated BackupNode nodes = 6;
StatusType status = 7;
string backupType = 8;
StatusType status = 7;
string backupType = 8;
int64 totalSize = 9;
int64 totalObjects = 10;
}

message BackupNode {
Expand Down
32 changes: 16 additions & 16 deletions medusa/service/grpc/medusa_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

51 changes: 36 additions & 15 deletions medusa/service/grpc/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,19 +170,18 @@ def BackupStatus(self, request, context):
response = medusa_pb2.BackupStatusResponse()
try:
with Storage(config=self.storage_config) as storage:

# find the backup
backup = storage.get_node_backup(fqdn=storage.config.fqdn, name=request.backupName)
if backup.started is None:
raise KeyError

# work out the timings
response.startTime = datetime.fromtimestamp(backup.started).strftime(TIMESTAMP_FORMAT)
if backup.finished:
response.finishTime = datetime.fromtimestamp(backup.finished).strftime(TIMESTAMP_FORMAT)
else:
response.finishTime = ""

# record the status
record_status_in_response(response, request.backupName)

except KeyError:
context.set_details("backup <{}> does not exist".format(request.backupName))
context.set_code(grpc.StatusCode.NOT_FOUND)
Expand All @@ -192,13 +191,12 @@ def BackupStatus(self, request, context):

def GetBackup(self, request, context):
response = medusa_pb2.GetBackupResponse()
last_status = medusa_pb2.StatusType.UNKNOWN
try:
with Storage(config=self.storage_config) as connected_storage:
backup = connected_storage.get_cluster_backup(request.backupName)
summary, response.status = get_backup_summary(backup, last_status)
summary = get_backup_summary(backup)
response.backup.CopyFrom(summary)
record_status_in_response(response, request.backupName)
response.status = summary.status
except Exception as e:
context.set_details("Failed to get backup due to error: {}".format(e))
context.set_code(grpc.StatusCode.INTERNAL)
Expand All @@ -208,14 +206,14 @@ def GetBackup(self, request, context):

def GetBackups(self, request, context):
response = medusa_pb2.GetBackupsResponse()
last_status = medusa_pb2.StatusType.UNKNOWN
try:
# cluster backups
with Storage(config=self.storage_config) as connected_storage:
backups = get_backups(connected_storage, self.config, True)
for backup in backups:
summary, last_status = get_backup_summary(backup, last_status)
summary = get_backup_summary(backup)
response.backups.append(summary)
set_overall_status(response)

except Exception as e:
context.set_details("Failed to get backups due to error: {}".format(e))
Expand Down Expand Up @@ -282,27 +280,50 @@ def PrepareRestore(self, request, context):
return response


def get_backup_summary(backup, last_status):
def set_overall_status(get_backups_response):
get_backups_response.overallStatus = medusa_pb2.StatusType.UNKNOWN
backups = get_backups_response.backups
if len(backups) == 0:
return
if all(backup.status == medusa_pb2.StatusType.SUCCESS for backup in backups):
get_backups_response.overallStatus = medusa_pb2.StatusType.SUCCESS
if any(backup.status == medusa_pb2.StatusType.IN_PROGRESS for backup in backups):
get_backups_response.overallStatus = medusa_pb2.StatusType.IN_PROGRESS
if any(backup.status == medusa_pb2.StatusType.FAILED for backup in backups):
get_backups_response.overallStatus = medusa_pb2.StatusType.FAILED
if any(backup.status == medusa_pb2.StatusType.UNKNOWN for backup in backups):
get_backups_response.overallStatus = medusa_pb2.StatusType.UNKNOWN


def get_backup_summary(backup):
summary = medusa_pb2.BackupSummary()

summary.backupName = backup.name

if backup.started is None:
summary.startTime = 0
else:
summary.startTime = 1234
summary.startTime = backup.started

if backup.finished is None:
summary.finishTime = 0
summary.status = medusa_pb2.StatusType.IN_PROGRESS
last_status = medusa_pb2.StatusType.IN_PROGRESS
else:
summary.finishTime = backup.finished
if last_status != medusa_pb2.StatusType.IN_PROGRESS:
summary.status = medusa_pb2.StatusType.SUCCESS
summary.status = medusa_pb2.StatusType.SUCCESS

summary.totalNodes = len(backup.tokenmap)
summary.finishedNodes = len(backup.complete_nodes())

for node in backup.tokenmap:
summary.nodes.append(create_token_map_node(backup, node))

summary.backupType = backup.backup_type
return summary, last_status

summary.totalSize = backup.size()
summary.totalObjects = backup.num_objects()

return summary


# Callback function for recording unique backup results
Expand Down
14 changes: 14 additions & 0 deletions tests/config_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,20 @@ def test_use_sudo_kubernetes_enabled(self):
config = medusa.config.parse_config(args, medusa_k8s_config)
assert config['cassandra']['use_sudo'] == 'False'

def test_use_sudo_kubernetes_enabled_without_config_file(self):
kubernetes_args = {
"k8s_enabled": 'True',
"cassandra_url": 'https://foo:8080',
"use_mgmt_api": 'True'
}
args = {**kubernetes_args}
medusa_basic_config = pathlib.Path(__file__).parent / "resources/config/medusa.ini"
config = medusa.config.parse_config(args, medusa_basic_config)
assert config['kubernetes']['enabled'] == 'True'
assert config['kubernetes']['cassandra_url'] == 'https://foo:8080'
assert config['kubernetes']['use_mgmt_api'] == 'True'
assert config['cassandra']['use_sudo'] == 'False'

def test_overridden_fqdn(self):
"""Ensure that a overridden fqdn in config is honored"""
args = {'fqdn': 'overridden-fqdn'}
Expand Down
Loading

0 comments on commit 4ddaeeb

Please # to comment.