diff --git a/perfkitbenchmarker/linux_benchmarks/memcached_ycsb_benchmark.py b/perfkitbenchmarker/linux_benchmarks/memcached_ycsb_benchmark.py
index f504839299..51bd29760c 100644
--- a/perfkitbenchmarker/linux_benchmarks/memcached_ycsb_benchmark.py
+++ b/perfkitbenchmarker/linux_benchmarks/memcached_ycsb_benchmark.py
@@ -1,4 +1,4 @@
-# Copyright 2016 PerfKitBenchmarker Authors. All rights reserved.
+# Copyright 2017 PerfKitBenchmarker Authors. All rights reserved.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-"""Runs YCSB against memcached.
+"""Runs YCSB against different memcached-like offerings.
 
 This benchmark runs two workloads against memcached using YCSB (the Yahoo! Cloud
 Serving Benchmark).
@@ -21,15 +21,46 @@
 """
 
 import functools
+import logging
 
 from perfkitbenchmarker import configs
 from perfkitbenchmarker import flags
+from perfkitbenchmarker import providers
 from perfkitbenchmarker import vm_util
 from perfkitbenchmarker.linux_packages import memcached_server
 from perfkitbenchmarker.linux_packages import ycsb
+from perfkitbenchmarker.providers.aws import aws_network
+from perfkitbenchmarker.providers.aws import elasticache
 
 FLAGS = flags.FLAGS
 
+flags.DEFINE_enum('memcached_managed', providers.GCP,
+                  [providers.GCP, providers.AWS],
+                  'Managed memcached provider (GCP/AWS) to use.')
+
+flags.DEFINE_enum('memcached_scenario', 'custom',
+                  ['custom', 'managed'],
+                  'select one scenario to run: \n'
+                  'custom: Provision VMs and install memcached ourselves. \n'
+                  'managed: Use the specified provider\'s managed memcache.')
+
+flags.DEFINE_enum('memcached_elasticache_region', 'us-west-1',
+                  ['ap-northeast-1', 'ap-northeast-2', 'ap-southeast-1',
+                   'ap-southeast-2', 'ap-south-1', 'cn-north-1', 'eu-central-1',
+                   'eu-west-1', 'us-gov-west-1', 'sa-east-1', 'us-east-1',
+                   'us-east-2', 'us-west-1', 'us-west-2'],
+                  'The region to use for AWS ElastiCache memcached servers.')
+
+flags.DEFINE_enum('memcached_elasticache_node_type', 'cache.m3.medium',
+                  ['cache.t2.micro', 'cache.t2.small', 'cache.t2.medium',
+                   'cache.m3.medium', 'cache.m3.large', 'cache.m3.xlarge',
+                   'cache.m3.2xlarge', 'cache.m4.large', 'cache.m4.xlarge',
+                   'cache.m4.2xlarge', 'cache.m4.4xlarge', 'cache.m4.10xlarge'],
+                  'The node type to use for AWS ElastiCache memcached servers.')
+
+flags.DEFINE_integer('memcached_elasticache_num_servers', 1,
+                     'The number of memcached instances for AWS ElastiCache.')
+
 
 BENCHMARK_NAME = 'memcached_ycsb'
 BENCHMARK_CONFIG = """
@@ -74,23 +105,57 @@ def Prepare(benchmark_spec):
     benchmark_spec: The benchmark specification. Contains all data that is
         required to run the benchmark.
   """
