
Add RunRemoteCommandsInParallel and use it to run curl in parallel to avoid exhausting SSH connections. #3075

Open
wants to merge 1 commit into base: master
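The RemoteCommandsInParallel helper that the new benchmark calls (vm.RemoteCommandsInParallel below) is added elsewhere in this commit and its diff is not included in this excerpt. As a rough sketch of the idea named in the title (run all of a VM's curl streams over a single SSH connection instead of opening one per stream), assuming PKB's existing RemoteCommand(cmd) API, it might look something like:

from typing import List, Tuple


def RemoteCommandsInParallelSketch(vm, commands: List[str]) -> Tuple[str, str]:
  """Sketch only: run several shell commands concurrently over one SSH session."""
  # Background each command in a subshell and wait for all of them, so a single
  # SSH connection per VM is used instead of one per command. Output from the
  # jobs can interleave, which is why the benchmark tags every line with a
  # stream index (START_TIME / CURL_RESULTS <index> ...).
  combined = ' & '.join(f'({cmd})' for cmd in commands) + ' & wait'
  return vm.RemoteCommand(combined)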
1 change: 1 addition & 0 deletions CHANGES.next.md
@@ -49,6 +49,7 @@
- Add pd extreme support to PKB.
- Add '--delete_samples' to measure VM deletion during benchmark teardown
phase
- Add cURL benchmark for object storage.

### Enhancements:

4 changes: 2 additions & 2 deletions perfkitbenchmarker/linux_benchmarks/mlperf_benchmark.py
@@ -191,8 +191,8 @@ def Prepare(benchmark_spec, vm=None):
location = benchmark_spec.tpu_groups['train'].GetZone()
storage_service.PrepareService(util.GetRegionFromZone(location))
storage_service.MakeBucket(bucket)
storage_service.ChmodBucket(benchmark_spec.gcp_service_account, 'W',
bucket)
storage_service.AclBucket(benchmark_spec.gcp_service_account, gcs.WRITER,
bucket)

# For MLPerf v0.6, the benchmark code differs across hardware types.
if (benchmark_spec.tpu_groups['train'].GetAcceleratorType() == 'v3-32' or
@@ -689,7 +689,8 @@ def _PrepareBucket(benchmark_spec):
storage_service = benchmark_spec.storage_service
storage_service.PrepareService(util.GetRegionFromZone(location))
storage_service.MakeBucket(bucket, raise_on_failure=False)
storage_service.ChmodBucket(benchmark_spec.gcp_service_account, 'W', bucket)
storage_service.AclBucket(benchmark_spec.gcp_service_account, gcs.WRITER,
bucket)


def _ClearTmpDirectory(benchmark_spec, vm):
3 changes: 2 additions & 1 deletion perfkitbenchmarker/linux_benchmarks/mnist_benchmark.py
@@ -141,7 +141,8 @@ def Prepare(benchmark_spec):
location = benchmark_spec.tpu_groups['train'].GetZone()
storage_service.PrepareService(util.GetRegionFromZone(location))
storage_service.MakeBucket(bucket)
storage_service.ChmodBucket(benchmark_spec.gcp_service_account, 'W', bucket)
storage_service.AclBucket(benchmark_spec.gcp_service_account, gcs.WRITER,
bucket)
else:
benchmark_spec.model_dir = '/tmp'

276 changes: 276 additions & 0 deletions perfkitbenchmarker/linux_benchmarks/object_storage_curl.py
@@ -0,0 +1,276 @@
# Copyright 2021 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Use cURL to upload and download data to object storage in parallel.

Consistent with the object_storage_service multistream scenario.

Due to the difficulty of signing requests to S3 by hand
(https://docs.aws.amazon.com/AmazonS3/latest/userguide/RESTAuthentication.html),
the benchmark uses insecure, short-lived public buckets and should be used with caution.

TODO(pclay): Consider signing requests and not using public buckets.
"""

import logging
from typing import List, Tuple

from absl import flags
import numpy as np
from perfkitbenchmarker import configs
from perfkitbenchmarker import errors
from perfkitbenchmarker import flag_util
from perfkitbenchmarker import object_storage_service
from perfkitbenchmarker import providers
from perfkitbenchmarker import sample
from perfkitbenchmarker import vm_util
from perfkitbenchmarker.linux_benchmarks import object_storage_service_benchmark

BENCHMARK_NAME = 'object_storage_curl'

BENCHMARK_CONFIG = """
object_storage_curl:
description: Use cURL to upload and download data to object storage in parallel.
vm_groups:
default:
vm_spec: *default_single_core
flags:
# Required
object_storage_multistream_objects_per_stream: 1
object_storage_streams_per_vm: 10
"""

# Blocksize for dd to pipe data into uploads.
DD_BLOCKSIZE = 4000

# Magic strings
_UPLOAD = 'upload'
_DOWNLOAD = 'download'
_START_TIME = 'START_TIME'
_CURL_RESULTS = 'CURL_RESULTS'

flags.DEFINE_string('object_storage_curl_object_size', '1MB',
'Size of objects to upload / download. Similar to '
'--object_storage_object_sizes, but only takes a single '
'size.')
flags.DEFINE_bool('object_storage_curl_i_am_ok_with_public_read_write_buckets',
False, 'Acknowledge that this benchmark will create buckets '
'which are publicly readable and writable. Required to run '
'this benchmark.')

FLAGS = flags.FLAGS


def GetConfig(user_config):
return configs.LoadConfig(BENCHMARK_CONFIG, user_config, BENCHMARK_NAME)


def CheckPrerequisites(_):
"""Validate some unsupported flags."""
if (flag_util.StringToBytes(FLAGS.object_storage_curl_object_size) <
DD_BLOCKSIZE):
raise errors.Config.InvalidValue(
'--object_storage_curl_object_size must be larger than 4KB')
# TODO(pclay): Consider supporting multiple objects per stream.
if FLAGS.object_storage_multistream_objects_per_stream != 1:
raise errors.Config.InvalidValue(
'object_storage_curl only supports 1 object per stream')
if FLAGS.object_storage_object_naming_scheme != 'sequential_by_stream':
raise errors.Config.InvalidValue(
'object_storage_curl only supports sequential_by_stream naming.')
if not FLAGS.object_storage_curl_i_am_ok_with_public_read_write_buckets:
raise errors.Config.InvalidValue(
'This benchmark uses a public read/write object storage bucket.\n'
'You must explicitly pass '
'--object_storage_curl_i_am_ok_with_public_read_write_buckets to '
'acknowledge that it will be created.\n'
'If PKB is interrupted, you should ensure it is cleaned up.')


# PyType does not currently support returning Abstract classes
# TODO(user): stop suppressing
# pytype: disable=not-instantiable
def _GetService() -> object_storage_service.ObjectStorageService:
"""Get a ready to use instance of ObjectStorageService."""
# TODO(pclay): consider using FLAGS.storage to allow cross cloud testing?
cloud = FLAGS.cloud
providers.LoadProvider(cloud)
service = object_storage_service.GetObjectStorageClass(cloud)()
# This method is idempotent with default args and safe to call in each phase.
service.PrepareService(FLAGS.object_storage_region)
return service


def _GetBucketName() -> str:
return FLAGS.object_storage_bucket_name or 'pkb%s' % FLAGS.run_uri


def Prepare(benchmark_spec):
"""Create and ACL bucket and install curl."""
# We would like to always cleanup server side states when exception happens.
benchmark_spec.always_call_cleanup = True

service = _GetService()
bucket_name = _GetBucketName()

service.MakeBucket(bucket_name)
service.MakeBucketPubliclyReadable(bucket_name, also_make_writable=True)

vms = benchmark_spec.vms
vm_util.RunThreaded(lambda vm: vm.InstallPackages('curl'), vms)


def Run(benchmark_spec) -> List[sample.Sample]:
"""Run storage benchmark and publish results.

Args:
benchmark_spec: The benchmark specification. Contains all data that is
required to run the benchmark.

Returns:
The same samples as object_storage_multistream.
"""
service = _GetService()
bucket = _GetBucketName()

object_bytes = flag_util.StringToBytes(FLAGS.object_storage_curl_object_size)
blocks = object_bytes // DD_BLOCKSIZE
streams_per_vm = FLAGS.object_storage_streams_per_vm

generate_data_cmd = (
'openssl aes-256-ctr -iter 1 -pass file:/dev/urandom -in /dev/zero'
f' | dd bs={DD_BLOCKSIZE} count={blocks} iflag=fullblock')
def StartTimeCmd(index):
return f"date '+{_START_TIME} {index} %s.%N'"

def CurlCmd(operation, index):
return (
f"curl -fsw '{_CURL_RESULTS} {index} %{{time_total}} "
# Pad size with zero to force curl to populate it.
f"%{{response_code}} 0%{{size_{operation}}}\n' -o /dev/null")

def Upload(vm):
commands = []
for object_index in range(streams_per_vm):
object_name = f'{vm.name}_{object_index}'
url = service.GetUploadUrl(bucket=bucket, object_name=object_name)
commands.append(
f'{StartTimeCmd(object_index)}; {generate_data_cmd} | '
f'{CurlCmd(_UPLOAD, object_index)} -X {service.UPLOAD_HTTP_METHOD} '
f"--data-binary @- '{url}'")
stdout, _ = vm.RemoteCommandsInParallel(commands)
return stdout

def Download(vm):
commands = []
for object_index in range(streams_per_vm):
object_name = f'{vm.name}_{object_index}'
url = service.GetDownloadUrl(bucket=bucket, object_name=object_name)
commands.append(f'{StartTimeCmd(object_index)}; '
f"{CurlCmd(_DOWNLOAD, object_index)} '{url}'")
stdout, _ = vm.RemoteCommandsInParallel(commands)
return stdout

vms = benchmark_spec.vms
samples = []
for operation, func in [(_UPLOAD, Upload), (_DOWNLOAD, Download)]:
output = vm_util.RunThreaded(func, vms)
start_times, latencies = _LoadWorkerOutput(output)
object_storage_service_benchmark.ProcessMultiStreamResults(
start_times,
latencies,
all_sizes=[object_bytes],
sizes=[np.array([object_bytes])] * streams_per_vm * len(vms),
operation=operation,
results=samples,
# We do not retry curl. We simply do not report failing latencies.
# This under-reports both latency and throughput. Since this benchmark
# is intended to measure throughput this is reasonable.
allow_failing_streams=True)
return samples


def Cleanup(_):
service = _GetService()
bucket_name = _GetBucketName()
if not FLAGS.object_storage_dont_delete_bucket:
service.DeleteBucket(bucket_name)
service.CleanupService()


def _LoadWorkerOutput(
output: List[str]) -> Tuple[List[np.ndarray], List[np.ndarray]]:
"""Parse the output of Upload and Download functions.

The output of Upload and Download is
# START_TIME <index> <Unix start time in seconds>
START_TIME 0 12345.6789
# CURL_RESULTS <index> <latency in s> <HTTP code> <bytes transmitted>
CURL_RESULTS 0 1.2345 200 01000

Lines of output are not ordered and may be interleaved.

Args:
output: the output of each upload or download command

Returns:
the start times and latencies of the curl commands

Raises:
ValueError if output is unexpected.
Exception if the curl request failed with a 4XX code.
"""
start_times = []
latencies = []

for worker_out in output:
worker_start_times = [None] * FLAGS.object_storage_streams_per_vm
worker_latencies = [None] * FLAGS.object_storage_streams_per_vm
for line in worker_out.strip().split('\n'):
try:
line_type, index, value, *curl_data = line.split()
if line_type == _START_TIME:
assert not curl_data
worker_start_times[int(index)] = float(value)
elif line_type == _CURL_RESULTS:
response_code, bytes_transmitted = curl_data
bytes_transmitted = int(bytes_transmitted)
if response_code.startswith('4'):
raise Exception(
f'cURL command failed with HTTP Code {response_code}')
elif response_code == '200':
bytes_expected = flag_util.StringToBytes(
FLAGS.object_storage_curl_object_size)
if bytes_transmitted != bytes_expected:
raise ValueError(
f'cURL transmitted {bytes_transmitted}'
f' instead of {bytes_expected}.')
# curl 7.74 reported time_total in μs instead of seconds; that version is not shipped by major OS types.
# https://github.com/curl/curl/issues/6321
assert '.' in value, 'Invalid curl output.'
worker_latencies[int(index)] = float(value)
else:
logging.warning('cURL command failed with HTTP code %s. '
'Not reporting latency.', response_code)
else:
raise ValueError(f'Unexpected line start: {line_type}.')
# Always show raw line when there is a parsing error.
except (ValueError, AssertionError) as e:
raise ValueError(f'Unexpected output:\n{line}') from e

for start_time, latency in zip(worker_start_times, worker_latencies):
if latency:
start_times.append(np.array([start_time], dtype=np.float64))
latencies.append(np.array([latency], dtype=np.float64))

return start_times, latencies
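For reference, a hypothetical worker output and the arrays _LoadWorkerOutput would derive from it (values are invented; assumes two streams per VM and that the configured object size resolves to 1,000,000 bytes):

# Hypothetical stdout from one VM running two streams; in practice the lines
# may be interleaved in any order.
worker_stdout = (
    'START_TIME 0 1617220000.123456\n'
    'CURL_RESULTS 0 1.234 200 01000000\n'
    'START_TIME 1 1617220000.223456\n'
    'CURL_RESULTS 1 0.987 200 01000000\n'
)
# _LoadWorkerOutput([worker_stdout]) would return one singleton numpy array per
# successful stream, aligned by stream index:
#   start_times == [array([1617220000.123456]), array([1617220000.223456])]
#   latencies   == [array([1.234]), array([0.987])]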
@@ -403,8 +403,14 @@ def MultiThreadDeleteDelay(num_vms, threads_per_vm):
return (num_vms * threads_per_vm) / (MULTISTREAM_DELETE_OPS_PER_SEC)


def _ProcessMultiStreamResults(start_times, latencies, sizes, operation,
all_sizes, results, metadata=None):
def ProcessMultiStreamResults(start_times,
latencies,
sizes,
operation,
all_sizes,
results,
metadata=None,
allow_failing_streams=False):
"""Read and process results from the api_multistream worker process.

Results will be reported per-object size and combined for all
@@ -421,17 +427,24 @@ def _ProcessMultiStreamResults(start_times, latencies, sizes, operation,
distribution used, in bytes.
results: a list to append Sample objects to.
metadata: dict. Base sample metadata
allow_failing_streams: Whether to allow streams that report no result, rather than requiring a result from every stream.
"""

num_streams = FLAGS.object_storage_streams_per_vm * FLAGS.num_vms

assert len(start_times) == num_streams
assert len(latencies) == num_streams
assert len(sizes) == num_streams
total_num_streams = FLAGS.object_storage_streams_per_vm * FLAGS.num_vms
if allow_failing_streams:
num_streams = len(start_times)
assert len(latencies) == num_streams
assert len(sizes) == num_streams
else:
assert len(start_times) == total_num_streams
assert len(latencies) == total_num_streams
assert len(sizes) == total_num_streams
num_streams = total_num_streams

if metadata is None:
metadata = {}
metadata['num_streams'] = num_streams
metadata['num_streams'] = total_num_streams
metadata['num_failing_streams'] = total_num_streams - num_streams
metadata['objects_per_stream'] = (
FLAGS.object_storage_multistream_objects_per_stream)
metadata['object_naming'] = FLAGS.object_storage_object_naming_scheme
@@ -1053,7 +1066,7 @@ def _MultiStreamOneWay(results, metadata, vms, command_builder,
if FLAGS.object_storage_worker_output:
with open(FLAGS.object_storage_worker_output, 'w') as out_file:
out_file.write(json.dumps(output))
_ProcessMultiStreamResults(
ProcessMultiStreamResults(
start_times,
latencies,
sizes,
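To make the allow_failing_streams bookkeeping above concrete, here is a small invented example (2 VMs with 10 streams each, of which 18 streams report a result):

total_num_streams = 2 * 10  # FLAGS.num_vms * FLAGS.object_storage_streams_per_vm
num_streams = 18            # len(start_times): streams that actually reported
metadata = {
    'num_streams': total_num_streams,                        # 20
    'num_failing_streams': total_num_streams - num_streams,  # 2
}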