diff --git a/CHANGES.next.md b/CHANGES.next.md index dc44a90cb7..4338af1008 100644 --- a/CHANGES.next.md +++ b/CHANGES.next.md @@ -49,6 +49,7 @@ - Add pd extreme support to PKB. - Add '--delete_samples' to measure VM deletion during benchmark teardown phase +- Add cURL benchmark for object storage. ### Enhancements: diff --git a/perfkitbenchmarker/linux_benchmarks/mlperf_benchmark.py b/perfkitbenchmarker/linux_benchmarks/mlperf_benchmark.py index 17af5b13da..b75f80c6a4 100644 --- a/perfkitbenchmarker/linux_benchmarks/mlperf_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/mlperf_benchmark.py @@ -191,8 +191,8 @@ def Prepare(benchmark_spec, vm=None): location = benchmark_spec.tpu_groups['train'].GetZone() storage_service.PrepareService(util.GetRegionFromZone(location)) storage_service.MakeBucket(bucket) - storage_service.ChmodBucket(benchmark_spec.gcp_service_account, 'W', - bucket) + storage_service.AclBucket(benchmark_spec.gcp_service_account, gcs.WRITER, + bucket) # For MLPerf v0.6, the benchmake code of different hardware are different. if (benchmark_spec.tpu_groups['train'].GetAcceleratorType() == 'v3-32' or diff --git a/perfkitbenchmarker/linux_benchmarks/mlperf_multiworkers_benchmark.py b/perfkitbenchmarker/linux_benchmarks/mlperf_multiworkers_benchmark.py index 4e72d07a5a..0d6a7bf630 100644 --- a/perfkitbenchmarker/linux_benchmarks/mlperf_multiworkers_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/mlperf_multiworkers_benchmark.py @@ -689,7 +689,8 @@ def _PrepareBucket(benchmark_spec): storage_service = benchmark_spec.storage_service storage_service.PrepareService(util.GetRegionFromZone(location)) storage_service.MakeBucket(bucket, raise_on_failure=False) - storage_service.ChmodBucket(benchmark_spec.gcp_service_account, 'W', bucket) + storage_service.AclBucket(benchmark_spec.gcp_service_account, gcs.WRITER, + bucket) def _ClearTmpDirectory(benchmark_spec, vm): diff --git a/perfkitbenchmarker/linux_benchmarks/mnist_benchmark.py b/perfkitbenchmarker/linux_benchmarks/mnist_benchmark.py index 5723f5e659..6455d76e06 100644 --- a/perfkitbenchmarker/linux_benchmarks/mnist_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/mnist_benchmark.py @@ -141,7 +141,8 @@ def Prepare(benchmark_spec): location = benchmark_spec.tpu_groups['train'].GetZone() storage_service.PrepareService(util.GetRegionFromZone(location)) storage_service.MakeBucket(bucket) - storage_service.ChmodBucket(benchmark_spec.gcp_service_account, 'W', bucket) + storage_service.AclBucket(benchmark_spec.gcp_service_account, gcs.WRITER, + bucket) else: benchmark_spec.model_dir = '/tmp' diff --git a/perfkitbenchmarker/linux_benchmarks/object_storage_curl.py b/perfkitbenchmarker/linux_benchmarks/object_storage_curl.py new file mode 100644 index 0000000000..d710f36642 --- /dev/null +++ b/perfkitbenchmarker/linux_benchmarks/object_storage_curl.py @@ -0,0 +1,276 @@ +# Copyright 2021 PerfKitBenchmarker Authors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""Use cURL to upload and download data to object storage in parallel.
+
+Consistent with the object_storage_service multistream scenario.
+
+Due to the difficulty of signing requests to S3 by hand
+(https://docs.aws.amazon.com/AmazonS3/latest/userguide/RESTAuthentication.html),
+the benchmark uses insecure short-lived buckets and should be used with caution.
+
+TODO(pclay): Consider signing requests and not using public buckets.
+"""
+
+import logging
+from typing import List, Tuple
+
+from absl import flags
+import numpy as np
+from perfkitbenchmarker import configs
+from perfkitbenchmarker import errors
+from perfkitbenchmarker import flag_util
+from perfkitbenchmarker import object_storage_service
+from perfkitbenchmarker import providers
+from perfkitbenchmarker import sample
+from perfkitbenchmarker import vm_util
+from perfkitbenchmarker.linux_benchmarks import object_storage_service_benchmark
+
+BENCHMARK_NAME = 'object_storage_curl'
+
+BENCHMARK_CONFIG = """
+object_storage_curl:
+  description: Use cURL to upload and download data to object storage in parallel.
+  vm_groups:
+    default:
+      vm_spec: *default_single_core
+  flags:
+    # Required
+    object_storage_multistream_objects_per_stream: 1
+    object_storage_streams_per_vm: 10
+"""
+
+# Blocksize for dd to pipe data into uploads.
+DD_BLOCKSIZE = 4000
+
+# Magic strings
+_UPLOAD = 'upload'
+_DOWNLOAD = 'download'
+_START_TIME = 'START_TIME'
+_CURL_RESULTS = 'CURL_RESULTS'
+
+flags.DEFINE_string('object_storage_curl_object_size', '1MB',
+                    'Size of objects to upload / download. Similar to '
+                    '--object_storage_object_sizes, but only takes a single '
+                    'size.')
+flags.DEFINE_bool('object_storage_curl_i_am_ok_with_public_read_write_buckets',
+                  False, 'Acknowledge that this benchmark will create buckets '
+                  'which are publicly readable and writable. Required to run '
+                  'this benchmark.')
+
+FLAGS = flags.FLAGS
+
+
+def GetConfig(user_config):
+  return configs.LoadConfig(BENCHMARK_CONFIG, user_config, BENCHMARK_NAME)
+
+
+def CheckPrerequisites(_):
+  """Validate some unsupported flags."""
+  if (flag_util.StringToBytes(FLAGS.object_storage_curl_object_size) <
+      DD_BLOCKSIZE):
+    raise errors.Config.InvalidValue(
+        '--object_storage_curl_object_size must be at least 4KB')
+  # TODO(pclay): Consider supporting multiple objects per stream.
+  if FLAGS.object_storage_multistream_objects_per_stream != 1:
+    raise errors.Config.InvalidValue(
+        'object_storage_curl only supports 1 object per stream')
+  if FLAGS.object_storage_object_naming_scheme != 'sequential_by_stream':
+    raise errors.Config.InvalidValue(
+        'object_storage_curl only supports sequential_by_stream naming.')
+  if not FLAGS.object_storage_curl_i_am_ok_with_public_read_write_buckets:
+    raise errors.Config.InvalidValue(
+        'This benchmark uses a public read/write object storage bucket.\n'
+        'You must explicitly pass '
+        '--object_storage_curl_i_am_ok_with_public_read_write_buckets to '
+        'acknowledge that it will be created.\n'
+        'If PKB is interrupted, you should ensure it is cleaned up.')
+
+
+# PyType does not currently support returning abstract classes.
+# TODO(user): stop suppressing
+# pytype: disable=not-instantiable
+def _GetService() -> object_storage_service.ObjectStorageService:
+  """Get a ready-to-use instance of ObjectStorageService."""
+  # TODO(pclay): consider using FLAGS.storage to allow cross-cloud testing?
+  cloud = FLAGS.cloud
+  providers.LoadProvider(cloud)
+  service = object_storage_service.GetObjectStorageClass(cloud)()
+  # This method is idempotent with default args and safe to call in each phase.
+  service.PrepareService(FLAGS.object_storage_region)
+  return service
+
+
+def _GetBucketName() -> str:
+  return FLAGS.object_storage_bucket_name or 'pkb%s' % FLAGS.run_uri
+
+
+def Prepare(benchmark_spec):
+  """Create and ACL a bucket and install curl."""
+  # Always run cleanup so server-side state is removed if an exception happens.
+  benchmark_spec.always_call_cleanup = True
+
+  service = _GetService()
+  bucket_name = _GetBucketName()
+
+  service.MakeBucket(bucket_name)
+  service.MakeBucketPubliclyReadable(bucket_name, also_make_writable=True)
+
+  vms = benchmark_spec.vms
+  vm_util.RunThreaded(lambda vm: vm.InstallPackages('curl'), vms)
+
+
+def Run(benchmark_spec) -> List[sample.Sample]:
+  """Run the storage benchmark and publish results.
+
+  Args:
+    benchmark_spec: The benchmark specification. Contains all data that is
+      required to run the benchmark.
+
+  Returns:
+    The same samples as object_storage_multistream.
+  """
+  service = _GetService()
+  bucket = _GetBucketName()
+
+  object_bytes = flag_util.StringToBytes(FLAGS.object_storage_curl_object_size)
+  blocks = object_bytes // DD_BLOCKSIZE
+  streams_per_vm = FLAGS.object_storage_streams_per_vm
+
+  generate_data_cmd = (
+      'openssl aes-256-ctr -iter 1 -pass file:/dev/urandom -in /dev/zero'
+      f' | dd bs={DD_BLOCKSIZE} count={blocks} iflag=fullblock')
+  def StartTimeCmd(index):
+    return f"date '+{_START_TIME} {index} %s.%N'"
+
+  def CurlCmd(operation, index):
+    return (
+        f"curl -fsw '{_CURL_RESULTS} {index} %{{time_total}} "
+        # Pad size with zero to force curl to populate it.
+        f"%{{response_code}} 0%{{size_{operation}}}\n' -o /dev/null")
+
+  def Upload(vm):
+    commands = []
+    for object_index in range(streams_per_vm):
+      object_name = f'{vm.name}_{object_index}'
+      url = service.GetUploadUrl(bucket=bucket, object_name=object_name)
+      commands.append(
+          f'{StartTimeCmd(object_index)}; {generate_data_cmd} | '
+          f'{CurlCmd(_UPLOAD, object_index)} -X {service.UPLOAD_HTTP_METHOD} '
+          f"--data-binary @- '{url}'")
+    stdout, _ = vm.RemoteCommandsInParallel(commands)
+    return stdout
+
+  def Download(vm):
+    commands = []
+    for object_index in range(streams_per_vm):
+      object_name = f'{vm.name}_{object_index}'
+      url = service.GetDownloadUrl(bucket=bucket, object_name=object_name)
+      commands.append(f'{StartTimeCmd(object_index)}; '
+                      f"{CurlCmd(_DOWNLOAD, object_index)} '{url}'")
+    stdout, _ = vm.RemoteCommandsInParallel(commands)
+    return stdout
+
+  vms = benchmark_spec.vms
+  samples = []
+  for operation, func in [(_UPLOAD, Upload), (_DOWNLOAD, Download)]:
+    output = vm_util.RunThreaded(func, vms)
+    start_times, latencies = _LoadWorkerOutput(output)
+    object_storage_service_benchmark.ProcessMultiStreamResults(
+        start_times,
+        latencies,
+        all_sizes=[object_bytes],
+        sizes=[np.array([object_bytes])] * streams_per_vm * len(vms),
+        operation=operation,
+        results=samples,
+        # We do not retry curl. We simply do not report failing latencies.
+        # This under-reports both latency and throughput. Since this benchmark
+        # is intended to measure throughput, this is reasonable.
+ allow_failing_streams=True) + return samples + + +def Cleanup(_): + service = _GetService() + bucket_name = _GetBucketName() + if not FLAGS.object_storage_dont_delete_bucket: + service.DeleteBucket(bucket_name) + service.CleanupService() + + +def _LoadWorkerOutput( + output: List[str]) -> Tuple[List[np.ndarray], List[np.ndarray]]: + """Parse the output of Upload and Download functions. + + The output of Upload and Download is + # START_TIME + START_TIME 0 12345.6789 + # CURL_RESULTS + CURL_RESULTS 0 1.2345 200 01000 + + Lines of output are not ordered and may be interleaved. + + Args: + output: the output of each upload or download command + + Returns: + the start times and latencies of the curl commands + + Raises: + ValueError if output is unexpected. + Exception if the curl request failed with a 4XX code. + """ + start_times = [] + latencies = [] + + for worker_out in output: + worker_start_times = [None] * FLAGS.object_storage_streams_per_vm + worker_latencies = [None] * FLAGS.object_storage_streams_per_vm + for line in worker_out.strip().split('\n'): + try: + line_type, index, value, *curl_data = line.split() + if line_type == _START_TIME: + assert not curl_data + worker_start_times[int(index)] = float(value) + elif line_type == _CURL_RESULTS: + response_code, bytes_transmitted = curl_data + bytes_transmitted = int(bytes_transmitted) + if response_code.startswith('4'): + raise Exception( + f'cURL command failed with HTTP Code {response_code}') + elif response_code == '200': + bytes_expected = flag_util.StringToBytes( + FLAGS.object_storage_curl_object_size) + if bytes_transmitted != bytes_expected: + raise ValueError( + f'cURL transmitted {bytes_transmitted}' + f' instead of {bytes_expected}.') + # curl 7.74 used μs instead of seconds. Not used in major OS types. + # https://github.com/curl/curl/issues/6321 + assert '.' in value, 'Invalid curl output.' + worker_latencies[int(index)] = float(value) + else: + logging.warning('cURL command failed with HTTP code %s. ' + 'Not reporting latency.', response_code) + else: + raise ValueError(f'Unexpected line start: {line_type}.') + # Always show raw line when there is a parsing error. + except (ValueError, AssertionError) as e: + raise ValueError(f'Unexpected output:\n{line}') from e + + for start_time, latency in zip(worker_start_times, worker_latencies): + if latency: + start_times.append(np.array([start_time], dtype=np.float64)) + latencies.append(np.array([latency], dtype=np.float64)) + + return start_times, latencies diff --git a/perfkitbenchmarker/linux_benchmarks/object_storage_service_benchmark.py b/perfkitbenchmarker/linux_benchmarks/object_storage_service_benchmark.py index ec2ac930d8..3406fc3087 100644 --- a/perfkitbenchmarker/linux_benchmarks/object_storage_service_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/object_storage_service_benchmark.py @@ -403,8 +403,14 @@ def MultiThreadDeleteDelay(num_vms, threads_per_vm): return (num_vms * threads_per_vm) / (MULTISTREAM_DELETE_OPS_PER_SEC) -def _ProcessMultiStreamResults(start_times, latencies, sizes, operation, - all_sizes, results, metadata=None): +def ProcessMultiStreamResults(start_times, + latencies, + sizes, + operation, + all_sizes, + results, + metadata=None, + allow_failing_streams=False): """Read and process results from the api_multistream worker process. Results will be reported per-object size and combined for all @@ -421,17 +427,24 @@ def _ProcessMultiStreamResults(start_times, latencies, sizes, operation, distribution used, in bytes. 
     results: a list to append Sample objects to.
     metadata: dict. Base sample metadata
+    allow_failing_streams: Whether results may be missing for failed streams.
   """
 
-  num_streams = FLAGS.object_storage_streams_per_vm * FLAGS.num_vms
-
-  assert len(start_times) == num_streams
-  assert len(latencies) == num_streams
-  assert len(sizes) == num_streams
+  total_num_streams = FLAGS.object_storage_streams_per_vm * FLAGS.num_vms
+  if allow_failing_streams:
+    num_streams = len(start_times)
+    assert len(latencies) == num_streams
+    assert len(sizes) == num_streams
+  else:
+    assert len(start_times) == total_num_streams
+    assert len(latencies) == total_num_streams
+    assert len(sizes) == total_num_streams
+    num_streams = total_num_streams
 
   if metadata is None:
     metadata = {}
-  metadata['num_streams'] = num_streams
+  metadata['num_streams'] = total_num_streams
+  metadata['num_failing_streams'] = total_num_streams - num_streams
   metadata['objects_per_stream'] = (
       FLAGS.object_storage_multistream_objects_per_stream)
   metadata['object_naming'] = FLAGS.object_storage_object_naming_scheme
@@ -1053,7 +1066,7 @@ def _MultiStreamOneWay(results, metadata, vms, command_builder,
   if FLAGS.object_storage_worker_output:
     with open(FLAGS.object_storage_worker_output, 'w') as out_file:
       out_file.write(json.dumps(output))
-  _ProcessMultiStreamResults(
+  ProcessMultiStreamResults(
       start_times,
       latencies,
       sizes,
diff --git a/perfkitbenchmarker/linux_virtual_machine.py b/perfkitbenchmarker/linux_virtual_machine.py
index 6eaf2de12c..24fa089bd6 100644
--- a/perfkitbenchmarker/linux_virtual_machine.py
+++ b/perfkitbenchmarker/linux_virtual_machine.py
@@ -36,7 +36,7 @@
 import re
 import threading
 import time
-from typing import Dict, Set
+from typing import Dict, List, Set
 import uuid
 
 from absl import flags
@@ -937,6 +937,36 @@ def RemoteCommand(self, *args, **kwargs):
     """
     return self.RemoteCommandWithReturnCode(*args, **kwargs)[:2]
 
+  def RemoteCommandsInParallel(self,
+                               commands: List[str],
+                               *args,
+                               **kwargs):
+    """Runs several commands on the VM in parallel.
+
+    Because of the behavior of the wait command, failures of individual
+    commands are ignored.
+
+    Args:
+      commands: the list of commands to run.
+      *args: Arguments passed directly to RemoteHostCommandWithReturnCode.
+      **kwargs: Keyword arguments passed directly to
+        RemoteHostCommandWithReturnCode.
+
+    Returns:
+      A tuple of stdout, stderr from running the commands. Output from the
+      commands is interleaved in no guaranteed order.
+
+    Raises:
+      RemoteCommandError: If there was a problem establishing the connection.
+    """
+    command_delimiter = ' & '
+    # Wrap each command in a subshell to support command chaining, e.g.
+    # (echo do this; echo then this) & (echo do this in parallel)
+    command = command_delimiter.join(f'( {command} )' for command in commands)
+    command += '; wait'
+
+    return self.RobustRemoteCommand(command, *args, **kwargs)
+
   def RemoteCommandWithReturnCode(self, *args, **kwargs):
     """Runs a command on the VM.
 
diff --git a/perfkitbenchmarker/object_storage_service.py b/perfkitbenchmarker/object_storage_service.py
index e3af30ddf2..6de2554f0f 100644
--- a/perfkitbenchmarker/object_storage_service.py
+++ b/perfkitbenchmarker/object_storage_service.py
@@ -18,6 +18,7 @@
 import abc
 import logging
 import os
+from typing import Optional
 
 from absl import flags
 from perfkitbenchmarker import errors
@@ -273,10 +274,46 @@ def UpdateSampleMetadata(self, samples):
 
     Args:
       samples: the samples that need the metadata to be updated with provider
-          specific information.
+        specific information.
""" pass + def GetDownloadUrl(self, + bucket: str, + object_name: str, + use_https=True) -> str: + """Get the URL to download objects over HTTP(S). + + Args: + bucket: name of bucket + object_name: name of object + use_https: whether to use HTTPS or else HTTP + + Returns: + The URL to download objects over. + """ + raise NotImplementedError + + def GetUploadUrl(self, bucket: str, object_name: str, use_https=True) -> str: + """Get the URL to upload objects over HTTP(S). + + Args: + bucket: name of bucket + object_name: name of object + use_https: whether to use HTTPS or else HTTP + + Returns: + The URL to upload objects over. + """ + return self.GetDownloadUrl(bucket, object_name, use_https) + + # Different services require uploads to be POST or PUT. + UPLOAD_HTTP_METHOD: Optional[str] = None + + def MakeBucketPubliclyReadable(self, bucket: str, also_make_writable=False): + """Make a bucket readable and optionally writable by everyone.""" + raise NotImplementedError + def APIScriptArgs(self): """Extra arguments for the API test script. diff --git a/perfkitbenchmarker/providers/aws/s3.py b/perfkitbenchmarker/providers/aws/s3.py index 1002214a20..43bad2cf72 100644 --- a/perfkitbenchmarker/providers/aws/s3.py +++ b/perfkitbenchmarker/providers/aws/s3.py @@ -14,10 +14,13 @@ """Contains classes/functions related to S3.""" +import json import os import posixpath +from typing import List from absl import flags +from absl import logging from perfkitbenchmarker import errors from perfkitbenchmarker import linux_packages from perfkitbenchmarker import object_storage_service @@ -29,6 +32,8 @@ AWS_CREDENTIAL_LOCATION = '.aws' DEFAULT_AWS_REGION = 'us-east-1' +_READ = 's3:GetObject' +_WRITE = 's3:PutObject' class S3Service(object_storage_service.ObjectStorageService): @@ -119,7 +124,7 @@ def ListTopLevelSubfolders(self, bucket): def DeleteBucket(self, bucket): """See base class.""" - def _suppress_failure(stdout, stderr, retcode): + def _SuppressFailure(stdout, stderr, retcode): """Suppresses failure when bucket does not exist.""" del stdout # unused if retcode and 'NoSuchBucket' in stderr: @@ -131,7 +136,7 @@ def _suppress_failure(stdout, stderr, retcode): 's3://%s' % bucket, '--region', self.region, '--force'], # --force deletes even if bucket contains objects. 
- suppress_failure=_suppress_failure) + suppress_failure=_SuppressFailure) def EmptyBucket(self, bucket): vm_util.IssueCommand( @@ -140,6 +145,27 @@ def EmptyBucket(self, bucket): '--region', self.region, '--recursive']) + def MakeBucketPubliclyReadable(self, bucket, also_make_writable=False): + """See base class.""" + actions = [_READ] + logging.warning('Making bucket %s publicly readable!', bucket) + if also_make_writable: + actions.append(_WRITE) + logging.warning('Making bucket %s publicly writable!', bucket) + vm_util.IssueCommand([ + 'aws', 's3api', 'put-bucket-policy', '--region', self.region, + '--bucket', bucket, '--policy', + _MakeS3BucketPolicy(bucket, actions) + ]) + + def GetDownloadUrl(self, bucket, object_name, use_https=True): + """See base class.""" + assert self.region + scheme = 'https' if use_https else 'http' + return f'{scheme}://{bucket}.s3.{self.region}.amazonaws.com/{object_name}' + + UPLOAD_HTTP_METHOD = 'PUT' + def PrepareVM(self, vm): vm.Install('awscli') vm.Install('boto3') @@ -174,3 +200,20 @@ def APIScriptArgs(self): @classmethod def APIScriptFiles(cls): return ['s3.py'] + + +def _MakeS3BucketPolicy(bucket: str, + actions: List[str], + object_prefix='') -> str: + # https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_examples_s3_rw-bucket.html + return json.dumps({ + 'Version': + '2012-10-17', + 'Statement': [{ + 'Principal': '*', + 'Sid': 'PkbAcl', + 'Effect': 'Allow', + 'Action': actions, + 'Resource': [f'arn:aws:s3:::{bucket}/{object_prefix}*'] + }] + }) diff --git a/perfkitbenchmarker/providers/gcp/gcs.py b/perfkitbenchmarker/providers/gcp/gcs.py index 08b5c9f5e9..7d645b2b64 100644 --- a/perfkitbenchmarker/providers/gcp/gcs.py +++ b/perfkitbenchmarker/providers/gcp/gcs.py @@ -19,6 +19,7 @@ import os import posixpath import re +from typing import List as TList from absl import flags from perfkitbenchmarker import errors @@ -35,6 +36,8 @@ GCLOUD_CONFIG_PATH = '.config/gcloud' GCS_CLIENT_PYTHON = 'python' GCS_CLIENT_BOTO = 'boto' +READER = 'objectViewer' +WRITER = 'objectCreator' flags.DEFINE_string('google_cloud_sdk_version', None, 'Use a particular version of the Google Cloud SDK, e.g.: ' @@ -155,18 +158,44 @@ def EmptyBucket(self, bucket): ['gsutil', '-m', 'rm', '-r', 'gs://%s/*' % bucket], raise_on_failure=False) - def ChmodBucket(self, account, access, bucket): + def AclBucket(self, entity: str, roles: TList[str], bucket: str): """Updates access control lists. Args: - account: string, the user to be granted. - access: string, the permission to be granted. - bucket: string, the name of the bucket to change + entity: the user or group to grant permission. + roles: the IAM roles to be granted. 
+ bucket: the name of the bucket to change """ vm_util.IssueCommand([ - 'gsutil', 'acl', 'ch', '-u', - '{account}:{access}'.format(account=account, access=access), - 'gs://{}'.format(bucket)]) + 'gsutil', 'iam', 'ch', f"{entity}:{','.join(roles)}", f'gs://{bucket}' + ]) + + def MakeBucketPubliclyReadable(self, bucket, also_make_writable=False): + """See base class.""" + roles = [READER] + logging.warning('Making bucket %s publicly readable!', bucket) + if also_make_writable: + roles.append(WRITER) + logging.warning('Making bucket %s publicly writable!', bucket) + self.AclBucket('allUsers', roles, bucket) + + # Use JSON API over XML for URLs + def GetDownloadUrl(self, bucket, object_name, use_https=True): + """See base class.""" + # https://cloud.google.com/storage/docs/downloading-objects + scheme = 'https' if use_https else 'http' + return (f'{scheme}://storage.googleapis.com/storage/v1/' + f'b/{bucket}/o/{object_name}?alt=media') + + def GetUploadUrl(self, bucket, object_name, use_https=True): + """See base class.""" + # https://cloud.google.com/storage/docs/uploading-objects + # Note I don't believe GCS supports upload via HTTP. + scheme = 'https' if use_https else 'http' + return (f'{scheme}://storage.googleapis.com/upload/storage/v1/' + f'b/{bucket}/o?uploadType=media&name={object_name}') + + UPLOAD_HTTP_METHOD = 'POST' @classmethod def AcquireWritePermissionsWindows(cls, vm): diff --git a/tests/linux_benchmarks/object_storage_service_benchmark_test.py b/tests/linux_benchmarks/object_storage_service_benchmark_test.py index c51b27e683..49a016e2d8 100644 --- a/tests/linux_benchmarks/object_storage_service_benchmark_test.py +++ b/tests/linux_benchmarks/object_storage_service_benchmark_test.py @@ -19,6 +19,7 @@ import unittest from absl import flags import mock +import numpy as np from perfkitbenchmarker.linux_benchmarks import object_storage_service_benchmark from tests import pkb_common_test_case @@ -45,7 +46,7 @@ def testBuildCommands(self): with mock.patch(time.__name__ + '.time', return_value=1.0): with mock.patch(object_storage_service_benchmark.__name__ + - '._ProcessMultiStreamResults'): + '.ProcessMultiStreamResults'): with mock.patch(object_storage_service_benchmark.__name__ + '.LoadWorkerOutput', return_value=(None, None, None)): object_storage_service_benchmark.MultiStreamRWBenchmark( @@ -135,5 +136,38 @@ def testFilename(self): self.assertLessEqual(age, 73) +class TestProcessMultiStreamResults(pkb_common_test_case.PkbCommonTestCase): + + def setUp(self): + super(TestProcessMultiStreamResults, self).setUp() + FLAGS.object_storage_streams_per_vm = 2 + FLAGS.num_vms = 2 + FLAGS.object_storage_multistream_objects_per_stream = 2 + + def testDefault(self): + start_times = [np.array([6, 5], dtype=np.float64)] * 4 + latencies = [np.array([4, 3], dtype=np.float64)] * 4 + sizes = [np.array([2, 1])] * 4 + all_sizes = [1, 2] + results = [] + object_storage_service_benchmark.ProcessMultiStreamResults( + start_times, latencies, sizes, 'download', all_sizes, results) + + def testAllowFailing(self): + start_times = [np.array([6, 5], dtype=np.float64)] * 3 + latencies = [np.array([4, 3], dtype=np.float64)] * 3 + sizes = [np.array([2, 1])] * 3 + all_sizes = [1, 2] + results = [] + object_storage_service_benchmark.ProcessMultiStreamResults( + start_times, + latencies, + sizes, + 'download', + all_sizes, + results, + allow_failing_streams=True) + + if __name__ == '__main__': unittest.main()
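
For reference, below is a minimal standalone sketch (not part of the patch) of how the new RemoteCommandsInParallel helper composes its shell line, and of the worker output format that _LoadWorkerOutput parses. The helper name _JoinParallel is hypothetical; only the joining logic and the START_TIME / CURL_RESULTS line formats are taken from the diff above.

from typing import List


def _JoinParallel(commands: List[str]) -> str:
  """Compose one shell line the way RemoteCommandsInParallel does.

  Each command is wrapped in a subshell so chained commands stay grouped,
  every subshell is backgrounded with '&', and a trailing 'wait' blocks
  until all of them finish. Individual exit codes are ignored.
  """
  return ' & '.join(f'( {command} )' for command in commands) + '; wait'


print(_JoinParallel(['echo a; echo b', 'echo c']))
# Output: ( echo a; echo b ) & ( echo c ); wait

# Each stream prints one START_TIME line and one CURL_RESULTS line, e.g.:
#   START_TIME 0 12345.6789
#   CURL_RESULTS 0 1.2345 200 01000
# _LoadWorkerOutput pairs these by stream index to produce the start times
# and latencies fed to ProcessMultiStreamResults.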