-  loaders = benchmark_spec.vm_groups['clients']
-  assert loaders, benchmark_spec.vm_groups
-
-  # Memcached cluster
-  memcached_vms = benchmark_spec.vm_groups['servers']
-  assert memcached_vms, 'No memcached VMs: {0}'.format(benchmark_spec.vm_groups)
+  clients = benchmark_spec.vm_groups['clients']
+  assert clients, benchmark_spec.vm_groups
+
+  hosts = []
+
+  if FLAGS.memcached_scenario == 'managed':
+    # We need to delete the managed memcached backend when we're done
+    benchmark_spec.always_call_cleanup = True
+
+    if FLAGS.memcached_managed == providers.GCP:
+      raise NotImplementedError("GCP managed memcached backend not implemented "
+                                "yet")
+    elif FLAGS.memcached_managed == providers.AWS:
+      cluster_id = 'pkb%s' % FLAGS.run_uri
+      # Use the explicitly imported module: `providers.aws.elasticache` is only
+      # bound as an attribute once something has imported that submodule.
+      service = elasticache.ElastiCacheMemcacheService(
+          aws_network.AwsNetwork.GetNetwork(clients[0]),
+          cluster_id, FLAGS.memcached_elasticache_region,
+          FLAGS.memcached_elasticache_node_type,
+          FLAGS.memcached_elasticache_num_servers)
+      # Register the service before Create() so Cleanup can still destroy a
+      # cluster whose creation failed or timed out part-way through.
+      benchmark_spec.service = service
+      service.Create()
+      hosts = service.GetHosts()
+      benchmark_spec.metadata = service.GetMetadata()
+  else:
+    # custom scenario
+    # Install memcached on all the servers
+    servers = benchmark_spec.vm_groups['servers']
+    assert servers, 'No memcached servers: {0}'.format(benchmark_spec.vm_groups)
+    memcached_install_fns = [
+        functools.partial(memcached_server.ConfigureAndStart, vm)
+        for vm in servers]
+    vm_util.RunThreaded(lambda f: f(), memcached_install_fns)
+    hosts = ['%s:%s' % (vm.internal_ip, memcached_server.MEMCACHED_PORT)
+             for vm in servers]
+    benchmark_spec.metadata = {'ycsb_client_vms': FLAGS.ycsb_client_vms,
+                               'ycsb_server_vms': FLAGS.ycsb_server_vms,
+                               'num_vms': len(servers),
+                               'cache_size': FLAGS.memcached_size_mb}
+
+  assert hosts, 'No memcached hosts to benchmark.'
 
-  memcached_install_fns = [functools.partial(memcached_server.ConfigureAndStart,
-                                             vm)
-                           for vm in memcached_vms]
   ycsb_install_fns = [functools.partial(vm.Install, 'ycsb')
-                      for vm in loaders]
-
-  vm_util.RunThreaded(lambda f: f(), memcached_install_fns + ycsb_install_fns)
+                      for vm in clients]
+  vm_util.RunThreaded(lambda f: f(), ycsb_install_fns)
   benchmark_spec.executor = ycsb.YCSBExecutor(
       'memcached',
-      **{'memcached.hosts': ','.join([vm.internal_ip for vm in memcached_vms])})
+      **{'memcached.hosts': ','.join(hosts)})
 
 
 def Run(benchmark_spec):
@@ -103,17 +168,16 @@ def Run(benchmark_spec):
   Returns:
     A list of sample.Sample instances.
   """
-  loaders = benchmark_spec.vm_groups['clients']
-  memcached_vms = benchmark_spec.vm_groups['servers']
-  metadata = {'ycsb_client_vms': FLAGS.ycsb_client_vms,
-              'num_vms': len(memcached_vms),
-              'cache_size': FLAGS.memcached_size_mb}
+  logging.info('Start benchmarking memcached service, scenario is %s.',
+               FLAGS.memcached_scenario)
+
+  clients = benchmark_spec.vm_groups['clients']
 
 
-  samples = list(benchmark_spec.executor.LoadAndRun(loaders))
+  samples = list(benchmark_spec.executor.LoadAndRun(clients))
 
   for sample in samples:
-    sample.metadata.update(metadata)
+    sample.metadata.update(benchmark_spec.metadata)
 
   return samples
 
@@ -125,5 +189,14 @@ def Cleanup(benchmark_spec):
     benchmark_spec: The benchmark specification. Contains all data that is
         required to run the benchmark.
   """
-  memcached_vms = benchmark_spec.vm_groups['servers']
-  vm_util.RunThreaded(memcached_server.StopMemcached, memcached_vms)
+  if FLAGS.memcached_scenario == 'managed':
+    # Prepare sets always_call_cleanup before it attaches a service to the
+    # spec and can fail in between (e.g. on the unimplemented GCP path), so
+    # tolerate a missing attribute instead of raising AttributeError here.
+    service = getattr(benchmark_spec, 'service', None)
+    if service is not None:
+      service.Destroy()
+  else:
+    # Custom scenario
+    servers = benchmark_spec.vm_groups['servers']
+    vm_util.RunThreaded(memcached_server.StopMemcached, servers)
diff --git a/perfkitbenchmarker/linux_packages/memcached_server.py b/perfkitbenchmarker/linux_packages/memcached_server.py
index f8901b9d89..6101e8e012 100644
--- a/perfkitbenchmarker/linux_packages/memcached_server.py
+++ b/perfkitbenchmarker/linux_packages/memcached_server.py
@@ -122,6 +122,22 @@ def StopMemcached(server):
       (server.internal_ip, MEMCACHED_PORT))
 
 
