From 04a0bcd3e5748098c110d97d31cc9af1b79474fd Mon Sep 17 00:00:00 2001 From: aemous Date: Tue, 21 Jan 2025 13:45:15 -0500 Subject: [PATCH 1/9] Implement benchmark harness with stubbed HTTP client. --- scripts/performance/run-benchmarks | 363 +++++++++++++++++++++++++++++ 1 file changed, 363 insertions(+) create mode 100644 scripts/performance/run-benchmarks diff --git a/scripts/performance/run-benchmarks b/scripts/performance/run-benchmarks new file mode 100644 index 000000000000..3d645af6f73f --- /dev/null +++ b/scripts/performance/run-benchmarks @@ -0,0 +1,363 @@ +#!/usr/bin/env python +import argparse +import json +import os +import psutil +import shutil +import subprocess +import time +from unittest import mock + + +import awscli.botocore.awsrequest +from awscli.clidriver import AWSCLIEntryPoint +from awscli.compat import BytesIO + +_BENCHMARK_DEFINITIONS = os.path.join( + os.path.dirname(os.path.abspath(__file__)), + 'assets', + 'benchmarks.json' + ) + + +class RawResponse(BytesIO): + """ + A bytes-like streamable HTTP response representation. + """ + def stream(self, **kwargs): + contents = self.read() + while contents: + yield contents + contents = self.read() + + +class StubbedHTTPClient(object): + """ + A generic stubbed HTTP client. + """ + def setup(self): + urllib3_session_send = 'botocore.httpsession.URLLib3Session.send' + self._urllib3_patch = mock.patch(urllib3_session_send) + self._send = self._urllib3_patch.start() + self._send.side_effect = self.get_response + self._responses = [] + + def tearDown(self): + self._urllib3_patch.stop() + + def get_response(self, request): + response = self._responses.pop(0) + if isinstance(response, Exception): + raise response + return response + + def add_response(self, body, headers, status_code): + response = awscli.botocore.awsrequest.AWSResponse( + url='http://169.254.169.254/', + status_code=status_code, + headers=headers, + raw=RawResponse(body.encode()) + ) + self._responses.append(response) + + +class ProcessBenchmarker(object): + """ + Periodically samples CPU and memory usage of a process given its pid. Writes + all collected samples to a CSV file. + """ + def benchmark_process(self, pid, output_file, data_interval): + parent_pid = os.getpid() + try: + # Benchmark the process where the script is being run. + self._run_benchmark(pid, output_file, data_interval) + except KeyboardInterrupt: + # If there is an interrupt, then try to clean everything up. + proc = psutil.Process(parent_pid) + procs = proc.children(recursive=True) + + for child in procs: + child.terminate() + + gone, alive = psutil.wait_procs(procs, timeout=1) + for child in alive: + child.kill() + raise + + + def _run_benchmark(self, pid, output_file, data_interval): + process_to_measure = psutil.Process(pid) + + with open(output_file, 'w') as f: + while process_to_measure.is_running(): + if process_to_measure.status() == psutil.STATUS_ZOMBIE: + process_to_measure.kill() + break + time.sleep(data_interval) + try: + # Collect the memory and cpu usage. + memory_used = process_to_measure.memory_info().rss + cpu_percent = process_to_measure.cpu_percent() + except (psutil.AccessDenied, psutil.ZombieProcess): + # Trying to get process information from a closed or + # zombie process will result in corresponding exceptions. + break + + # Determine the lapsed time for bookkeeping + current_time = time.time() + + # Save all the data into a CSV file. 
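+                # Each row has the form: timestamp,rss_bytes,cpu_percent.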
+ f.write( + f"{current_time},{memory_used},{cpu_percent}\n" + ) + f.flush() + + +def _create_file_with_size(path, size): + """ + Creates a full-access file in the given directory with the + specified name and size. + """ + f = open(path, 'wb') + os.chmod(path, 0o777) + size = int(size) + f.truncate(size) + f.close() + + +def _create_file_dir(dir_path, file_count, size): + """ + Creates a directory with the specified name. Also creates identical files + with the given size in the created directory. The number of identical files + to be created is specified by file_count. + """ + os.mkdir(dir_path, 0o777) + for i in range(int(file_count)): + file_path = os.path.join(dir_path, f'{i}') + _create_file_with_size(file_path, size) + + +def _overwrite_dir_full_access(directory): + if os.path.exists(directory): + shutil.rmtree(directory) + os.makedirs(directory, 0o777) + + +def _get_default_env(config_file): + return { + 'AWS_CONFIG_FILE': config_file, + 'AWS_DEFAULT_REGION': 'us-west-2', + 'AWS_ACCESS_KEY_ID': 'access_key', + 'AWS_SECRET_ACCESS_KEY': 'secret_key' + } + + +def _default_config_file_contents(): + return ( + '[default]' + ) + + +def _setup_environment(env, result_dir, config_file): + """ + Creates all files / directories defined in the env struct. + Also, writes a config file named 'config' to the result directory + with contents optionally specified by the env struct. + """ + if "files" in env: + for file_def in env['files']: + path = os.path.join(result_dir, file_def['name']) + _create_file_with_size(path, file_def['size']) + if "file_dirs" in env: + for file_dir_def in env['file_dirs']: + dir_path = os.path.join(result_dir, file_dir_def['name']) + _create_file_dir( + dir_path, + file_dir_def['file_count'], + file_dir_def['file_size'] + ) + with open(config_file, 'w') as f: + f.write(env.get('config', _default_config_file_contents())) + f.flush() + + +def _setup_iteration( + benchmark, + client, + result_dir, + performance_dir, + config_file +): + """ + Performs the setup for a single iteration of a benchmark. This + includes creating the files used by a command and stubbing + the HTTP client to use during execution. + """ + env = benchmark.get('environment', {}) + _setup_environment(env, result_dir, config_file) + _overwrite_dir_full_access(performance_dir) + client.setup() + _stub_responses( + benchmark.get('responses', [{"headers": {}, "body": ""}]), + client + ) + + +def _reset_dir(dir): + """ + Deletes everything in the given folder, and recreates it anew. + """ + shutil.rmtree(dir, ignore_errors=True) + _overwrite_dir_full_access(dir) + + +def _stub_responses(responses, client): + """ + Stubs the supplied HTTP client using the response instructions in the supplied + responses struct. Each instruction will generate one or more stubbed responses. + """ + for response in responses: + body = response.get("body", "") + headers = response.get("headers", {}) + status_code = response.get("status_code", 200) + # use the instances key to support duplicating responses a configured number of times + if "instances" in response: + for _ in range(int(response['instances'])): + client.add_response(body, headers, status_code) + else: + client.add_response(body, headers, status_code) + + +def _process_measurements(benchmark_result): + """ + Perform post-processing of the output of S3Transfer's summarize script, + such as removing unneeded entries. 
+ """ + del benchmark_result['std_dev_total_time'] + del benchmark_result['std_dev_max_memory'] + del benchmark_result['std_dev_average_memory'] + del benchmark_result['std_dev_average_cpu'] + + +def _run_isolated_benchmark( + result_dir, + performance_dir, + benchmark, + client, + process_benchmarker, + args +): + """ + Runs a single iteration of one benchmark execution. Includes setting up + the environment, running the benchmarked execution, formatting + the results, and cleaning up the environment. + """ + out_file = os.path.join(performance_dir, 'performance.csv') + assets_dir = os.path.join(result_dir, 'assets') + config_file = os.path.join(assets_dir, 'config') + # setup for iteration of benchmark + _setup_iteration(benchmark, client, result_dir, performance_dir, config_file) + os.chdir(result_dir) + # patch the OS environment with our supplied defaults + env_patch = mock.patch.dict('os.environ', _get_default_env(config_file)) + env_patch.start() + # fork a child process to run the command on. + # the parent process benchmarks the child process until the child terminates. + pid = os.fork() + + try: + # execute command on child process + if pid == 0: + AWSCLIEntryPoint().main(benchmark['command']) + os._exit(0) + # benchmark child process from parent process until child terminates + process_benchmarker.benchmark_process( + pid, + out_file, + args.data_interval + ) + # summarize benchmark results & process summary + summary = json.loads(subprocess.check_output( + [args.summarize_script, out_file, '--output-format', 'json'] + )) + finally: + # cleanup iteration of benchmark + client.tearDown() + _reset_dir(result_dir) + _reset_dir(assets_dir) + env_patch.stop() + return summary + + +def run_benchmarks(args): + """ + Orchestrates benchmarking via the benchmark definitions in + the arguments. + """ + summaries = {'results': []} + result_dir = args.result_dir + assets_dir = os.path.join(result_dir, 'assets') + performance_dir = os.path.join(result_dir, 'performance') + client = StubbedHTTPClient() + process_benchmarker = ProcessBenchmarker() + _overwrite_dir_full_access(result_dir) + _overwrite_dir_full_access(assets_dir) + + with open(args.benchmark_definitions, 'r') as file: + definitions = json.load(file) + + try: + for benchmark in definitions: + benchmark_result = { + 'name': benchmark['name'], + 'dimensions': benchmark['dimensions'], + 'measurements': [] + } + for _ in range(args.num_iterations): + measurements = _run_isolated_benchmark( + result_dir, + performance_dir, + benchmark, + client, + process_benchmarker, + args + ) + _process_measurements(measurements) + benchmark_result['measurements'].append(measurements) + summaries['results'].append(benchmark_result) + finally: + # final cleanup + shutil.rmtree(result_dir, ignore_errors=True) + print(summaries) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument( + '--benchmark-definitions', default=_BENCHMARK_DEFINITIONS, + help=('The JSON file defining the commands to benchmark.') + ) + parser.add_argument( + '--summarize-script', + required=True, + help=('The summarize script to run the commands with. This should be ' + 'from s3transfer.') + ) + parser.add_argument( + '-o', '--result-dir', default='results', + help='The directory to output performance results to. Existing ' + 'results will be deleted.' 
+ ) + parser.add_argument( + '--data-interval', + default=0.01, + type=float, + help='The interval in seconds to poll for data points.', + ) + parser.add_argument( + '--num-iterations', + default=1, + type=int, + help='The number of iterations to repeat the benchmark for.', + ) + run_benchmarks(parser.parse_args()) \ No newline at end of file From fc46bcb268b70af756b7650bb7ea227b5344f47b Mon Sep 17 00:00:00 2001 From: aemous Date: Tue, 21 Jan 2025 13:46:39 -0500 Subject: [PATCH 2/9] Implement S3 benchmarks. --- scripts/performance/assets/benchmarks.json | 166 +++++++++++++++++++++ 1 file changed, 166 insertions(+) create mode 100644 scripts/performance/assets/benchmarks.json diff --git a/scripts/performance/assets/benchmarks.json b/scripts/performance/assets/benchmarks.json new file mode 100644 index 000000000000..c6c0efee809a --- /dev/null +++ b/scripts/performance/assets/benchmarks.json @@ -0,0 +1,166 @@ +[ + { + "name": "s3.cp.upload", + "command": ["s3", "cp", "test_file", "s3://bucket/test_file", "--quiet"], + "dimensions": [ + {"FileSize": "32MB"}, + {"S3TransferClient": "Classic"} + ], + "environment": { + "files": [ + { + "name": "test_file", + "size": 3.2e7 + } + ], + "config": "[default]\ns3 =\n preferred_transfer_client = classic" + }, + "responses": [ + { + "body": "bucket key upload-id" + }, + { + "headers": {"ETag": "etag"}, + "instances": 4 + }, + { + "body": "bucket key etag-123" + } + ] + }, + { + "name": "s3.cp.upload", + "command": ["s3", "cp", "test_file", "s3://bucket/test_file", "--quiet"], + "dimensions": [ + {"FileSize": "32MB"}, + {"S3TransferClient": "CRT"} + ], + "environment": { + "files": [ + { + "name": "test_file", + "size": 3.2e7 + } + ], + "config": "[default]\ns3 =\n preferred_transfer_client = crt" + }, + "responses": [ + { + "body": "bucket key upload-id" + }, + { + "headers": {"ETag": "etag"}, + "instances": 4 + }, + { + "body": "bucket key etag-123" + } + ] + }, + { + "name": "s3.mv.upload", + "command": ["s3", "mv", "test_file", "s3://bucket/test_file", "--quiet"], + "dimensions": [ + {"FileSize": "32MB"} + ], + "environment": { + "files": [ + { + "name": "test_file", + "size": 3.2e7 + } + ] + }, + "responses": [ + { + "headers": {}, + "body": "bucket key upload-id" + }, + { + "headers": {"ETag": "etag"}, + "instances": 4 + }, + { + "headers": {}, + "body": "bucket key etag-123" + } + ] + }, + { + "name": "s3.mv.download", + "command": ["s3", "mv", "s3://bucket/test_file123", "./test_file123", "--quiet"], + "dimensions": [ + {"FileSize": "32MB"}, + {"S3TransferClient": "Classic"} + ], + "environment": { + "config": "[default]\ns3 =\n preferred_transfer_client = classic" + }, + "responses": [ + { + "headers": {"Content-Length": "0", "Last-Modified": "Thu, 18 Oct 2018 23:00:00 GMT", "ETag": "etag-1"} + }, + { + "headers": {"Content-Length": "7", "Last-Modified": "Thu, 18 Oct 2018 23:00:00 GMT", "ETag": "etag-1"}, + "body": "content" + }, + {} + ] + }, + { + "name": "s3.sync.upload", + "command": ["s3", "sync", "./test_dir", "s3://bucket/test_dir", "--quiet"], + "dimensions": [ + {"FileCount": "50,000"}, + {"FileSize": "4KB"}, + {"S3TransferClient": "Classic"} + ], + "environment": { + "file_dirs": [ + { + "name": "test_dir", + "file_count": 5e4, + "file_size": 4e3 + } + ], + "config": "[default]\ns3 =\n preferred_transfer_client = classic" + }, + "responses": [ + { + "body": "2015-12-08T18:26:43.000Z key 100 " + }, + { + "headers": {"ETag": "etag"}, + "instances": 5e4 + } + ] + }, + { + "name": "s3.sync.upload", + "command": ["s3", "sync", "./test_dir", 
"s3://bucket/test_dir", "--quiet"], + "dimensions": [ + {"FileCount": "50,000"}, + {"FileSize": "4KB"}, + {"S3TransferClient": "CRT"} + ], + "environment": { + "file_dirs": [ + { + "name": "test_dir", + "file_count": 5e4, + "file_size": 4e3 + } + ], + "config": "[default]\ns3 =\n preferred_transfer_client = crt" + }, + "responses": [ + { + "body": "2015-12-08T18:26:43.000Z key 100 " + }, + { + "headers": {"ETag": "etag"}, + "instances": 5e4 + } + ] + } +] From 89b7d004984d1708937f56c47a324258ad33bf0a Mon Sep 17 00:00:00 2001 From: aemous Date: Tue, 28 Jan 2025 15:38:27 -0500 Subject: [PATCH 3/9] Include README. Removed subsumed S3 benchmark scripts. --- scripts/performance/README.md | 288 ++++++++++++++++++ scripts/performance/benchmark-cp | 36 --- scripts/performance/benchmark-mv | 43 --- scripts/performance/benchmark-rm | 22 -- scripts/performance/benchmark_utils.py | 251 --------------- .../performance/{assets => }/benchmarks.json | 0 scripts/performance/perfcmp | 152 --------- scripts/performance/run-benchmarks | 1 - 8 files changed, 288 insertions(+), 505 deletions(-) create mode 100644 scripts/performance/README.md delete mode 100755 scripts/performance/benchmark-cp delete mode 100755 scripts/performance/benchmark-mv delete mode 100755 scripts/performance/benchmark-rm delete mode 100644 scripts/performance/benchmark_utils.py rename scripts/performance/{assets => }/benchmarks.json (100%) delete mode 100755 scripts/performance/perfcmp diff --git a/scripts/performance/README.md b/scripts/performance/README.md new file mode 100644 index 000000000000..d0740b379f1a --- /dev/null +++ b/scripts/performance/README.md @@ -0,0 +1,288 @@ +# AWS CLI Performance Benchmarks + +This document outlines details of the AWS CLI performance benchmarks, +including how to run benchmarks and how to add your own. + +## Running the Benchmarks + +Our benchmark executor works by running all benchmarks defined in +`benchmarks.json`. For each benchmark defined in this JSON file, it +runs the command for a configurable number of iterations (default: 1) +and benchmarks metrics such as memory usage, CPU utilization, and +timings. + +The benchmark executor also stubs an HTTP client with mock responses +defined in `benchmarks.json`. This ensures the timings produced in +the results reflect only the AWS CLI and **not** external factors +such as service latency or network throughput. + +### Example + +The following example command runs the benchmarks defined in `benchmarks.json`, +and executes each command 2 times. + +`./run-benchmark --result-dir ./results --num-iterations 2` + +An example output for this command is shown below. 
+ +```json +{ + "results":[ + { + "name":"s3.cp.upload", + "dimensions":[ + { + "FileSize":"32MB" + }, + { + "S3TransferClient":"Classic" + } + ], + "measurements":[ + { + "total_time":0.2531106472015381, + "max_memory":76791808.0, + "max_cpu":5.0, + "p50_memory":51412992.0, + "p95_memory":75235328.0, + "p50_cpu":1.5, + "p95_cpu":2.4, + "first_client_invocation_time":0.24789667129516602 + }, + { + "total_time":0.17595314979553223, + "max_memory":76939264.0, + "max_cpu":6.2, + "p50_memory":52297728.0, + "p95_memory":75710464.0, + "p50_cpu":2.1, + "p95_cpu":2.5, + "first_client_invocation_time":0.17173004150390625 + } + ] + }, + { + "name":"s3.cp.upload", + "dimensions":[ + { + "FileSize":"32MB" + }, + { + "S3TransferClient":"CRT" + } + ], + "measurements":[ + { + "total_time":0.7724411487579346, + "max_memory":81002496.0, + "max_cpu":4.1, + "p50_memory":78479360.0, + "p95_memory":80822272.0, + "p50_cpu":0.0, + "p95_cpu":2.4, + "first_client_invocation_time":0.17360806465148926 + }, + { + "total_time":0.6735439300537109, + "max_memory":80658432.0, + "max_cpu":5.2, + "p50_memory":78495744.0, + "p95_memory":80412672.0, + "p50_cpu":0.0, + "p95_cpu":2.4, + "first_client_invocation_time":0.17362713813781738 + } + ] + }, + { + "name":"s3.mv.upload", + "dimensions":[ + { + "FileSize":"32MB" + } + ], + "measurements":[ + { + "total_time":0.17440271377563477, + "max_memory":76972032.0, + "max_cpu":4.6, + "p50_memory":52166656.0, + "p95_memory":75776000.0, + "p50_cpu":2.1, + "p95_cpu":2.5, + "first_client_invocation_time":0.16981887817382812 + }, + { + "total_time":0.17231082916259766, + "max_memory":75825152.0, + "max_cpu":6.1, + "p50_memory":52199424.0, + "p95_memory":74842112.0, + "p50_cpu":2.1, + "p95_cpu":2.5, + "first_client_invocation_time":0.16803598403930664 + } + ] + }, + { + "name":"s3.mv.download", + "dimensions":[ + { + "FileSize":"32MB" + }, + { + "S3TransferClient":"Classic" + } + ], + "measurements":[ + { + "total_time":0.17304229736328125, + "max_memory":76152832.0, + "max_cpu":4.0, + "p50_memory":52674560.0, + "p95_memory":74907648.0, + "p50_cpu":2.1, + "p95_cpu":2.4, + "first_client_invocation_time":0.16739511489868164 + }, + { + "total_time":0.16962409019470215, + "max_memory":76693504.0, + "max_cpu":4.9, + "p50_memory":52314112.0, + "p95_memory":75431936.0, + "p50_cpu":2.1, + "p95_cpu":2.6, + "first_client_invocation_time":0.16400408744812012 + } + ] + }, + { + "name":"s3.sync.upload", + "dimensions":[ + { + "FileCount":"5,000" + }, + { + "FileSize":"4KB" + }, + { + "S3TransferClient":"Classic" + } + ], + "measurements":[ + { + "total_time":11.370934963226318, + "max_memory":134578176.0, + "max_cpu":20.7, + "p50_memory":106397696.0, + "p95_memory":132235264.0, + "p50_cpu":2.4, + "p95_cpu":2.7, + "first_client_invocation_time":0.6362888813018799 + }, + { + "total_time":12.029011964797974, + "max_memory":134676480.0, + "max_cpu":18.6, + "p50_memory":105955328.0, + "p95_memory":131727360.0, + "p50_cpu":2.4, + "p95_cpu":2.7, + "first_client_invocation_time":0.6395571231842041 + } + ] + }, + { + "name":"s3.sync.upload", + "dimensions":[ + { + "FileCount":"5,000" + }, + { + "FileSize":"4KB" + }, + { + "S3TransferClient":"CRT" + } + ], + "measurements":[ + { + "total_time":90.28388690948486, + "max_memory":188809216.0, + "max_cpu":17.9, + "p50_memory":144375808.0, + "p95_memory":188792832.0, + "p50_cpu":0.0, + "p95_cpu":3.4, + "first_client_invocation_time":0.656865119934082 + }, + { + "total_time":84.99997591972351, + "max_memory":190808064.0, + "max_cpu":20.7, + 
"p50_memory":143917056.0, + "p95_memory":186728448.0, + "p50_cpu":0.0, + "p95_cpu":3.5, + "first_client_invocation_time":0.7549021244049072 + } + ] + } + ] +} +``` + +## Defining Your own Benchmarks for Local Performance Testing + +To create your own benchmark definitions, create a file on your machine containing +a JSON-formatted list of benchmark definitions. Each benchmark definition supports +the keys below. Each key is required unless specified otherwise. + +- `name` (string): The name of the benchmark. +- `command` (list): The AWS CLI command to benchmark, including arguments. + - Each element of the list is a string component of the command. + - Example value: `["s3", "cp", "test_file", "s3://bucket/test_file", "--quiet"]`. +- `dimensions` (list) **(optional)**: Used to specify additional dimensions for +interpreting this metric. + - Each element in the list is an object with a single key-value pair. +The key is the name of the dimension (e.g. `FileSize`), and the value +is the value of the dimension (e.g. `32MB`). +- `environment` (object) **(optional)**: Specifies settings for the environment to run +the command in. + - The environment object supports the following keys: + - `files` (list) **(optional)**: Specifies the files that must be +created before executing the benchmark. The files created will be filled with +null bytes to achieve the specified size. + - Each element is an object with the following keys: + - `name` (string): Name of the file to create + - `size` (int): The size of the file to create in bytes. + - `file_dirs` (list) **(optional)**: Specifies the directories that must +be created before executing the benchmark. The directories will be created +and filled with the specified number of files, each of which will be filled +with null bytes to achieve the specified file size. + - Each element is an object with the following keys: + - `name` (string): Name of the directory + - `file_count` (int): The number of files to create in the directory. + - `file_size` (int): The size of each file in the directory, in bytes. + - `config` (string) **(optional)**: The contents of the AWS config +file to use for the benchmark execution. + - Default: `"[default]"`. + - Example value: `"[default]\ns3 =\n preferred_transfer_client = crt"` +- `responses` (list) **(optional)**: A list of HTTP responses to stub from +the service for each request made during command execution. + - Default: `[{{"headers": {}, "body": ""}]` + - Each element of the list is an object with the following keys: + - `status_code` (int) **(optional)**: The status code of the response. + - Default: `200` + - `headers` (object) **(optional)**: Used to specify the HTTP headers of +the response. Each key-value pair corresponds to a single header name (key) +and its value. + - Default: `{}` + - `body` (string) **(optional)**: The raw HTTP response. + - Default: `""` + - `instances` (int) **(optional)**: The total number of times to stub +this response; this prevents the need to repeat the same response many times. + - Default: 1 + - This is useful for commands such as `aws s3 sync`, that may execute many + HTTP requests with similar responses. 
\ No newline at end of file diff --git a/scripts/performance/benchmark-cp b/scripts/performance/benchmark-cp deleted file mode 100755 index e63ae7cd8d56..000000000000 --- a/scripts/performance/benchmark-cp +++ /dev/null @@ -1,36 +0,0 @@ -#!/usr/bin/env python -from benchmark_utils import summarize, clean -from benchmark_utils import get_default_argparser, get_transfer_command -from benchmark_utils import create_random_subfolder, benchmark_command - - -def benchmark_cp(args): - destination = args.destination - if args.recursive: - destination = create_random_subfolder(destination) - command = 'cp %s %s' % (args.source, destination) - command = get_transfer_command(command, args.recursive, args.quiet) - - def cleanup(): - if not args.no_cleanup: - clean(destination, args.recursive) - - benchmark_command( - command, args.benchmark_script, args.summarize_script, - args.result_dir, args.num_iterations, args.dry_run, - cleanup=cleanup - ) - - -if __name__ == "__main__": - parser = get_default_argparser() - parser.add_argument( - '-s', '--source', required=True, - help='A local path or s3 path.' - ) - parser.add_argument( - '-d', '--destination', required=True, - help='A local path or s3 path. A directory will be created in this ' - 'location to copy to in the case of a recursive transfer.' - ) - benchmark_cp(parser.parse_args()) diff --git a/scripts/performance/benchmark-mv b/scripts/performance/benchmark-mv deleted file mode 100755 index b6e679425edd..000000000000 --- a/scripts/performance/benchmark-mv +++ /dev/null @@ -1,43 +0,0 @@ -#!/usr/bin/env python -from benchmark_utils import backup, copy, clean, get_default_argparser -from benchmark_utils import create_random_subfolder, benchmark_command -from benchmark_utils import get_transfer_command - - -def benchmark_mv(args): - destination = args.destination - if args.recursive: - destination = create_random_subfolder(destination) - command = 'mv %s %s' % (args.source, destination) - command = get_transfer_command(command, args.recursive, args.quiet) - backup_path = backup(args.source, args.recursive) - - def cleanup(): - if not args.no_cleanup: - clean(destination, args.recursive) - clean(backup_path, args.recursive) - - def upkeep(): - clean(args.source, args.recursive) - copy(backup_path, args.source, args.recursive) - - benchmark_command( - command, args.benchmark_script, args.summarize_script, - args.result_dir, args.num_iterations, args.dry_run, - upkeep=upkeep, - cleanup=cleanup - ) - - -if __name__ == "__main__": - parser = get_default_argparser() - parser.add_argument( - '-s', '--source', required=True, - help='A local path or s3 path.' - ) - parser.add_argument( - '-d', '--destination', required=True, - help='A local path or s3 path. A directory will be created in this ' - 'location to move to in the case of a recursive transfer.' 
- ) - benchmark_mv(parser.parse_args()) diff --git a/scripts/performance/benchmark-rm b/scripts/performance/benchmark-rm deleted file mode 100755 index 16009c696cda..000000000000 --- a/scripts/performance/benchmark-rm +++ /dev/null @@ -1,22 +0,0 @@ -#!/usr/bin/env python -from benchmark_utils import benchmark_command, get_transfer_command -from benchmark_utils import backup, copy, clean, get_default_argparser - - -def benchmark_rm(args): - command = get_transfer_command( - 'rm %s' % args.target, args.recursive, args.quiet) - backup_path = backup(args.target, args.recursive) - - benchmark_command( - command, args.benchmark_script, args.summarize_script, - args.result_dir, args.num_iterations, args.dry_run, - upkeep=lambda: copy(backup_path, args.target, args.recursive), - cleanup=lambda: clean(backup_path, args.recursive) - ) - - -if __name__ == "__main__": - parser = get_default_argparser() - parser.add_argument('-t', '--target', required=True, help='An S3 path.') - benchmark_rm(parser.parse_args()) diff --git a/scripts/performance/benchmark_utils.py b/scripts/performance/benchmark_utils.py deleted file mode 100644 index da48ae372d81..000000000000 --- a/scripts/performance/benchmark_utils.py +++ /dev/null @@ -1,251 +0,0 @@ -import s3transfer -import os -import subprocess -import uuid -import shutil -import argparse -import tempfile - - -def summarize(script, result_dir, summary_dir): - """Run the given summary script on every file in the given directory. - - :param script: A summarization script that takes a list of csv files. - :param result_dir: A directory containing csv performance result files. - :param summary_dir: The directory to put the summary file in. - """ - summarize_args = [script] - for f in os.listdir(result_dir): - path = os.path.join(result_dir, f) - if os.path.isfile(path): - summarize_args.append(path) - - with open(os.path.join(summary_dir, 'summary.txt'), 'wb') as f: - subprocess.check_call(summarize_args, stdout=f) - with open(os.path.join(summary_dir, 'summary.json'), 'wb') as f: - summarize_args.extend(['--output-format', 'json']) - subprocess.check_call(summarize_args, stdout=f) - - -def _get_s3transfer_performance_script(script_name): - """Retrieves an s3transfer performance script if available.""" - s3transfer_directory = os.path.dirname(s3transfer.__file__) - s3transfer_directory = os.path.dirname(s3transfer_directory) - scripts_directory = 'scripts/performance' - scripts_directory = os.path.join(s3transfer_directory, scripts_directory) - script = os.path.join(scripts_directory, script_name) - - if os.path.isfile(script): - return script - else: - return None - - -def get_benchmark_script(): - return _get_s3transfer_performance_script('benchmark') - - -def get_summarize_script(): - return _get_s3transfer_performance_script('summarize') - - -def backup(source, recursive): - """Backup a given source to a temporary location. - - :type source: str - :param source: A local path or s3 path to backup. - - :type recursive: bool - :param recursive: if True, the source will be treated as a directory. - """ - if source[:5] == 's3://': - parts = source.split('/') - parts.insert(3, str(uuid.uuid4())) - backup_path = '/'.join(parts) - else: - name = os.path.split(source)[-1] - temp_dir = tempfile.mkdtemp() - backup_path = os.path.join(temp_dir, name) - - copy(source, backup_path, recursive) - return backup_path - - -def copy(source, destination, recursive): - """Copy files from one location to another. 
- - The source and destination must both be s3 paths or both be local paths. - - :type source: str - :param source: A local path or s3 path to backup. - - :type destination: str - :param destination: A local path or s3 path to backup the source to. - - :type recursive: bool - :param recursive: if True, the source will be treated as a directory. - """ - if 's3://' in [source[:5], destination[:5]]: - cp_args = ['aws', 's3', 'cp', source, destination, '--quiet'] - if recursive: - cp_args.append('--recursive') - subprocess.check_call(cp_args) - return - - if recursive: - shutil.copytree(source, destination) - else: - shutil.copy(source, destination) - - -def clean(destination, recursive): - """Delete a file or directory either locally or on S3.""" - if destination[:5] == 's3://': - rm_args = ['aws', 's3', 'rm', '--quiet', destination] - if recursive: - rm_args.append('--recursive') - subprocess.check_call(rm_args) - else: - if recursive: - shutil.rmtree(destination) - else: - os.remove(destination) - - -def create_random_subfolder(destination): - """Create a random subdirectory in a given directory.""" - folder_name = str(uuid.uuid4()) - if destination.startswith('s3://'): - parts = destination.split('/') - parts.append(folder_name) - return '/'.join(parts) - else: - parts = list(os.path.split(destination)) - parts.append(folder_name) - path = os.path.join(*parts) - os.makedirs(path) - return path - - -def get_transfer_command(command, recursive, quiet): - """Get a full cli transfer command. - - Performs common transformations, e.g. adding --quiet - """ - cli_command = 'aws s3 ' + command - - if recursive: - cli_command += ' --recursive' - - if quiet: - cli_command += ' --quiet' - else: - print(cli_command) - - return cli_command - - -def benchmark_command(command, benchmark_script, summarize_script, - output_dir, num_iterations, dry_run, upkeep=None, - cleanup=None): - """Benchmark several runs of a long-running command. - - :type command: str - :param command: The full aws cli command to benchmark - - :type benchmark_script: str - :param benchmark_script: A benchmark script that takes a command to run - and outputs performance data to a file. This should be from s3transfer. - - :type summarize_script: str - :param summarize_script: A summarization script that the output of the - benchmark script. This should be from s3transfer. - - :type output_dir: str - :param output_dir: The directory to output performance results to. - - :type num_iterations: int - :param num_iterations: The number of times to run the benchmark on the - command. - - :type dry_run: bool - :param dry_run: Whether or not to actually run the benchmarks. - - :type upkeep: function that takes no arguments - :param upkeep: A function that is run after every iteration of the - benchmark process. This should be used for upkeep, such as restoring - files that were deleted as part of the command executing. - - :type cleanup: function that takes no arguments - :param cleanup: A function that is run at the end of the benchmark - process or if there are any problems during the benchmark process. - It should be uses for the final cleanup, such as deleting files that - were created at some destination. 
- """ - performance_dir = os.path.join(output_dir, 'performance') - if os.path.exists(performance_dir): - shutil.rmtree(performance_dir) - os.makedirs(performance_dir) - - try: - for i in range(num_iterations): - out_file = 'performance%s.csv' % i - out_file = os.path.join(performance_dir, out_file) - benchmark_args = [ - benchmark_script, command, '--output-file', out_file - ] - if not dry_run: - subprocess.check_call(benchmark_args) - if upkeep is not None: - upkeep() - - if not dry_run: - summarize(summarize_script, performance_dir, output_dir) - finally: - if not dry_run and cleanup is not None: - cleanup() - - -def get_default_argparser(): - """Get an ArgumentParser with all the base benchmark arguments added in.""" - parser = argparse.ArgumentParser() - parser.add_argument( - '--no-cleanup', action='store_true', default=False, - help='Do not remove the destination after the tests complete.' - ) - parser.add_argument( - '--recursive', action='store_true', default=False, - help='Indicates that this is a recursive transfer.' - ) - benchmark_script = get_benchmark_script() - parser.add_argument( - '--benchmark-script', default=benchmark_script, - required=benchmark_script is None, - help=('The benchmark script to run the commands with. This should be ' - 'from s3transfer.') - ) - summarize_script = get_summarize_script() - parser.add_argument( - '--summarize-script', default=summarize_script, - required=summarize_script is None, - help=('The summarize script to run the commands with. This should be ' - 'from s3transfer.') - ) - parser.add_argument( - '-o', '--result-dir', default='results', - help='The directory to output performance results to. Existing ' - 'results will be deleted.' - ) - parser.add_argument( - '--dry-run', default=False, action='store_true', - help='If set, commands will only be printed out, not executed.' - ) - parser.add_argument( - '--quiet', default=False, action='store_true', - help='If set, output is suppressed.' - ) - parser.add_argument( - '-n', '--num-iterations', default=1, type=int, - help='The number of times to run the test.' - ) - return parser diff --git a/scripts/performance/assets/benchmarks.json b/scripts/performance/benchmarks.json similarity index 100% rename from scripts/performance/assets/benchmarks.json rename to scripts/performance/benchmarks.json diff --git a/scripts/performance/perfcmp b/scripts/performance/perfcmp deleted file mode 100755 index d2b2c8378e87..000000000000 --- a/scripts/performance/perfcmp +++ /dev/null @@ -1,152 +0,0 @@ -#!/usr/bin/env python -"""Compare 2 perf runs. - -To use, specify the local directories that contain -the run information:: - - $ ./perfcmp /results/2016-01-01-1111/ /results/2016-01-01-2222/ - -""" -import os -import json -import argparse - -from colorama import Fore, Style -from tabulate import tabulate - - -class RunComparison(object): - - MEMORY_FIELDS = ['average_memory', 'max_memory'] - TIME_FIELDS = ['total_time'] - # Fields that aren't memory or time fields, they require - # no special formatting. 
- OTHER_FIELDS = ['average_cpu'] - - def __init__(self, old_summary, new_summary): - self.old_summary = old_summary - self.new_summary = new_summary - - def iter_field_names(self): - for field in self.TIME_FIELDS + self.MEMORY_FIELDS + self.OTHER_FIELDS: - yield field - - def old(self, field): - value = self.old_summary[field] - return self._format(field, value) - - def old_suffix(self, field): - value = self.old_summary[field] - return self._format_suffix(field, value) - - def new_suffix(self, field): - value = self.new_summary[field] - return self._format_suffix(field, value) - - def _format_suffix(self, field, value): - if field in self.TIME_FIELDS: - return 'sec' - elif field in self.OTHER_FIELDS: - return '' - else: - # The suffix depends on the actual value. - return self._human_readable_size(value)[1] - - def old_stddev(self, field): - real_field = 'std_dev_%s' % field - return self.old(real_field) - - def new(self, field): - value = self.new_summary[field] - return self._format(field, value) - - def new_stddev(self, field): - real_field = 'std_dev_%s' % field - return self.new(real_field) - - def _format(self, field, value): - if field.startswith('std_dev_'): - field = field[len('std_dev_'):] - if field in self.MEMORY_FIELDS: - return self._human_readable_size(value)[0] - elif field in self.TIME_FIELDS: - return '%-3.2f' % value - else: - return '%.2f' % value - - def _human_readable_size(self, value): - hummanize_suffixes = ('KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB') - base = 1024 - bytes_int = float(value) - - if bytes_int == 1: - return '1 Byte' - elif bytes_int < base: - return '%d Bytes' % bytes_int - - for i, suffix in enumerate(hummanize_suffixes): - unit = base ** (i+2) - if round((bytes_int / unit) * base) < base: - return ['%.2f' % (base * bytes_int / unit), suffix] - - def diff_percent(self, field): - diff_percent = ( - (self.new_summary[field] - self.old_summary[field]) / - float(self.old_summary[field])) * 100 - return diff_percent - - -def compare_runs(old_dir, new_dir): - for dirname in os.listdir(old_dir): - old_run_dir = os.path.join(old_dir, dirname) - new_run_dir = os.path.join(new_dir, dirname) - if not os.path.isdir(old_run_dir): - continue - old_summary = get_summary(old_run_dir) - new_summary = get_summary(new_run_dir) - comp = RunComparison(old_summary, new_summary) - header = [Style.BRIGHT + dirname + Style.RESET_ALL, - Style.BRIGHT + 'old' + Style.RESET_ALL, - # Numeric suffix (MiB, GiB, sec). - '', - 'std_dev', - Style.BRIGHT + 'new' + Style.RESET_ALL, - # Numeric suffix (MiB, GiB, sec). 
- '', - 'std_dev', - Style.BRIGHT + 'delta' + Style.RESET_ALL] - rows = [] - for field in comp.iter_field_names(): - row = [field, comp.old(field), comp.old_suffix(field), - comp.old_stddev(field), comp.new(field), - comp.new_suffix(field), comp.new_stddev(field)] - diff_percent = comp.diff_percent(field) - diff_percent_str = '%.2f%%' % diff_percent - if diff_percent < 0: - diff_percent_str = ( - Fore.GREEN + diff_percent_str + Style.RESET_ALL) - else: - diff_percent_str = ( - Fore.RED + diff_percent_str + Style.RESET_ALL) - row.append(diff_percent_str) - rows.append(row) - print(tabulate(rows, headers=header, tablefmt='plain')) - print('') - - -def get_summary(benchmark_dir): - summary_json = os.path.join(benchmark_dir, 'summary.json') - with open(summary_json) as f: - return json.load(f) - - -def main(): - parser = argparse.ArgumentParser(description='__doc__') - parser.add_argument('oldrunid', help='Path to old run idir') - parser.add_argument('newrunid', help='Local to new run dir') - args = parser.parse_args() - compare_runs(args.oldrunid, args.newrunid) - - -if __name__ == '__main__': - main() diff --git a/scripts/performance/run-benchmarks b/scripts/performance/run-benchmarks index 3d645af6f73f..0c3f0ab2bfe9 100644 --- a/scripts/performance/run-benchmarks +++ b/scripts/performance/run-benchmarks @@ -15,7 +15,6 @@ from awscli.compat import BytesIO _BENCHMARK_DEFINITIONS = os.path.join( os.path.dirname(os.path.abspath(__file__)), - 'assets', 'benchmarks.json' ) From 5b7fc0ee409d8546f5b431037f4d9a71b6ec93f2 Mon Sep 17 00:00:00 2001 From: aemous Date: Tue, 28 Jan 2025 15:59:20 -0500 Subject: [PATCH 4/9] Move default config file to constants --- scripts/performance/run-benchmarks | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/scripts/performance/run-benchmarks b/scripts/performance/run-benchmarks index 0c3f0ab2bfe9..54c263b476a6 100644 --- a/scripts/performance/run-benchmarks +++ b/scripts/performance/run-benchmarks @@ -17,6 +17,7 @@ _BENCHMARK_DEFINITIONS = os.path.join( os.path.dirname(os.path.abspath(__file__)), 'benchmarks.json' ) +_DEFAULT_FILE_CONFIG_CONTENTS = "[default]" class RawResponse(BytesIO): @@ -151,12 +152,6 @@ def _get_default_env(config_file): } -def _default_config_file_contents(): - return ( - '[default]' - ) - - def _setup_environment(env, result_dir, config_file): """ Creates all files / directories defined in the env struct. @@ -176,7 +171,7 @@ def _setup_environment(env, result_dir, config_file): file_dir_def['file_size'] ) with open(config_file, 'w') as f: - f.write(env.get('config', _default_config_file_contents())) + f.write(env.get('config', _DEFAULT_FILE_CONFIG_CONTENTS)) f.flush() From 865a3a3a034aa1d40057e78d81b3e780125e53b6 Mon Sep 17 00:00:00 2001 From: aemous Date: Wed, 22 Jan 2025 15:16:42 -0500 Subject: [PATCH 5/9] Updates on refactoring run-benchmarks. 
--- scripts/performance/run-benchmarks | 506 ++++++++++++++++------------- 1 file changed, 276 insertions(+), 230 deletions(-) mode change 100644 => 100755 scripts/performance/run-benchmarks diff --git a/scripts/performance/run-benchmarks b/scripts/performance/run-benchmarks old mode 100644 new mode 100755 index 54c263b476a6..cd2736f5d308 --- a/scripts/performance/run-benchmarks +++ b/scripts/performance/run-benchmarks @@ -6,13 +6,13 @@ import psutil import shutil import subprocess import time -from unittest import mock - - import awscli.botocore.awsrequest -from awscli.clidriver import AWSCLIEntryPoint + +from unittest import mock +from awscli.clidriver import AWSCLIEntryPoint, create_clidriver from awscli.compat import BytesIO + _BENCHMARK_DEFINITIONS = os.path.join( os.path.dirname(os.path.abspath(__file__)), 'benchmarks.json' @@ -20,6 +20,59 @@ _BENCHMARK_DEFINITIONS = os.path.join( _DEFAULT_FILE_CONFIG_CONTENTS = "[default]" +def _create_file_with_size(path, size): + """ + Creates a full-access file in the given directory with the + specified name and size. + """ + f = open(path, 'wb') + os.chmod(path, 0o777) + size = int(size) + f.truncate(size) + f.close() + + +def _create_file_dir(dir_path, file_count, size): + """ + Creates a directory with the specified name. Also creates identical files + with the given size in the created directory. The number of identical files + to be created is specified by file_count. + """ + os.mkdir(dir_path, 0o777) + for i in range(int(file_count)): + file_path = os.path.join(dir_path, f'{i}') + _create_file_with_size(file_path, size) + + +def _overwrite_dir_full_access(directory): + if os.path.exists(directory): + shutil.rmtree(directory) + os.makedirs(directory, 0o777) + + +def _reset_dir(directory): + """ + Deletes everything in the given folder, and recreates it anew. + """ + shutil.rmtree(directory, ignore_errors=True) + _overwrite_dir_full_access(directory) + + +def _get_default_env(config_file): + return { + 'AWS_CONFIG_FILE': config_file, + 'AWS_DEFAULT_REGION': 'us-west-2', + 'AWS_ACCESS_KEY_ID': 'access_key', + 'AWS_SECRET_ACCESS_KEY': 'secret_key' + } + + +def _default_config_file_contents(): + return ( + '[default]' + ) + + class RawResponse(BytesIO): """ A bytes-like streamable HTTP response representation. @@ -87,245 +140,238 @@ class ProcessBenchmarker(object): def _run_benchmark(self, pid, output_file, data_interval): process_to_measure = psutil.Process(pid) - - with open(output_file, 'w') as f: - while process_to_measure.is_running(): - if process_to_measure.status() == psutil.STATUS_ZOMBIE: - process_to_measure.kill() - break - time.sleep(data_interval) - try: - # Collect the memory and cpu usage. - memory_used = process_to_measure.memory_info().rss - cpu_percent = process_to_measure.cpu_percent() - except (psutil.AccessDenied, psutil.ZombieProcess): - # Trying to get process information from a closed or - # zombie process will result in corresponding exceptions. - break - - # Determine the lapsed time for bookkeeping - current_time = time.time() - - # Save all the data into a CSV file. - f.write( - f"{current_time},{memory_used},{cpu_percent}\n" - ) - f.flush() - - -def _create_file_with_size(path, size): - """ - Creates a full-access file in the given directory with the - specified name and size. - """ - f = open(path, 'wb') - os.chmod(path, 0o777) - size = int(size) - f.truncate(size) - f.close() - - -def _create_file_dir(dir_path, file_count, size): - """ - Creates a directory with the specified name. 
Also creates identical files - with the given size in the created directory. The number of identical files - to be created is specified by file_count. - """ - os.mkdir(dir_path, 0o777) - for i in range(int(file_count)): - file_path = os.path.join(dir_path, f'{i}') - _create_file_with_size(file_path, size) - - -def _overwrite_dir_full_access(directory): - if os.path.exists(directory): - shutil.rmtree(directory) - os.makedirs(directory, 0o777) - - -def _get_default_env(config_file): - return { - 'AWS_CONFIG_FILE': config_file, - 'AWS_DEFAULT_REGION': 'us-west-2', - 'AWS_ACCESS_KEY_ID': 'access_key', - 'AWS_SECRET_ACCESS_KEY': 'secret_key' - } - - -def _setup_environment(env, result_dir, config_file): - """ - Creates all files / directories defined in the env struct. - Also, writes a config file named 'config' to the result directory - with contents optionally specified by the env struct. - """ - if "files" in env: - for file_def in env['files']: - path = os.path.join(result_dir, file_def['name']) - _create_file_with_size(path, file_def['size']) - if "file_dirs" in env: - for file_dir_def in env['file_dirs']: - dir_path = os.path.join(result_dir, file_dir_def['name']) - _create_file_dir( - dir_path, - file_dir_def['file_count'], - file_dir_def['file_size'] + output_f = open(output_file, 'w') + + while process_to_measure.is_running(): + if process_to_measure.status() == psutil.STATUS_ZOMBIE: + process_to_measure.kill() + break + time.sleep(data_interval) + try: + # Collect the memory and cpu usage. + memory_used = process_to_measure.memory_info().rss + cpu_percent = process_to_measure.cpu_percent() + except (psutil.AccessDenied, psutil.ZombieProcess): + # Trying to get process information from a closed or + # zombie process will result in corresponding exceptions. + break + + # Determine the lapsed time for bookkeeping + current_time = time.time() + + # Save all the data into a CSV file. + output_f.write( + f"{current_time},{memory_used},{cpu_percent}\n" ) - with open(config_file, 'w') as f: - f.write(env.get('config', _DEFAULT_FILE_CONFIG_CONTENTS)) - f.flush() - - -def _setup_iteration( - benchmark, - client, - result_dir, - performance_dir, - config_file -): - """ - Performs the setup for a single iteration of a benchmark. This - includes creating the files used by a command and stubbing - the HTTP client to use during execution. - """ - env = benchmark.get('environment', {}) - _setup_environment(env, result_dir, config_file) - _overwrite_dir_full_access(performance_dir) - client.setup() - _stub_responses( - benchmark.get('responses', [{"headers": {}, "body": ""}]), - client - ) + output_f.flush() -def _reset_dir(dir): +class BenchmarkHarness(object): """ - Deletes everything in the given folder, and recreates it anew. + Orchestrates running benchmarks in isolated, configurable environments defined + via a specified JSON file. """ - shutil.rmtree(dir, ignore_errors=True) - _overwrite_dir_full_access(dir) + def _setup_environment(self, env, result_dir, config_file): + """ + Creates all files / directories defined in the env struct. + Also, writes a config file named 'config' to the result directory + with contents optionally specified by the env struct. 
+ """ + if "files" in env: + for file_def in env['files']: + path = os.path.join(result_dir, file_def['name']) + _create_file_with_size(path, file_def['size']) + if "file_dirs" in env: + for file_dir_def in env['file_dirs']: + dir_path = os.path.join(result_dir, file_dir_def['name']) + _create_file_dir( + dir_path, + file_dir_def['file_count'], + file_dir_def['file_size'] + ) + with open(config_file, 'w') as f: + f.write(env.get('config', _default_config_file_contents())) + f.flush() + + + def _setup_iteration( + self, + benchmark, + client, + result_dir, + performance_dir, + config_file + ): + """ + Performs the setup for a single iteration of a benchmark. This + includes creating the files used by a command and stubbing + the HTTP client to use during execution. + """ + env = benchmark.get('environment', {}) + self._setup_environment(env, result_dir, config_file) + _overwrite_dir_full_access(performance_dir) + client.setup() + self._stub_responses( + benchmark.get('responses', [{"headers": {}, "body": ""}]), + client + ) -def _stub_responses(responses, client): - """ - Stubs the supplied HTTP client using the response instructions in the supplied - responses struct. Each instruction will generate one or more stubbed responses. - """ - for response in responses: - body = response.get("body", "") - headers = response.get("headers", {}) - status_code = response.get("status_code", 200) - # use the instances key to support duplicating responses a configured number of times - if "instances" in response: - for _ in range(int(response['instances'])): + def _stub_responses(self, responses, client): + """ + Stubs the supplied HTTP client using the response instructions in the supplied + responses struct. Each instruction will generate one or more stubbed responses. + """ + for response in responses: + body = response.get("body", "") + headers = response.get("headers", {}) + status_code = response.get("status_code", 200) + # use the instances key to support duplicating responses a configured number of times + if "instances" in response: + for _ in range(int(response['instances'])): + client.add_response(body, headers, status_code) + else: client.add_response(body, headers, status_code) - else: - client.add_response(body, headers, status_code) -def _process_measurements(benchmark_result): - """ - Perform post-processing of the output of S3Transfer's summarize script, - such as removing unneeded entries. - """ - del benchmark_result['std_dev_total_time'] - del benchmark_result['std_dev_max_memory'] - del benchmark_result['std_dev_average_memory'] - del benchmark_result['std_dev_average_cpu'] - - -def _run_isolated_benchmark( - result_dir, - performance_dir, - benchmark, - client, - process_benchmarker, - args -): - """ - Runs a single iteration of one benchmark execution. Includes setting up - the environment, running the benchmarked execution, formatting - the results, and cleaning up the environment. - """ - out_file = os.path.join(performance_dir, 'performance.csv') - assets_dir = os.path.join(result_dir, 'assets') - config_file = os.path.join(assets_dir, 'config') - # setup for iteration of benchmark - _setup_iteration(benchmark, client, result_dir, performance_dir, config_file) - os.chdir(result_dir) - # patch the OS environment with our supplied defaults - env_patch = mock.patch.dict('os.environ', _get_default_env(config_file)) - env_patch.start() - # fork a child process to run the command on. - # the parent process benchmarks the child process until the child terminates. 
- pid = os.fork() - - try: - # execute command on child process - if pid == 0: - AWSCLIEntryPoint().main(benchmark['command']) - os._exit(0) - # benchmark child process from parent process until child terminates - process_benchmarker.benchmark_process( - pid, - out_file, - args.data_interval - ) - # summarize benchmark results & process summary - summary = json.loads(subprocess.check_output( - [args.summarize_script, out_file, '--output-format', 'json'] - )) - finally: - # cleanup iteration of benchmark - client.tearDown() - _reset_dir(result_dir) - _reset_dir(assets_dir) - env_patch.stop() - return summary - - -def run_benchmarks(args): - """ - Orchestrates benchmarking via the benchmark definitions in - the arguments. - """ - summaries = {'results': []} - result_dir = args.result_dir - assets_dir = os.path.join(result_dir, 'assets') - performance_dir = os.path.join(result_dir, 'performance') - client = StubbedHTTPClient() - process_benchmarker = ProcessBenchmarker() - _overwrite_dir_full_access(result_dir) - _overwrite_dir_full_access(assets_dir) - - with open(args.benchmark_definitions, 'r') as file: - definitions = json.load(file) - - try: - for benchmark in definitions: - benchmark_result = { - 'name': benchmark['name'], - 'dimensions': benchmark['dimensions'], - 'measurements': [] - } - for _ in range(args.num_iterations): - measurements = _run_isolated_benchmark( - result_dir, - performance_dir, - benchmark, - client, - process_benchmarker, - args + def _process_measurements(self, benchmark_result): + """ + Perform post-processing of the output of S3Transfer's summarize script, + such as removing unneeded entries. + """ + del benchmark_result['std_dev_total_time'] + del benchmark_result['std_dev_max_memory'] + del benchmark_result['std_dev_average_memory'] + del benchmark_result['std_dev_max_cpu'] + del benchmark_result['std_dev_average_cpu'] + + + def _run_isolated_benchmark( + self, + result_dir, + performance_dir, + benchmark, + client, + process_benchmarker, + args + ): + """ + Runs a single iteration of one benchmark execution. Includes setting up + the environment, running the benchmarked execution, formatting + the results, and cleaning up the environment. + """ + out_file = os.path.join(performance_dir, 'performance.csv') + assets_dir = os.path.join(result_dir, 'assets') + config_file = os.path.join(assets_dir, 'config') + # setup for iteration of benchmark + self._setup_iteration(benchmark, client, result_dir, performance_dir, config_file) + os.chdir(result_dir) + # patch the OS environment with our supplied defaults + env_patch = mock.patch.dict('os.environ', _get_default_env(config_file)) + env_patch.start() + # fork a child process to run the command on. + # the parent process benchmarks the child process until the child terminates. 
+ pid = os.fork() + + try: + # execute command on child process + if pid == 0: + # TODO refactor to helper function + first_client_invocation_time = None + start_time = time.time() + driver = create_clidriver() + event_emitter = driver.session.get_component('event_emitter') + def _log_invocation_time(params, request_signer, model, **kwargs): + nonlocal first_client_invocation_time + if first_client_invocation_time is None: + first_client_invocation_time = time.time() + + event_emitter.register_last( + 'before-call', + _log_invocation_time, + 'benchmarks.log-invocation-time' ) - _process_measurements(measurements) - benchmark_result['measurements'].append(measurements) - summaries['results'].append(benchmark_result) - finally: - # final cleanup - shutil.rmtree(result_dir, ignore_errors=True) - print(summaries) + AWSCLIEntryPoint(driver).main(benchmark['command']) + end_time = time.time() + + # write the collected metrics to a file + metrics_f = open(os.path.join(result_dir, 'metrics.json'), 'w') + metrics_f.write(json.dumps( + { + 'start_time': start_time, + 'end_time': end_time, + 'first_client_invocation_time': first_client_invocation_time + } + )) + metrics_f.close() + os._exit(0) + # benchmark child process from parent process until child terminates + process_benchmarker.benchmark_process( + pid, + out_file, + args.data_interval + ) + # summarize benchmark results & process summary + summary = json.loads(subprocess.check_output( + [args.summarize_script, out_file, '--output-format', 'json'] + )) + # load the internally-collected metrics and append to the summary + metrics_f = json.load(open(os.path.join(result_dir, 'metrics.json'), 'r')) + summary['total_time'] = metrics_f['end_time'] - metrics_f['start_time'] + summary['first_client_invocation_time'] = (metrics_f['first_client_invocation_time'] + - metrics_f['start_time']) + finally: + # cleanup iteration of benchmark + client.tearDown() + _reset_dir(result_dir) + _reset_dir(assets_dir) + env_patch.stop() + self._time_of_call = None + return summary + + + def run_benchmarks(self, args): + """ + Orchestrates benchmarking via the benchmark definitions in + the arguments. 
+ """ + summaries = {'results': []} + result_dir = args.result_dir + assets_dir = os.path.join(result_dir, 'assets') + performance_dir = os.path.join(result_dir, 'performance') + client = StubbedHTTPClient() + process_benchmarker = ProcessBenchmarker() + definitions = json.load(open(args.benchmark_definitions, 'r')) + _overwrite_dir_full_access(result_dir) + _overwrite_dir_full_access(assets_dir) + + try: + for benchmark in definitions: + benchmark_result = { + 'name': benchmark['name'], + 'dimensions': benchmark['dimensions'], + 'measurements': [] + } + for _ in range(args.num_iterations): + measurements = self._run_isolated_benchmark( + result_dir, + performance_dir, + benchmark, + client, + process_benchmarker, + args + ) + self._process_measurements(measurements) + benchmark_result['measurements'].append(measurements) + summaries['results'].append(benchmark_result) + finally: + # final cleanup + shutil.rmtree(result_dir, ignore_errors=True) + print(summaries) if __name__ == "__main__": + harness = BenchmarkHarness() parser = argparse.ArgumentParser() parser.add_argument( '--benchmark-definitions', default=_BENCHMARK_DEFINITIONS, @@ -354,4 +400,4 @@ if __name__ == "__main__": type=int, help='The number of iterations to repeat the benchmark for.', ) - run_benchmarks(parser.parse_args()) \ No newline at end of file + harness.run_benchmarks(parser.parse_args()) From 40d02dafcc0a2d2aa1b8270f0ba45d3ef03f1abc Mon Sep 17 00:00:00 2001 From: aemous Date: Tue, 28 Jan 2025 10:24:56 -0500 Subject: [PATCH 6/9] Move Summarizer external s3transfer script into benchmark_utils. --- scripts/performance/benchmark_utils.py | 103 +++++++++++++++++++++++++ scripts/performance/benchmarks.json | 12 +-- scripts/performance/run-benchmarks | 31 ++------ 3 files changed, 115 insertions(+), 31 deletions(-) create mode 100644 scripts/performance/benchmark_utils.py diff --git a/scripts/performance/benchmark_utils.py b/scripts/performance/benchmark_utils.py new file mode 100644 index 000000000000..c9b147ba25ac --- /dev/null +++ b/scripts/performance/benchmark_utils.py @@ -0,0 +1,103 @@ +import csv +import math + +import s3transfer +import os +import subprocess +import uuid +import shutil +import argparse +import tempfile + + +class Summarizer: + DATA_INDEX_IN_ROW = {'time': 0, 'memory': 1, 'cpu': 2} + + def __init__(self): + self._num_rows = 0 + self._start_time = None + self._end_time = None + self._averages = { + 'memory': 0.0, + 'cpu': 0.0, + } + self._samples = { + 'memory': [], + 'cpu': [], + } + self._maximums = {'memory': 0.0, 'cpu': 0.0} + + def summarize(self, benchmark_file): + """Processes the data from the CSV file""" + with open(benchmark_file) as f: + reader = csv.reader(f) + # Process each row from the CSV file + row = None + for row in reader: + self._validate_row(row, benchmark_file) + self.process_data_row(row) + self._validate_row(row, benchmark_file) + self._end_time = self._get_time(row) + metrics = self._finalize_processed_data_for_file() + return metrics + + def _validate_row(self, row, filename): + if not row: + raise RuntimeError( + f'Row: {row} could not be processed. The CSV file ({filename}) may be ' + 'empty.' + ) + + def process_data_row(self, row): + # If the row is the first row collect the start time. + if self._num_rows == 0: + self._start_time = self._get_time(row) + self._num_rows += 1 + self.process_data_point(row, 'memory') + self.process_data_point(row, 'cpu') + + def process_data_point(self, row, name): + # Determine where in the CSV row the requested data is located. 
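+        # The columns are time, memory, cpu, matching the order in which
+        # the benchmark harness writes each sample.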
+ index = self.DATA_INDEX_IN_ROW[name] + # Get the data point. + data_point = float(row[index]) + self._add_to_average(name, data_point) + self._account_for_maximum(name, data_point) + self._samples[name].append(data_point) + + def _finalize_processed_data_for_file(self): + self._samples['memory'].sort() + self._samples['cpu'].sort() + metrics = { + 'time': self._end_time - self._start_time, + 'average_memory': self._averages['memory'] / self._num_rows, + 'average_cpu': self._averages['cpu'] / self._num_rows, + 'max_memory': self._maximums['memory'], + 'max_cpu': self._maximums['cpu'], + 'memory_p50': self._compute_metric_percentile(50, 'memory'), + 'memory_p95': self._compute_metric_percentile(95, 'memory'), + 'cpu_p50': self._compute_metric_percentile(50, 'cpu'), + 'cpu_p95': self._compute_metric_percentile(95, 'cpu'), + } + # Reset some of the data needed to be tracked for each execution + self._num_rows = 0 + self._maximums = self._maximums.fromkeys(self._maximums, 0.0) + self._averages = self._averages.fromkeys(self._averages, 0.0) + self._samples['memory'].clear() + self._samples['cpu'].clear() + return metrics + + def _compute_metric_percentile(self, percentile, name): + num_samples = len(self._samples[name]) + p_idx = math.ceil(percentile*num_samples/100) - 1 + return self._samples[name][p_idx] + + def _get_time(self, row): + return float(row[self.DATA_INDEX_IN_ROW['time']]) + + def _add_to_average(self, name, data_point): + self._averages[name] += data_point + + def _account_for_maximum(self, name, data_point): + if data_point > self._maximums[name]: + self._maximums[name] = data_point diff --git a/scripts/performance/benchmarks.json b/scripts/performance/benchmarks.json index c6c0efee809a..e23afb2a7dd4 100644 --- a/scripts/performance/benchmarks.json +++ b/scripts/performance/benchmarks.json @@ -111,7 +111,7 @@ "name": "s3.sync.upload", "command": ["s3", "sync", "./test_dir", "s3://bucket/test_dir", "--quiet"], "dimensions": [ - {"FileCount": "50,000"}, + {"FileCount": "5,000"}, {"FileSize": "4KB"}, {"S3TransferClient": "Classic"} ], @@ -119,7 +119,7 @@ "file_dirs": [ { "name": "test_dir", - "file_count": 5e4, + "file_count": 5e3, "file_size": 4e3 } ], @@ -131,7 +131,7 @@ }, { "headers": {"ETag": "etag"}, - "instances": 5e4 + "instances": 5e3 } ] }, @@ -139,7 +139,7 @@ "name": "s3.sync.upload", "command": ["s3", "sync", "./test_dir", "s3://bucket/test_dir", "--quiet"], "dimensions": [ - {"FileCount": "50,000"}, + {"FileCount": "5,000"}, {"FileSize": "4KB"}, {"S3TransferClient": "CRT"} ], @@ -147,7 +147,7 @@ "file_dirs": [ { "name": "test_dir", - "file_count": 5e4, + "file_count": 5e3, "file_size": 4e3 } ], @@ -159,7 +159,7 @@ }, { "headers": {"ETag": "etag"}, - "instances": 5e4 + "instances": 5e3 } ] } diff --git a/scripts/performance/run-benchmarks b/scripts/performance/run-benchmarks index cd2736f5d308..843253febd6a 100755 --- a/scripts/performance/run-benchmarks +++ b/scripts/performance/run-benchmarks @@ -11,7 +11,7 @@ import awscli.botocore.awsrequest from unittest import mock from awscli.clidriver import AWSCLIEntryPoint, create_clidriver from awscli.compat import BytesIO - +from scripts.performance.benchmark_utils import Summarizer _BENCHMARK_DEFINITIONS = os.path.join( os.path.dirname(os.path.abspath(__file__)), @@ -171,6 +171,8 @@ class BenchmarkHarness(object): Orchestrates running benchmarks in isolated, configurable environments defined via a specified JSON file. 
""" + def __init__(self): + self._summarizer = Summarizer() def _setup_environment(self, env, result_dir, config_file): """ Creates all files / directories defined in the env struct. @@ -234,18 +236,6 @@ class BenchmarkHarness(object): client.add_response(body, headers, status_code) - def _process_measurements(self, benchmark_result): - """ - Perform post-processing of the output of S3Transfer's summarize script, - such as removing unneeded entries. - """ - del benchmark_result['std_dev_total_time'] - del benchmark_result['std_dev_max_memory'] - del benchmark_result['std_dev_average_memory'] - del benchmark_result['std_dev_max_cpu'] - del benchmark_result['std_dev_average_cpu'] - - def _run_isolated_benchmark( self, result_dir, @@ -312,9 +302,7 @@ class BenchmarkHarness(object): args.data_interval ) # summarize benchmark results & process summary - summary = json.loads(subprocess.check_output( - [args.summarize_script, out_file, '--output-format', 'json'] - )) + summary = self._summarizer.summarize(out_file) # load the internally-collected metrics and append to the summary metrics_f = json.load(open(os.path.join(result_dir, 'metrics.json'), 'r')) summary['total_time'] = metrics_f['end_time'] - metrics_f['start_time'] @@ -361,13 +349,12 @@ class BenchmarkHarness(object): process_benchmarker, args ) - self._process_measurements(measurements) benchmark_result['measurements'].append(measurements) summaries['results'].append(benchmark_result) finally: # final cleanup shutil.rmtree(result_dir, ignore_errors=True) - print(summaries) + print(json.dumps(summaries, indent=2)) if __name__ == "__main__": @@ -377,12 +364,6 @@ if __name__ == "__main__": '--benchmark-definitions', default=_BENCHMARK_DEFINITIONS, help=('The JSON file defining the commands to benchmark.') ) - parser.add_argument( - '--summarize-script', - required=True, - help=('The summarize script to run the commands with. This should be ' - 'from s3transfer.') - ) parser.add_argument( '-o', '--result-dir', default='results', help='The directory to output performance results to. Existing ' @@ -390,7 +371,7 @@ if __name__ == "__main__": ) parser.add_argument( '--data-interval', - default=0.01, + default=0.001, type=float, help='The interval in seconds to poll for data points.', ) From a46a60a3f06146fb91c422a961252f5df8a2a5e0 Mon Sep 17 00:00:00 2001 From: aemous Date: Tue, 28 Jan 2025 10:55:41 -0500 Subject: [PATCH 7/9] Refactor classes from script to Python module. --- scripts/performance/benchmark_utils.py | 586 +++++++++++++++++++++++++ scripts/performance/run-benchmarks | 349 +-------------- 2 files changed, 588 insertions(+), 347 deletions(-) diff --git a/scripts/performance/benchmark_utils.py b/scripts/performance/benchmark_utils.py index c9b147ba25ac..d5f70e1c72bf 100644 --- a/scripts/performance/benchmark_utils.py +++ b/scripts/performance/benchmark_utils.py @@ -1,5 +1,8 @@ import csv +import json import math +import time +import psutil import s3transfer import os @@ -8,6 +11,11 @@ import shutil import argparse import tempfile +from awscli.botocore.awsrequest import AWSResponse + +from unittest import mock +from awscli.clidriver import AWSCLIEntryPoint, create_clidriver +from awscli.compat import BytesIO class Summarizer: @@ -101,3 +109,581 @@ def _add_to_average(self, name, data_point): def _account_for_maximum(self, name, data_point): if data_point > self._maximums[name]: self._maximums[name] = data_point + + +class RawResponse(BytesIO): + """ + A bytes-like streamable HTTP response representation. 
+ """ + def stream(self, **kwargs): + contents = self.read() + while contents: + yield contents + contents = self.read() + + +class StubbedHTTPClient(object): + """ + A generic stubbed HTTP client. + """ + def setup(self): + urllib3_session_send = 'botocore.httpsession.URLLib3Session.send' + self._urllib3_patch = mock.patch(urllib3_session_send) + self._send = self._urllib3_patch.start() + self._send.side_effect = self.get_response + self._responses = [] + + def tearDown(self): + self._urllib3_patch.stop() + + def get_response(self, request): + response = self._responses.pop(0) + if isinstance(response, Exception): + raise response + return response + + def add_response(self, body, headers, status_code): + response = AWSResponse( + url='http://169.254.169.254/', + status_code=status_code, + headers=headers, + raw=RawResponse(body.encode()) + ) + self._responses.append(response) + + +class ProcessBenchmarker(object): + """ + Periodically samples CPU and memory usage of a process given its pid. Writes + all collected samples to a CSV file. + """ + def benchmark_process(self, pid, output_file, data_interval): + parent_pid = os.getpid() + try: + # Benchmark the process where the script is being run. + self._run_benchmark(pid, output_file, data_interval) + except KeyboardInterrupt: + # If there is an interrupt, then try to clean everything up. + proc = psutil.Process(parent_pid) + procs = proc.children(recursive=True) + + for child in procs: + child.terminate() + + gone, alive = psutil.wait_procs(procs, timeout=1) + for child in alive: + child.kill() + raise + + + def _run_benchmark(self, pid, output_file, data_interval): + process_to_measure = psutil.Process(pid) + output_f = open(output_file, 'w') + + while process_to_measure.is_running(): + if process_to_measure.status() == psutil.STATUS_ZOMBIE: + process_to_measure.kill() + break + time.sleep(data_interval) + try: + # Collect the memory and cpu usage. + memory_used = process_to_measure.memory_info().rss + cpu_percent = process_to_measure.cpu_percent() + except (psutil.AccessDenied, psutil.ZombieProcess): + # Trying to get process information from a closed or + # zombie process will result in corresponding exceptions. + break + + # Determine the lapsed time for bookkeeping + current_time = time.time() + + # Save all the data into a CSV file. + output_f.write( + f"{current_time},{memory_used},{cpu_percent}\n" + ) + output_f.flush() + + +class BenchmarkHarness(object): + """ + Orchestrates running benchmarks in isolated, configurable environments defined + via a specified JSON file. + """ + def __init__(self): + self._summarizer = Summarizer() + + def _get_default_env(self, config_file): + return { + 'AWS_CONFIG_FILE': config_file, + 'AWS_DEFAULT_REGION': 'us-west-2', + 'AWS_ACCESS_KEY_ID': 'access_key', + 'AWS_SECRET_ACCESS_KEY': 'secret_key' + } + + def _default_config_file_contents(self): + return ( + '[default]' + ) + + def _create_file_with_size(self, path, size): + """ + Creates a full-access file in the given directory with the + specified name and size. + """ + f = open(path, 'wb') + os.chmod(path, 0o777) + size = int(size) + f.truncate(size) + f.close() + + def _create_file_dir(self, dir_path, file_count, size): + """ + Creates a directory with the specified name. Also creates identical files + with the given size in the created directory. The number of identical files + to be created is specified by file_count. 
+ """ + os.mkdir(dir_path, 0o777) + for i in range(int(file_count)): + file_path = os.path.join(dir_path, f'{i}') + self._create_file_with_size(file_path, size) + + def _setup_environment(self, env, result_dir, config_file): + """ + Creates all files / directories defined in the env struct. + Also, writes a config file named 'config' to the result directory + with contents optionally specified by the env struct. + """ + if "files" in env: + for file_def in env['files']: + path = os.path.join(result_dir, file_def['name']) + self._create_file_with_size(path, file_def['size']) + if "file_dirs" in env: + for file_dir_def in env['file_dirs']: + dir_path = os.path.join(result_dir, file_dir_def['name']) + self._create_file_dir( + dir_path, + file_dir_def['file_count'], + file_dir_def['file_size'] + ) + with open(config_file, 'w') as f: + f.write(env.get('config', self._default_config_file_contents())) + f.flush() + + def _setup_iteration( + self, + benchmark, + client, + result_dir, + performance_dir, + config_file + ): + """ + Performs the setup for a single iteration of a benchmark. This + includes creating the files used by a command and stubbing + the HTTP client to use during execution. + """ + env = benchmark.get('environment', {}) + self._setup_environment(env, result_dir, config_file) + client.setup() + self._stub_responses( + benchmark.get('responses', [{"headers": {}, "body": ""}]), + client + ) + if os.path.exists(performance_dir): + shutil.rmtree(performance_dir) + os.makedirs(performance_dir, 0o777) + + def _stub_responses(self, responses, client): + """ + Stubs the supplied HTTP client using the response instructions in the supplied + responses struct. Each instruction will generate one or more stubbed responses. + """ + for response in responses: + body = response.get("body", "") + headers = response.get("headers", {}) + status_code = response.get("status_code", 200) + # use the instances key to support duplicating responses a configured number of times + if "instances" in response: + for _ in range(int(response['instances'])): + client.add_response(body, headers, status_code) + else: + client.add_response(body, headers, status_code) + + def _run_command_with_metric_hooks(self, cmd, result_dir): + """ + Runs a CLI command and logs CLI-specific metrics to a file. + """ + first_client_invocation_time = None + start_time = time.time() + driver = create_clidriver() + event_emitter = driver.session.get_component('event_emitter') + + def _log_invocation_time(params, request_signer, model, **kwargs): + nonlocal first_client_invocation_time + if first_client_invocation_time is None: + first_client_invocation_time = time.time() + + event_emitter.register_last( + 'before-call', + _log_invocation_time, + 'benchmarks.log-invocation-time' + ) + AWSCLIEntryPoint(driver).main(cmd) + end_time = time.time() + + # write the collected metrics to a file + metrics_f = open(os.path.join(result_dir, 'metrics.json'), 'w') + metrics_f.write(json.dumps( + { + 'start_time': start_time, + 'end_time': end_time, + 'first_client_invocation_time': first_client_invocation_time + } + )) + metrics_f.close() + os._exit(0) + + def _run_isolated_benchmark( + self, + result_dir, + performance_dir, + benchmark, + client, + process_benchmarker, + args + ): + """ + Runs a single iteration of one benchmark execution. Includes setting up + the environment, running the benchmarked execution, formatting + the results, and cleaning up the environment. 
+ """ + out_file = os.path.join(performance_dir, 'performance.csv') + assets_dir = os.path.join(result_dir, 'assets') + config_file = os.path.join(assets_dir, 'config') + # setup for iteration of benchmark + self._setup_iteration(benchmark, client, result_dir, performance_dir, config_file) + os.chdir(result_dir) + # patch the OS environment with our supplied defaults + env_patch = mock.patch.dict('os.environ', self._get_default_env(config_file)) + env_patch.start() + # fork a child process to run the command on. + # the parent process benchmarks the child process until the child terminates. + pid = os.fork() + + try: + # execute command on child process + if pid == 0: + self._run_command_with_metric_hooks(benchmark['command'], result_dir) + + # benchmark child process from parent process until child terminates + process_benchmarker.benchmark_process( + pid, + out_file, + args.data_interval + ) + # summarize benchmark results and process summary + summary = self._summarizer.summarize(out_file) + # load the internally-collected metrics and append to the summary + metrics_f = json.load(open(os.path.join(result_dir, 'metrics.json'), 'r')) + # override the summarizer's sample-based timing with the + # wall-clock time measured by the child process + del summary['time'] + summary['total_time'] = metrics_f['end_time'] - metrics_f['start_time'] + summary['first_client_invocation_time'] = (metrics_f['first_client_invocation_time'] + - metrics_f['start_time']) + finally: + # cleanup iteration of benchmark + client.tearDown() + shutil.rmtree(result_dir, ignore_errors=True) + os.makedirs(result_dir, 0o777) + shutil.rmtree(assets_dir, ignore_errors=True) + os.makedirs(assets_dir, 0o777) + env_patch.stop() + self._time_of_call = None + return summary + + def run_benchmarks(self, args): + """ + Orchestrates benchmarking via the benchmark definitions in + the arguments. + """ + summaries = {'results': []} + result_dir = args.result_dir + assets_dir = os.path.join(result_dir, 'assets') + performance_dir = os.path.join(result_dir, 'performance') + client = StubbedHTTPClient() + process_benchmarker = ProcessBenchmarker() + definitions = json.load(open(args.benchmark_definitions, 'r')) + if os.path.exists(result_dir): + shutil.rmtree(result_dir) + os.makedirs(result_dir, 0o777) + if os.path.exists(assets_dir): + shutil.rmtree(assets_dir) + os.makedirs(assets_dir, 0o777) + + try: + for benchmark in definitions: + benchmark_result = { + 'name': benchmark['name'], + 'dimensions': benchmark['dimensions'], + 'measurements': [] + } + for _ in range(args.num_iterations): + measurements = self._run_isolated_benchmark( + result_dir, + performance_dir, + benchmark, + client, + process_benchmarker, + args + ) + benchmark_result['measurements'].append(measurements) + summaries['results'].append(benchmark_result) + finally: + # final cleanup + shutil.rmtree(result_dir, ignore_errors=True) + print(json.dumps(summaries, indent=2)) + + +def summarize(script, result_dir, summary_dir): + """Run the given summary script on every file in the given directory. + + :param script: A summarization script that takes a list of csv files. + :param result_dir: A directory containing csv performance result files. + :param summary_dir: The directory to put the summary file in. 
+ """ + summarize_args = [script] + for f in os.listdir(result_dir): + path = os.path.join(result_dir, f) + if os.path.isfile(path): + summarize_args.append(path) + + with open(os.path.join(summary_dir, 'summary.txt'), 'wb') as f: + subprocess.check_call(summarize_args, stdout=f) + with open(os.path.join(summary_dir, 'summary.json'), 'wb') as f: + summarize_args.extend(['--output-format', 'json']) + subprocess.check_call(summarize_args, stdout=f) + + +def _get_s3transfer_performance_script(script_name): + """Retrieves an s3transfer performance script if available.""" + s3transfer_directory = os.path.dirname(s3transfer.__file__) + s3transfer_directory = os.path.dirname(s3transfer_directory) + scripts_directory = 'scripts/performance' + scripts_directory = os.path.join(s3transfer_directory, scripts_directory) + script = os.path.join(scripts_directory, script_name) + + if os.path.isfile(script): + return script + else: + return None + + +def get_benchmark_script(): + return _get_s3transfer_performance_script('benchmark') + + +def get_summarize_script(): + return _get_s3transfer_performance_script('summarize') + + +def backup(source, recursive): + """Backup a given source to a temporary location. + + :type source: str + :param source: A local path or s3 path to backup. + + :type recursive: bool + :param recursive: if True, the source will be treated as a directory. + """ + if source[:5] == 's3://': + parts = source.split('/') + parts.insert(3, str(uuid.uuid4())) + backup_path = '/'.join(parts) + else: + name = os.path.split(source)[-1] + temp_dir = tempfile.mkdtemp() + backup_path = os.path.join(temp_dir, name) + + copy(source, backup_path, recursive) + return backup_path + + +def copy(source, destination, recursive): + """Copy files from one location to another. + + The source and destination must both be s3 paths or both be local paths. + + :type source: str + :param source: A local path or s3 path to backup. + + :type destination: str + :param destination: A local path or s3 path to backup the source to. + + :type recursive: bool + :param recursive: if True, the source will be treated as a directory. + """ + if 's3://' in [source[:5], destination[:5]]: + cp_args = ['aws', 's3', 'cp', source, destination, '--quiet'] + if recursive: + cp_args.append('--recursive') + subprocess.check_call(cp_args) + return + + if recursive: + shutil.copytree(source, destination) + else: + shutil.copy(source, destination) + + +def clean(destination, recursive): + """Delete a file or directory either locally or on S3.""" + if destination[:5] == 's3://': + rm_args = ['aws', 's3', 'rm', '--quiet', destination] + if recursive: + rm_args.append('--recursive') + subprocess.check_call(rm_args) + else: + if recursive: + shutil.rmtree(destination) + else: + os.remove(destination) + + +def create_random_subfolder(destination): + """Create a random subdirectory in a given directory.""" + folder_name = str(uuid.uuid4()) + if destination.startswith('s3://'): + parts = destination.split('/') + parts.append(folder_name) + return '/'.join(parts) + else: + parts = list(os.path.split(destination)) + parts.append(folder_name) + path = os.path.join(*parts) + os.makedirs(path) + return path + + +def get_transfer_command(command, recursive, quiet): + """Get a full cli transfer command. + + Performs common transformations, e.g. 
adding --quiet + """ + cli_command = 'aws s3 ' + command + + if recursive: + cli_command += ' --recursive' + + if quiet: + cli_command += ' --quiet' + else: + print(cli_command) + + return cli_command + + +def benchmark_command(command, benchmark_script, summarize_script, + output_dir, num_iterations, dry_run, upkeep=None, + cleanup=None): + """Benchmark several runs of a long-running command. + + :type command: str + :param command: The full aws cli command to benchmark + + :type benchmark_script: str + :param benchmark_script: A benchmark script that takes a command to run + and outputs performance data to a file. This should be from s3transfer. + + :type summarize_script: str + :param summarize_script: A summarization script that the output of the + benchmark script. This should be from s3transfer. + + :type output_dir: str + :param output_dir: The directory to output performance results to. + + :type num_iterations: int + :param num_iterations: The number of times to run the benchmark on the + command. + + :type dry_run: bool + :param dry_run: Whether or not to actually run the benchmarks. + + :type upkeep: function that takes no arguments + :param upkeep: A function that is run after every iteration of the + benchmark process. This should be used for upkeep, such as restoring + files that were deleted as part of the command executing. + + :type cleanup: function that takes no arguments + :param cleanup: A function that is run at the end of the benchmark + process or if there are any problems during the benchmark process. + It should be uses for the final cleanup, such as deleting files that + were created at some destination. + """ + performance_dir = os.path.join(output_dir, 'performance') + if os.path.exists(performance_dir): + shutil.rmtree(performance_dir) + os.makedirs(performance_dir) + + try: + for i in range(num_iterations): + out_file = 'performance%s.csv' % i + out_file = os.path.join(performance_dir, out_file) + benchmark_args = [ + benchmark_script, command, '--output-file', out_file + ] + if not dry_run: + subprocess.check_call(benchmark_args) + if upkeep is not None: + upkeep() + + if not dry_run: + summarize(summarize_script, performance_dir, output_dir) + finally: + if not dry_run and cleanup is not None: + cleanup() + + +def get_default_argparser(): + """Get an ArgumentParser with all the base benchmark arguments added in.""" + parser = argparse.ArgumentParser() + parser.add_argument( + '--no-cleanup', action='store_true', default=False, + help='Do not remove the destination after the tests complete.' + ) + parser.add_argument( + '--recursive', action='store_true', default=False, + help='Indicates that this is a recursive transfer.' + ) + benchmark_script = get_benchmark_script() + parser.add_argument( + '--benchmark-script', default=benchmark_script, + required=benchmark_script is None, + help=('The benchmark script to run the commands with. This should be ' + 'from s3transfer.') + ) + summarize_script = get_summarize_script() + parser.add_argument( + '--summarize-script', default=summarize_script, + required=summarize_script is None, + help=('The summarize script to run the commands with. This should be ' + 'from s3transfer.') + ) + parser.add_argument( + '-o', '--result-dir', default='results', + help='The directory to output performance results to. Existing ' + 'results will be deleted.' + ) + parser.add_argument( + '--dry-run', default=False, action='store_true', + help='If set, commands will only be printed out, not executed.' 
+ ) + parser.add_argument( + '--quiet', default=False, action='store_true', + help='If set, output is suppressed.' + ) + parser.add_argument( + '-n', '--num-iterations', default=1, type=int, + help='The number of times to run the test.' + ) + return parser \ No newline at end of file diff --git a/scripts/performance/run-benchmarks b/scripts/performance/run-benchmarks index 843253febd6a..1adcb422ba33 100755 --- a/scripts/performance/run-benchmarks +++ b/scripts/performance/run-benchmarks @@ -1,17 +1,9 @@ #!/usr/bin/env python import argparse -import json import os -import psutil -import shutil -import subprocess -import time -import awscli.botocore.awsrequest -from unittest import mock -from awscli.clidriver import AWSCLIEntryPoint, create_clidriver -from awscli.compat import BytesIO -from scripts.performance.benchmark_utils import Summarizer +from scripts.performance.benchmark_utils import BenchmarkHarness + _BENCHMARK_DEFINITIONS = os.path.join( os.path.dirname(os.path.abspath(__file__)), @@ -20,343 +12,6 @@ _BENCHMARK_DEFINITIONS = os.path.join( _DEFAULT_FILE_CONFIG_CONTENTS = "[default]" -def _create_file_with_size(path, size): - """ - Creates a full-access file in the given directory with the - specified name and size. - """ - f = open(path, 'wb') - os.chmod(path, 0o777) - size = int(size) - f.truncate(size) - f.close() - - -def _create_file_dir(dir_path, file_count, size): - """ - Creates a directory with the specified name. Also creates identical files - with the given size in the created directory. The number of identical files - to be created is specified by file_count. - """ - os.mkdir(dir_path, 0o777) - for i in range(int(file_count)): - file_path = os.path.join(dir_path, f'{i}') - _create_file_with_size(file_path, size) - - -def _overwrite_dir_full_access(directory): - if os.path.exists(directory): - shutil.rmtree(directory) - os.makedirs(directory, 0o777) - - -def _reset_dir(directory): - """ - Deletes everything in the given folder, and recreates it anew. - """ - shutil.rmtree(directory, ignore_errors=True) - _overwrite_dir_full_access(directory) - - -def _get_default_env(config_file): - return { - 'AWS_CONFIG_FILE': config_file, - 'AWS_DEFAULT_REGION': 'us-west-2', - 'AWS_ACCESS_KEY_ID': 'access_key', - 'AWS_SECRET_ACCESS_KEY': 'secret_key' - } - - -def _default_config_file_contents(): - return ( - '[default]' - ) - - -class RawResponse(BytesIO): - """ - A bytes-like streamable HTTP response representation. - """ - def stream(self, **kwargs): - contents = self.read() - while contents: - yield contents - contents = self.read() - - -class StubbedHTTPClient(object): - """ - A generic stubbed HTTP client. - """ - def setup(self): - urllib3_session_send = 'botocore.httpsession.URLLib3Session.send' - self._urllib3_patch = mock.patch(urllib3_session_send) - self._send = self._urllib3_patch.start() - self._send.side_effect = self.get_response - self._responses = [] - - def tearDown(self): - self._urllib3_patch.stop() - - def get_response(self, request): - response = self._responses.pop(0) - if isinstance(response, Exception): - raise response - return response - - def add_response(self, body, headers, status_code): - response = awscli.botocore.awsrequest.AWSResponse( - url='http://169.254.169.254/', - status_code=status_code, - headers=headers, - raw=RawResponse(body.encode()) - ) - self._responses.append(response) - - -class ProcessBenchmarker(object): - """ - Periodically samples CPU and memory usage of a process given its pid. Writes - all collected samples to a CSV file. 
- """ - def benchmark_process(self, pid, output_file, data_interval): - parent_pid = os.getpid() - try: - # Benchmark the process where the script is being run. - self._run_benchmark(pid, output_file, data_interval) - except KeyboardInterrupt: - # If there is an interrupt, then try to clean everything up. - proc = psutil.Process(parent_pid) - procs = proc.children(recursive=True) - - for child in procs: - child.terminate() - - gone, alive = psutil.wait_procs(procs, timeout=1) - for child in alive: - child.kill() - raise - - - def _run_benchmark(self, pid, output_file, data_interval): - process_to_measure = psutil.Process(pid) - output_f = open(output_file, 'w') - - while process_to_measure.is_running(): - if process_to_measure.status() == psutil.STATUS_ZOMBIE: - process_to_measure.kill() - break - time.sleep(data_interval) - try: - # Collect the memory and cpu usage. - memory_used = process_to_measure.memory_info().rss - cpu_percent = process_to_measure.cpu_percent() - except (psutil.AccessDenied, psutil.ZombieProcess): - # Trying to get process information from a closed or - # zombie process will result in corresponding exceptions. - break - - # Determine the lapsed time for bookkeeping - current_time = time.time() - - # Save all the data into a CSV file. - output_f.write( - f"{current_time},{memory_used},{cpu_percent}\n" - ) - output_f.flush() - - -class BenchmarkHarness(object): - """ - Orchestrates running benchmarks in isolated, configurable environments defined - via a specified JSON file. - """ - def __init__(self): - self._summarizer = Summarizer() - def _setup_environment(self, env, result_dir, config_file): - """ - Creates all files / directories defined in the env struct. - Also, writes a config file named 'config' to the result directory - with contents optionally specified by the env struct. - """ - if "files" in env: - for file_def in env['files']: - path = os.path.join(result_dir, file_def['name']) - _create_file_with_size(path, file_def['size']) - if "file_dirs" in env: - for file_dir_def in env['file_dirs']: - dir_path = os.path.join(result_dir, file_dir_def['name']) - _create_file_dir( - dir_path, - file_dir_def['file_count'], - file_dir_def['file_size'] - ) - with open(config_file, 'w') as f: - f.write(env.get('config', _default_config_file_contents())) - f.flush() - - - def _setup_iteration( - self, - benchmark, - client, - result_dir, - performance_dir, - config_file - ): - """ - Performs the setup for a single iteration of a benchmark. This - includes creating the files used by a command and stubbing - the HTTP client to use during execution. - """ - env = benchmark.get('environment', {}) - self._setup_environment(env, result_dir, config_file) - _overwrite_dir_full_access(performance_dir) - client.setup() - self._stub_responses( - benchmark.get('responses', [{"headers": {}, "body": ""}]), - client - ) - - - def _stub_responses(self, responses, client): - """ - Stubs the supplied HTTP client using the response instructions in the supplied - responses struct. Each instruction will generate one or more stubbed responses. 
- """ - for response in responses: - body = response.get("body", "") - headers = response.get("headers", {}) - status_code = response.get("status_code", 200) - # use the instances key to support duplicating responses a configured number of times - if "instances" in response: - for _ in range(int(response['instances'])): - client.add_response(body, headers, status_code) - else: - client.add_response(body, headers, status_code) - - - def _run_isolated_benchmark( - self, - result_dir, - performance_dir, - benchmark, - client, - process_benchmarker, - args - ): - """ - Runs a single iteration of one benchmark execution. Includes setting up - the environment, running the benchmarked execution, formatting - the results, and cleaning up the environment. - """ - out_file = os.path.join(performance_dir, 'performance.csv') - assets_dir = os.path.join(result_dir, 'assets') - config_file = os.path.join(assets_dir, 'config') - # setup for iteration of benchmark - self._setup_iteration(benchmark, client, result_dir, performance_dir, config_file) - os.chdir(result_dir) - # patch the OS environment with our supplied defaults - env_patch = mock.patch.dict('os.environ', _get_default_env(config_file)) - env_patch.start() - # fork a child process to run the command on. - # the parent process benchmarks the child process until the child terminates. - pid = os.fork() - - try: - # execute command on child process - if pid == 0: - # TODO refactor to helper function - first_client_invocation_time = None - start_time = time.time() - driver = create_clidriver() - event_emitter = driver.session.get_component('event_emitter') - def _log_invocation_time(params, request_signer, model, **kwargs): - nonlocal first_client_invocation_time - if first_client_invocation_time is None: - first_client_invocation_time = time.time() - - event_emitter.register_last( - 'before-call', - _log_invocation_time, - 'benchmarks.log-invocation-time' - ) - AWSCLIEntryPoint(driver).main(benchmark['command']) - end_time = time.time() - - # write the collected metrics to a file - metrics_f = open(os.path.join(result_dir, 'metrics.json'), 'w') - metrics_f.write(json.dumps( - { - 'start_time': start_time, - 'end_time': end_time, - 'first_client_invocation_time': first_client_invocation_time - } - )) - metrics_f.close() - os._exit(0) - # benchmark child process from parent process until child terminates - process_benchmarker.benchmark_process( - pid, - out_file, - args.data_interval - ) - # summarize benchmark results & process summary - summary = self._summarizer.summarize(out_file) - # load the internally-collected metrics and append to the summary - metrics_f = json.load(open(os.path.join(result_dir, 'metrics.json'), 'r')) - summary['total_time'] = metrics_f['end_time'] - metrics_f['start_time'] - summary['first_client_invocation_time'] = (metrics_f['first_client_invocation_time'] - - metrics_f['start_time']) - finally: - # cleanup iteration of benchmark - client.tearDown() - _reset_dir(result_dir) - _reset_dir(assets_dir) - env_patch.stop() - self._time_of_call = None - return summary - - - def run_benchmarks(self, args): - """ - Orchestrates benchmarking via the benchmark definitions in - the arguments. 
- """ - summaries = {'results': []} - result_dir = args.result_dir - assets_dir = os.path.join(result_dir, 'assets') - performance_dir = os.path.join(result_dir, 'performance') - client = StubbedHTTPClient() - process_benchmarker = ProcessBenchmarker() - definitions = json.load(open(args.benchmark_definitions, 'r')) - _overwrite_dir_full_access(result_dir) - _overwrite_dir_full_access(assets_dir) - - try: - for benchmark in definitions: - benchmark_result = { - 'name': benchmark['name'], - 'dimensions': benchmark['dimensions'], - 'measurements': [] - } - for _ in range(args.num_iterations): - measurements = self._run_isolated_benchmark( - result_dir, - performance_dir, - benchmark, - client, - process_benchmarker, - args - ) - benchmark_result['measurements'].append(measurements) - summaries['results'].append(benchmark_result) - finally: - # final cleanup - shutil.rmtree(result_dir, ignore_errors=True) - print(json.dumps(summaries, indent=2)) - - if __name__ == "__main__": harness = BenchmarkHarness() parser = argparse.ArgumentParser() From 723e0a04ed720684fb5a0705992a4a816a1df79f Mon Sep 17 00:00:00 2001 From: aemous Date: Wed, 29 Jan 2025 09:25:56 -0500 Subject: [PATCH 8/9] refactor and cleanup. --- scripts/performance/benchmark_utils.py | 407 ++++--------------------- scripts/performance/run-benchmarks | 1 - 2 files changed, 66 insertions(+), 342 deletions(-) diff --git a/scripts/performance/benchmark_utils.py b/scripts/performance/benchmark_utils.py index d5f70e1c72bf..bf4b53e6ddf9 100644 --- a/scripts/performance/benchmark_utils.py +++ b/scripts/performance/benchmark_utils.py @@ -1,16 +1,10 @@ -import csv import json import math import time import psutil -import s3transfer import os -import subprocess -import uuid import shutil -import argparse -import tempfile from awscli.botocore.awsrequest import AWSResponse from unittest import mock @@ -22,93 +16,78 @@ class Summarizer: DATA_INDEX_IN_ROW = {'time': 0, 'memory': 1, 'cpu': 2} def __init__(self): - self._num_rows = 0 self._start_time = None self._end_time = None - self._averages = { + self._samples = [] + self._sums = { 'memory': 0.0, 'cpu': 0.0, } - self._samples = { - 'memory': [], - 'cpu': [], - } - self._maximums = {'memory': 0.0, 'cpu': 0.0} - - def summarize(self, benchmark_file): - """Processes the data from the CSV file""" - with open(benchmark_file) as f: - reader = csv.reader(f) - # Process each row from the CSV file - row = None - for row in reader: - self._validate_row(row, benchmark_file) - self.process_data_row(row) - self._validate_row(row, benchmark_file) - self._end_time = self._get_time(row) - metrics = self._finalize_processed_data_for_file() + + def summarize(self, samples): + """Processes benchmark data from a dictionary.""" + self._samples = samples + self._validate_samples(samples) + for idx, sample in enumerate(samples): + # If the sample is the first one, collect the start time. + if idx == 0: + self._start_time = self._get_time(sample) + self.process_data_sample(sample) + self._end_time = self._get_time(samples[-1]) + metrics = self._finalize_processed_data_for_file(samples) return metrics - def _validate_row(self, row, filename): - if not row: + def _validate_samples(self, samples): + if not samples: raise RuntimeError( - f'Row: {row} could not be processed. The CSV file ({filename}) may be ' - 'empty.' + 'Benchmark samples could not be processed. ' + 'The samples list is empty' ) - def process_data_row(self, row): - # If the row is the first row collect the start time. 
- if self._num_rows == 0: - self._start_time = self._get_time(row) - self._num_rows += 1 - self.process_data_point(row, 'memory') - self.process_data_point(row, 'cpu') - - def process_data_point(self, row, name): - # Determine where in the CSV row the requested data is located. - index = self.DATA_INDEX_IN_ROW[name] - # Get the data point. - data_point = float(row[index]) - self._add_to_average(name, data_point) - self._account_for_maximum(name, data_point) - self._samples[name].append(data_point) - - def _finalize_processed_data_for_file(self): - self._samples['memory'].sort() - self._samples['cpu'].sort() + def process_data_sample(self, sample): + self._add_to_sums('memory', sample['memory']) + self._add_to_sums('cpu', sample['cpu']) + + def _finalize_processed_data_for_file(self, samples): + self._samples.sort(key=self._get_memory) + memory_p50 = self._compute_metric_percentile(50, 'memory') + memory_p95 = self._compute_metric_percentile(95, 'memory') + self._samples.sort(key=self._get_cpu) + cpu_p50 = self._compute_metric_percentile(50, 'cpu') + cpu_p95 = self._compute_metric_percentile(95, 'cpu') + max_memory = max(samples, key=self._get_memory)['memory'] + max_cpu = max(samples, key=self._get_cpu)['cpu'] metrics = { 'time': self._end_time - self._start_time, - 'average_memory': self._averages['memory'] / self._num_rows, - 'average_cpu': self._averages['cpu'] / self._num_rows, - 'max_memory': self._maximums['memory'], - 'max_cpu': self._maximums['cpu'], - 'memory_p50': self._compute_metric_percentile(50, 'memory'), - 'memory_p95': self._compute_metric_percentile(95, 'memory'), - 'cpu_p50': self._compute_metric_percentile(50, 'cpu'), - 'cpu_p95': self._compute_metric_percentile(95, 'cpu'), + 'average_memory': self._sums['memory'] / len(samples), + 'average_cpu': self._sums['cpu'] / len(samples), + 'max_memory': max_memory, + 'max_cpu': max_cpu, + 'memory_p50': memory_p50, + 'memory_p95': memory_p95, + 'cpu_p50': cpu_p50, + 'cpu_p95': cpu_p95, } - # Reset some of the data needed to be tracked for each execution - self._num_rows = 0 - self._maximums = self._maximums.fromkeys(self._maximums, 0.0) - self._averages = self._averages.fromkeys(self._averages, 0.0) - self._samples['memory'].clear() - self._samples['cpu'].clear() + # Reset samples after we're done with it + self._samples.clear() return metrics def _compute_metric_percentile(self, percentile, name): - num_samples = len(self._samples[name]) + num_samples = len(self._samples) p_idx = math.ceil(percentile*num_samples/100) - 1 - return self._samples[name][p_idx] + return self._samples[p_idx][name] - def _get_time(self, row): - return float(row[self.DATA_INDEX_IN_ROW['time']]) + def _get_time(self, sample): + return sample['time'] - def _add_to_average(self, name, data_point): - self._averages[name] += data_point + def _get_memory(self, sample): + return sample['memory'] - def _account_for_maximum(self, name, data_point): - if data_point > self._maximums[name]: - self._maximums[name] = data_point + def _get_cpu(self, sample): + return sample['cpu'] + + def _add_to_sums(self, name, data_point): + self._sums[name] += data_point class RawResponse(BytesIO): @@ -154,14 +133,13 @@ def add_response(self, body, headers, status_code): class ProcessBenchmarker(object): """ - Periodically samples CPU and memory usage of a process given its pid. Writes - all collected samples to a CSV file. + Periodically samples CPU and memory usage of a process given its pid. 
""" - def benchmark_process(self, pid, output_file, data_interval): + def benchmark_process(self, pid, data_interval): parent_pid = os.getpid() try: # Benchmark the process where the script is being run. - self._run_benchmark(pid, output_file, data_interval) + return self._run_benchmark(pid, data_interval) except KeyboardInterrupt: # If there is an interrupt, then try to clean everything up. proc = psutil.Process(parent_pid) @@ -175,10 +153,9 @@ def benchmark_process(self, pid, output_file, data_interval): child.kill() raise - - def _run_benchmark(self, pid, output_file, data_interval): + def _run_benchmark(self, pid, data_interval): process_to_measure = psutil.Process(pid) - output_f = open(output_file, 'w') + samples = [] while process_to_measure.is_running(): if process_to_measure.status() == psutil.STATUS_ZOMBIE: @@ -193,18 +170,17 @@ def _run_benchmark(self, pid, output_file, data_interval): # Trying to get process information from a closed or # zombie process will result in corresponding exceptions. break - # Determine the lapsed time for bookkeeping current_time = time.time() - - # Save all the data into a CSV file. - output_f.write( - f"{current_time},{memory_used},{cpu_percent}\n" - ) - output_f.flush() + samples.append({ + "time": current_time, "memory": memory_used, "cpu": cpu_percent + }) + return samples class BenchmarkHarness(object): + _DEFAULT_FILE_CONFIG_CONTENTS = "[default]" + """ Orchestrates running benchmarks in isolated, configurable environments defined via a specified JSON file. @@ -220,11 +196,6 @@ def _get_default_env(self, config_file): 'AWS_SECRET_ACCESS_KEY': 'secret_key' } - def _default_config_file_contents(self): - return ( - '[default]' - ) - def _create_file_with_size(self, path, size): """ Creates a full-access file in the given directory with the @@ -266,7 +237,7 @@ def _setup_environment(self, env, result_dir, config_file): file_dir_def['file_size'] ) with open(config_file, 'w') as f: - f.write(env.get('config', self._default_config_file_contents())) + f.write(env.get('config', self._DEFAULT_FILE_CONFIG_CONTENTS)) f.flush() def _setup_iteration( @@ -357,7 +328,6 @@ def _run_isolated_benchmark( the environment, running the benchmarked execution, formatting the results, and cleaning up the environment. """ - out_file = os.path.join(performance_dir, 'performance.csv') assets_dir = os.path.join(result_dir, 'assets') config_file = os.path.join(assets_dir, 'config') # setup for iteration of benchmark @@ -376,13 +346,12 @@ def _run_isolated_benchmark( self._run_command_with_metric_hooks(benchmark['command'], result_dir) # benchmark child process from parent process until child terminates - process_benchmarker.benchmark_process( + samples = process_benchmarker.benchmark_process( pid, - out_file, args.data_interval ) # summarize benchmark results and process summary - summary = self._summarizer.summarize(out_file) + summary = self._summarizer.summarize(samples) # load the internally-collected metrics and append to the summary metrics_f = json.load(open(os.path.join(result_dir, 'metrics.json'), 'r')) # override the summarizer's sample-based timing with the @@ -443,247 +412,3 @@ def run_benchmarks(self, args): # final cleanup shutil.rmtree(result_dir, ignore_errors=True) print(json.dumps(summaries, indent=2)) - - -def summarize(script, result_dir, summary_dir): - """Run the given summary script on every file in the given directory. - - :param script: A summarization script that takes a list of csv files. 
- :param result_dir: A directory containing csv performance result files. - :param summary_dir: The directory to put the summary file in. - """ - summarize_args = [script] - for f in os.listdir(result_dir): - path = os.path.join(result_dir, f) - if os.path.isfile(path): - summarize_args.append(path) - - with open(os.path.join(summary_dir, 'summary.txt'), 'wb') as f: - subprocess.check_call(summarize_args, stdout=f) - with open(os.path.join(summary_dir, 'summary.json'), 'wb') as f: - summarize_args.extend(['--output-format', 'json']) - subprocess.check_call(summarize_args, stdout=f) - - -def _get_s3transfer_performance_script(script_name): - """Retrieves an s3transfer performance script if available.""" - s3transfer_directory = os.path.dirname(s3transfer.__file__) - s3transfer_directory = os.path.dirname(s3transfer_directory) - scripts_directory = 'scripts/performance' - scripts_directory = os.path.join(s3transfer_directory, scripts_directory) - script = os.path.join(scripts_directory, script_name) - - if os.path.isfile(script): - return script - else: - return None - - -def get_benchmark_script(): - return _get_s3transfer_performance_script('benchmark') - - -def get_summarize_script(): - return _get_s3transfer_performance_script('summarize') - - -def backup(source, recursive): - """Backup a given source to a temporary location. - - :type source: str - :param source: A local path or s3 path to backup. - - :type recursive: bool - :param recursive: if True, the source will be treated as a directory. - """ - if source[:5] == 's3://': - parts = source.split('/') - parts.insert(3, str(uuid.uuid4())) - backup_path = '/'.join(parts) - else: - name = os.path.split(source)[-1] - temp_dir = tempfile.mkdtemp() - backup_path = os.path.join(temp_dir, name) - - copy(source, backup_path, recursive) - return backup_path - - -def copy(source, destination, recursive): - """Copy files from one location to another. - - The source and destination must both be s3 paths or both be local paths. - - :type source: str - :param source: A local path or s3 path to backup. - - :type destination: str - :param destination: A local path or s3 path to backup the source to. - - :type recursive: bool - :param recursive: if True, the source will be treated as a directory. - """ - if 's3://' in [source[:5], destination[:5]]: - cp_args = ['aws', 's3', 'cp', source, destination, '--quiet'] - if recursive: - cp_args.append('--recursive') - subprocess.check_call(cp_args) - return - - if recursive: - shutil.copytree(source, destination) - else: - shutil.copy(source, destination) - - -def clean(destination, recursive): - """Delete a file or directory either locally or on S3.""" - if destination[:5] == 's3://': - rm_args = ['aws', 's3', 'rm', '--quiet', destination] - if recursive: - rm_args.append('--recursive') - subprocess.check_call(rm_args) - else: - if recursive: - shutil.rmtree(destination) - else: - os.remove(destination) - - -def create_random_subfolder(destination): - """Create a random subdirectory in a given directory.""" - folder_name = str(uuid.uuid4()) - if destination.startswith('s3://'): - parts = destination.split('/') - parts.append(folder_name) - return '/'.join(parts) - else: - parts = list(os.path.split(destination)) - parts.append(folder_name) - path = os.path.join(*parts) - os.makedirs(path) - return path - - -def get_transfer_command(command, recursive, quiet): - """Get a full cli transfer command. - - Performs common transformations, e.g. 
adding --quiet - """ - cli_command = 'aws s3 ' + command - - if recursive: - cli_command += ' --recursive' - - if quiet: - cli_command += ' --quiet' - else: - print(cli_command) - - return cli_command - - -def benchmark_command(command, benchmark_script, summarize_script, - output_dir, num_iterations, dry_run, upkeep=None, - cleanup=None): - """Benchmark several runs of a long-running command. - - :type command: str - :param command: The full aws cli command to benchmark - - :type benchmark_script: str - :param benchmark_script: A benchmark script that takes a command to run - and outputs performance data to a file. This should be from s3transfer. - - :type summarize_script: str - :param summarize_script: A summarization script that the output of the - benchmark script. This should be from s3transfer. - - :type output_dir: str - :param output_dir: The directory to output performance results to. - - :type num_iterations: int - :param num_iterations: The number of times to run the benchmark on the - command. - - :type dry_run: bool - :param dry_run: Whether or not to actually run the benchmarks. - - :type upkeep: function that takes no arguments - :param upkeep: A function that is run after every iteration of the - benchmark process. This should be used for upkeep, such as restoring - files that were deleted as part of the command executing. - - :type cleanup: function that takes no arguments - :param cleanup: A function that is run at the end of the benchmark - process or if there are any problems during the benchmark process. - It should be uses for the final cleanup, such as deleting files that - were created at some destination. - """ - performance_dir = os.path.join(output_dir, 'performance') - if os.path.exists(performance_dir): - shutil.rmtree(performance_dir) - os.makedirs(performance_dir) - - try: - for i in range(num_iterations): - out_file = 'performance%s.csv' % i - out_file = os.path.join(performance_dir, out_file) - benchmark_args = [ - benchmark_script, command, '--output-file', out_file - ] - if not dry_run: - subprocess.check_call(benchmark_args) - if upkeep is not None: - upkeep() - - if not dry_run: - summarize(summarize_script, performance_dir, output_dir) - finally: - if not dry_run and cleanup is not None: - cleanup() - - -def get_default_argparser(): - """Get an ArgumentParser with all the base benchmark arguments added in.""" - parser = argparse.ArgumentParser() - parser.add_argument( - '--no-cleanup', action='store_true', default=False, - help='Do not remove the destination after the tests complete.' - ) - parser.add_argument( - '--recursive', action='store_true', default=False, - help='Indicates that this is a recursive transfer.' - ) - benchmark_script = get_benchmark_script() - parser.add_argument( - '--benchmark-script', default=benchmark_script, - required=benchmark_script is None, - help=('The benchmark script to run the commands with. This should be ' - 'from s3transfer.') - ) - summarize_script = get_summarize_script() - parser.add_argument( - '--summarize-script', default=summarize_script, - required=summarize_script is None, - help=('The summarize script to run the commands with. This should be ' - 'from s3transfer.') - ) - parser.add_argument( - '-o', '--result-dir', default='results', - help='The directory to output performance results to. Existing ' - 'results will be deleted.' - ) - parser.add_argument( - '--dry-run', default=False, action='store_true', - help='If set, commands will only be printed out, not executed.' 
- ) - parser.add_argument( - '--quiet', default=False, action='store_true', - help='If set, output is suppressed.' - ) - parser.add_argument( - '-n', '--num-iterations', default=1, type=int, - help='The number of times to run the test.' - ) - return parser \ No newline at end of file diff --git a/scripts/performance/run-benchmarks b/scripts/performance/run-benchmarks index 1adcb422ba33..bce6f614929f 100755 --- a/scripts/performance/run-benchmarks +++ b/scripts/performance/run-benchmarks @@ -9,7 +9,6 @@ _BENCHMARK_DEFINITIONS = os.path.join( os.path.dirname(os.path.abspath(__file__)), 'benchmarks.json' ) -_DEFAULT_FILE_CONFIG_CONTENTS = "[default]" if __name__ == "__main__": From bacd93f1d6e1a1acd43739cafc89fd4ef3fbf31a Mon Sep 17 00:00:00 2001 From: aemous Date: Wed, 29 Jan 2025 10:29:11 -0500 Subject: [PATCH 9/9] Remove unused code. Misc. refactoring. --- scripts/performance/benchmark_utils.py | 68 ++++++++++---------------- 1 file changed, 25 insertions(+), 43 deletions(-) diff --git a/scripts/performance/benchmark_utils.py b/scripts/performance/benchmark_utils.py index bf4b53e6ddf9..f9b69d8ade0c 100644 --- a/scripts/performance/benchmark_utils.py +++ b/scripts/performance/benchmark_utils.py @@ -49,6 +49,7 @@ def process_data_sample(self, sample): self._add_to_sums('cpu', sample['cpu']) def _finalize_processed_data_for_file(self, samples): + # compute percentiles self._samples.sort(key=self._get_memory) memory_p50 = self._compute_metric_percentile(50, 'memory') memory_p95 = self._compute_metric_percentile(95, 'memory') @@ -57,8 +58,8 @@ def _finalize_processed_data_for_file(self, samples): cpu_p95 = self._compute_metric_percentile(95, 'cpu') max_memory = max(samples, key=self._get_memory)['memory'] max_cpu = max(samples, key=self._get_cpu)['cpu'] + # format computed statistics metrics = { - 'time': self._end_time - self._start_time, 'average_memory': self._sums['memory'] / len(samples), 'average_cpu': self._sums['cpu'] / len(samples), 'max_memory': max_memory, @@ -68,7 +69,7 @@ def _finalize_processed_data_for_file(self, samples): 'cpu_p50': cpu_p50, 'cpu_p95': cpu_p95, } - # Reset samples after we're done with it + # reset samples array self._samples.clear() return metrics @@ -199,7 +200,8 @@ def _get_default_env(self, config_file): def _create_file_with_size(self, path, size): """ Creates a full-access file in the given directory with the - specified name and size. + specified name and size. The created file will be full of + null bytes to achieve the specified size. """ f = open(path, 'wb') os.chmod(path, 0o777) @@ -211,19 +213,28 @@ def _create_file_dir(self, dir_path, file_count, size): """ Creates a directory with the specified name. Also creates identical files with the given size in the created directory. The number of identical files - to be created is specified by file_count. + to be created is specified by file_count. Each file will be full of + null bytes to achieve the specified size. """ os.mkdir(dir_path, 0o777) for i in range(int(file_count)): file_path = os.path.join(dir_path, f'{i}') self._create_file_with_size(file_path, size) - def _setup_environment(self, env, result_dir, config_file): + def _setup_iteration( + self, + benchmark, + client, + result_dir, + config_file + ): """ - Creates all files / directories defined in the env struct. - Also, writes a config file named 'config' to the result directory - with contents optionally specified by the env struct. + Performs the environment setup for a single iteration of a + benchmark. 
This includes creating the files used by a + command and stubbing the HTTP client to use during execution. """ + # create necessary files for iteration + env = benchmark.get('environment', {}) if "files" in env: for file_def in env['files']: path = os.path.join(result_dir, file_def['name']) @@ -236,33 +247,16 @@ def _setup_environment(self, env, result_dir, config_file): file_dir_def['file_count'], file_dir_def['file_size'] ) + # create config file at specified path with open(config_file, 'w') as f: f.write(env.get('config', self._DEFAULT_FILE_CONFIG_CONTENTS)) f.flush() - - def _setup_iteration( - self, - benchmark, - client, - result_dir, - performance_dir, - config_file - ): - """ - Performs the setup for a single iteration of a benchmark. This - includes creating the files used by a command and stubbing - the HTTP client to use during execution. - """ - env = benchmark.get('environment', {}) - self._setup_environment(env, result_dir, config_file) + # setup and stub HTTP client client.setup() self._stub_responses( benchmark.get('responses', [{"headers": {}, "body": ""}]), client ) - if os.path.exists(performance_dir): - shutil.rmtree(performance_dir) - os.makedirs(performance_dir, 0o777) def _stub_responses(self, responses, client): """ @@ -312,12 +306,12 @@ def _log_invocation_time(params, request_signer, model, **kwargs): } )) metrics_f.close() + # terminate the process os._exit(0) def _run_isolated_benchmark( self, result_dir, - performance_dir, benchmark, client, process_benchmarker, @@ -330,8 +324,9 @@ def _run_isolated_benchmark( """ assets_dir = os.path.join(result_dir, 'assets') config_file = os.path.join(assets_dir, 'config') + os.makedirs(assets_dir, 0o777) # setup for iteration of benchmark - self._setup_iteration(benchmark, client, result_dir, performance_dir, config_file) + self._setup_iteration(benchmark, client, result_dir, config_file) os.chdir(result_dir) # patch the OS environment with our supplied defaults env_patch = mock.patch.dict('os.environ', self._get_default_env(config_file)) @@ -344,7 +339,6 @@ def _run_isolated_benchmark( # execute command on child process if pid == 0: self._run_command_with_metric_hooks(benchmark['command'], result_dir) - # benchmark child process from parent process until child terminates samples = process_benchmarker.benchmark_process( pid, @@ -352,11 +346,8 @@ def _run_isolated_benchmark( ) # summarize benchmark results and process summary summary = self._summarizer.summarize(samples) - # load the internally-collected metrics and append to the summary + # load the child-collected metrics and append to the summary metrics_f = json.load(open(os.path.join(result_dir, 'metrics.json'), 'r')) - # override the summarizer's sample-based timing with the - # wall-clock time measured by the child process - del summary['time'] summary['total_time'] = metrics_f['end_time'] - metrics_f['start_time'] summary['first_client_invocation_time'] = (metrics_f['first_client_invocation_time'] - metrics_f['start_time']) @@ -365,10 +356,7 @@ def _run_isolated_benchmark( client.tearDown() shutil.rmtree(result_dir, ignore_errors=True) os.makedirs(result_dir, 0o777) - shutil.rmtree(assets_dir, ignore_errors=True) - os.makedirs(assets_dir, 0o777) env_patch.stop() - self._time_of_call = None return summary def run_benchmarks(self, args): @@ -378,17 +366,12 @@ def run_benchmarks(self, args): """ summaries = {'results': []} result_dir = args.result_dir - assets_dir = os.path.join(result_dir, 'assets') - performance_dir = os.path.join(result_dir, 'performance') client = 
StubbedHTTPClient() process_benchmarker = ProcessBenchmarker() definitions = json.load(open(args.benchmark_definitions, 'r')) if os.path.exists(result_dir): shutil.rmtree(result_dir) os.makedirs(result_dir, 0o777) - if os.path.exists(assets_dir): - shutil.rmtree(assets_dir) - os.makedirs(assets_dir, 0o777) try: for benchmark in definitions: @@ -400,7 +383,6 @@ def run_benchmarks(self, args): for _ in range(args.num_iterations): measurements = self._run_isolated_benchmark( result_dir, - performance_dir, benchmark, client, process_benchmarker,