+def FlushMemcachedServer(ip, port):
+  """Sends flush_all to the memcached server at ip:port.
+
+  The echo/netcat pipeline needs a shell, so it is passed to `bash -c` as one
+  element of an argument list, the form the other IssueCommand calls in this
+  change use, rather than as a bare string.
+
+  Args:
+    ip: Address of the memcached server.
+    port: Port the memcached server listens on.
+  """
+  vm_util.IssueCommand(
+      ['bash', '-c',
+       '(echo -e "flush_all\n" ; sleep 1)| netcat %s %s' % (ip, port)])
+
+
 def Uninstall(vm):
   vm.RemoteCommand('pkill memcached')
   vm.RemoteCommand('rm -rf %s' % MEMCACHED_DIR)
diff --git a/perfkitbenchmarker/linux_packages/ycsb.py b/perfkitbenchmarker/linux_packages/ycsb.py
index 6d251aaf99..ced0940dfa 100644
--- a/perfkitbenchmarker/linux_packages/ycsb.py
+++ b/perfkitbenchmarker/linux_packages/ycsb.py
@@ -265,7 +265,7 @@ def LineFilter(line):
         # Drop ">" from ">1000"
         if name.startswith('>'):
           name = name[1:]
-        val = float(val) if '.' in val else int(val)
+        val = float(val) if '.' in val or 'nan' in val.lower() else int(val)
         if name.isdigit():
           if val:
             op_result[data_type].append((int(name), val))
diff --git a/perfkitbenchmarker/memcache_service.py b/perfkitbenchmarker/memcache_service.py
new file mode 100644
index 0000000000..a61d786ad4
--- /dev/null
+++ b/perfkitbenchmarker/memcache_service.py
@@ -0,0 +1,35 @@
+# Copyright 2017 PerfKitBenchmarker Authors. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class MemcacheService(object):  # Interface for memcached service backends.
+  CLOUD = None  # Provider name; overridden by each subclass.
+
+  def __init__(self):
+    pass
+
+  def Create(self):
+    raise NotImplementedError
+
+  def Destroy(self):
+    raise NotImplementedError
+
+  def Flush(self):
+    raise NotImplementedError
+
+  def GetHosts(self):
+    raise NotImplementedError
+
+  def GetMetadata(self):
+    raise NotImplementedError
diff --git a/perfkitbenchmarker/providers/aws/aws_network.py b/perfkitbenchmarker/providers/aws/aws_network.py
index c1879914b7..7adec429ab 100644
--- a/perfkitbenchmarker/providers/aws/aws_network.py
+++ b/perfkitbenchmarker/providers/aws/aws_network.py
@@ -62,9 +62,22 @@ def AllowPort(self, vm, start_port, end_port=None):
     """
     if vm.is_static:
       return
+    self.AllowPortInSecurityGroup(vm.region, vm.group_id, start_port, end_port)
+
+  def AllowPortInSecurityGroup(self, region, security_group,
+                               start_port, end_port=None):
+    """Opens a port on the firewall for a security group.
+
+    Args:
+      region: The region of the security group
+      security_group: The security group in which to open the ports
+      start_port: The first local port to open in a range.
+      end_port: The last local port to open in a range. If None, only start_port
+        will be opened.
+    """
     if end_port is None:
       end_port = start_port
-    entry = (start_port, end_port, vm.group_id)
+    entry = (start_port, end_port, region, security_group)
     if entry in self.firewall_set:
       return
     with self._lock:
@@ -73,8 +86,8 @@ def AllowPort(self, vm, start_port, end_port=None):
       authorize_cmd = util.AWS_PREFIX + [
           'ec2',
           'authorize-security-group-ingress',
-          '--region=%s' % vm.region,
-          '--group-id=%s' % vm.group_id,
+          '--region=%s' % region,
+          '--group-id=%s' % security_group,
           '--port=%s-%s' % (start_port, end_port),
           '--cidr=0.0.0.0/0']
       util.IssueRetryableCommand(
diff --git a/perfkitbenchmarker/providers/aws/elasticache.py b/perfkitbenchmarker/providers/aws/elasticache.py
new file mode 100644
index 0000000000..8c48eb3b32
--- /dev/null
+++ b/perfkitbenchmarker/providers/aws/elasticache.py
@@ -0,0 +1,174 @@
+# Copyright 2017 PerfKitBenchmarker Authors. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Managed memcached service backed by AWS ElastiCache."""
+
+import functools
+import json
+import logging
+
+from perfkitbenchmarker import errors
+from perfkitbenchmarker import providers
+from perfkitbenchmarker import vm_util
+from perfkitbenchmarker.linux_packages import memcached_server
+from perfkitbenchmarker.memcache_service import MemcacheService
+from perfkitbenchmarker.providers.aws import aws_network
+
+
+ELASTICACHE_PORT = 11211
+
+
+class ElastiCacheMemcacheService(MemcacheService):
+  """An ElastiCache memcached cluster managed through the AWS CLI."""
+
+  CLOUD = providers.AWS
+
+  def __init__(self, network, cluster_id, region, node_type, num_servers=1):
+    """Records the cluster parameters; nothing is created until Create().
+
+    Args:
+      network: Network object providing `subnet` and
+        `regional_network.vpc`; the benchmark passes the result of
+        AwsNetwork.GetNetwork.
+      cluster_id: Id for the cache cluster; the cache subnet group name is
+        derived from it.
+      region: AWS region passed to every CLI call.
+      node_type: ElastiCache node type to request.
+      num_servers: Number of cache nodes to request.
+    """
+    super(ElastiCacheMemcacheService, self).__init__()
+    self.cluster_id = cluster_id
+    self.region = region
+    self.node_type = node_type
+    self.num_servers = num_servers
+    self.hosts = []  # [(ip, port)]
+
+    self.vpc_id = network.subnet.vpc_id
+    self.security_group_id = (
+        network.regional_network.vpc.default_security_group_id)
+    self.subnet_id = network.subnet.id
+    self.subnet_group_name = '%ssubnet' % cluster_id
+
+  def Create(self):
+    """Creates the cluster and blocks until it reports 'available'.
+
+    Opens the memcached port in the VPC's default security group, creates a
+    cache subnet group for the benchmark subnet, requests the cluster, waits
+    for it to come up and fills self.hosts with the node endpoints.
+    """
+    # Open the port memcached needs
+    firewall = aws_network.AwsFirewall.GetFirewall()
+    firewall.AllowPortInSecurityGroup(self.region, self.security_group_id,
+                                      ELASTICACHE_PORT)
+
+    # Create a cache subnet group
+    cmd = ['aws', 'elasticache', 'create-cache-subnet-group',
+           '--region=%s' % self.region,
+           '--cache-subnet-group-name=%s' % self.subnet_group_name,
+           '--cache-subnet-group-description="PKB memcached_ycsb benchmark"',
+           '--subnet-ids=%s' % self.subnet_id]
+    vm_util.IssueCommand(cmd)
+
+    # Create the cluster
+    cmd = ['aws', 'elasticache', 'create-cache-cluster',
+           '--engine=memcached',
+           '--cache-subnet-group-name=%s' % self.subnet_group_name,
+           '--cache-cluster-id=%s' % self.cluster_id,
+           '--num-cache-nodes=%s' % self.num_servers,
+           '--region=%s' % self.region,
+           '--cache-node-type=%s' % self.node_type]
+    vm_util.IssueCommand(cmd)
+
+    # Wait for the cluster to come up
+    cluster_info = self._WaitForClusterUp()
+
+    # Parse out the hosts
+    self.hosts = [(node['Endpoint']['Address'], node['Endpoint']['Port'])
+                  for node in cluster_info['CacheNodes']]
+    assert len(self.hosts) == self.num_servers
+
+  def Destroy(self):
+    """Requests deletion of the ElastiCache cluster."""
+    cmd = ['aws', 'elasticache', 'delete-cache-cluster',
+           '--cache-cluster-id=%s' % self.cluster_id,
+           '--region=%s' % self.region]
+    vm_util.IssueCommand(cmd)
+    # Don't have to delete the subnet group. It will be deleted with the subnet.
+    # NOTE(review): nothing in this class deletes the cache subnet group;
+    # confirm it really goes away with the subnet.
+
+  def Flush(self):
+    """Issues flush_all against every node of the cluster in parallel."""
+    # self.hosts holds (ip, port) pairs and FlushMemcachedServer takes them as
+    # two positional arguments, so bind each pair explicitly and run the
+    # resulting zero-argument callables, as the benchmark's Prepare does.
+    flush_fns = [
+        functools.partial(memcached_server.FlushMemcachedServer, ip, port)
+        for ip, port in self.hosts]
+    vm_util.RunThreaded(lambda f: f(), flush_fns)
+
+  def GetHosts(self):
+    """Returns the node endpoints as a list of 'address:port' strings."""
+    return ["%s:%s" % (ip, port) for ip, port in self.hosts]
+
+  def GetMetadata(self):
+    """Returns the cluster parameters to attach to benchmark samples."""
+    return {'num_servers': self.num_servers,
+            'elasticache_region': self.region,
+            'elasticache_node_type': self.node_type}
+
+  def _GetClusterInfo(self):
+    """Returns this cluster's describe-cache-clusters entry as a dict."""
+    cmd = ['aws', 'elasticache', 'describe-cache-clusters',
+           '--cache-cluster-id=%s' % self.cluster_id,
+           '--region=%s' % self.region,
+           '--show-cache-node-info']
+    out, _, _ = vm_util.IssueCommand(cmd)
+    return json.loads(out)["CacheClusters"][0]
+
+  @vm_util.Retry(poll_interval=15, timeout=300,
+                 retryable_exceptions=(errors.Resource.RetryableCreationError,))
+  def _WaitForClusterUp(self):
+    """Block until the ElastiCache memcached cluster is up.
+
+    Will timeout after 5 minutes, and raise an exception. Until then the check
+    is retried whenever it raises RetryableCreationError.
+
+    We check the status of the cluster using the AWS CLI.
+
+    Returns:
+      The cluster info json as a dict
+
+    Raises:
+      errors.Resource.RetryableCreationError when the cluster description
+        cannot be fetched or parsed, or when the reported status is not yet
+        'available'.
+    """
+    logging.info("Trying to get ElastiCache cluster info for %s",
+                 self.cluster_id)
+    try:
+      cluster_info = self._GetClusterInfo()
+      cluster_status = cluster_info['CacheClusterStatus']
+    except (errors.VirtualMachine.RemoteCommandError,
+            ValueError, KeyError, IndexError) as e:
+      # ValueError, KeyError and IndexError cover CLI output that is empty or
+      # does not list the cluster yet; treat that as "not up yet" as well.
+      raise errors.Resource.RetryableCreationError(
+          "ElastiCache memcached cluster not up yet: %s." % str(e))
+    if cluster_status != 'available':
+      raise errors.Resource.RetryableCreationError(
+          "ElastiCache memcached cluster not up yet. Status: %s" %
+          cluster_status)
+    logging.info("ElastiCache memcached cluster is up and running.")
+    return cluster_info
diff --git a/perfkitbenchmarker/providers/gcp/memcache.py b/perfkitbenchmarker/providers/gcp/memcache.py
new file mode 100644
index 0000000000..171e3b507f
--- /dev/null
+++ b/perfkitbenchmarker/providers/gcp/memcache.py
@@ -0,0 +1,40 @@
+# Copyright 2017 PerfKitBenchmarker Authors. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from perfkitbenchmarker import memcache_service
+from perfkitbenchmarker import providers
+
+
+class MemcacheService(memcache_service.MemcacheService):
+  """GCP managed memcached service; every operation is still unimplemented."""
+
+  CLOUD = providers.GCP
+
+  def __init__(self):
+    pass
+
+  def Create(self):
+    raise NotImplementedError
+
+  def Destroy(self):
+    raise NotImplementedError
+
+  def Flush(self):
+    raise NotImplementedError
+
+  def GetHosts(self):
+    raise NotImplementedError
+
+  def GetMetadata(self):
+    raise NotImplementedError