Skip to content

Commit

Permalink
Rebase Renaming Components: Rename parameters for Distributed Workload Generation - Issue 258 (opensearch-project#414)
Browse files Browse the repository at this point in the history

Signed-off-by: vivek palakkat <[email protected]>
Co-authored-by: dosa_chammandi <[email protected]>
Co-authored-by: Maddox Schmidlkofer <[email protected]>
  • Loading branch information
3 people authored and Ian Hoang committed Jul 23, 2024
1 parent 43d43b2 commit 51d0322
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 15 deletions.
6 changes: 3 additions & 3 deletions osbenchmark/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ def add_workload_source(subparser):
"(default: localhost:9200).",
default="") # actually the default is pipeline specific and it is set later
test_execution_parser.add_argument(
"--load-worker-coordinator-hosts",
"--worker-ips",
help="Define a comma-separated list of hosts which should generate load (default: localhost).",
default="localhost")
test_execution_parser.add_argument(
Expand Down Expand Up @@ -904,8 +904,8 @@ def dispatch_sub_command(arg_parser, args, cfg):
cfg.add(
config.Scope.applicationOverride,
"worker_coordinator",
"load_worker_coordinator_hosts",
opts.csv_to_list(args.load_worker_coordinator_hosts))
"worker_ips",
opts.csv_to_list(args.worker_ips))
cfg.add(config.Scope.applicationOverride, "workload", "test.mode.enabled", args.test_mode)
cfg.add(config.Scope.applicationOverride, "workload", "latency.percentiles", args.latency_percentiles)
cfg.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", args.throughput_percentiles)
Expand Down
10 changes: 5 additions & 5 deletions osbenchmark/worker_coordinator/worker_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ def __init__(self, target, config, os_client_factory_class=client.OsClientFactor
self.workload = None
self.test_procedure = None
self.metrics_store = None
self.load_worker_coordinator_hosts = []
self.worker_ips = []
self.workers = []
# which client ids are assigned to which workers?
self.clients_per_worker = {}
Expand Down Expand Up @@ -637,7 +637,7 @@ def prepare_benchmark(self, t):
# are not useful and attempts to connect to a non-existing cluster just lead to exception traces in logs.
self.prepare_telemetry(os_clients, enable=not uses_static_responses)

for host in self.config.opts("worker_coordinator", "load_worker_coordinator_hosts"):
for host in self.config.opts("worker_coordinator", "worker_ips"):
host_config = {
# for simplicity we assume that all benchmark machines have the same specs
"cores": num_cores(self.config)
Expand All @@ -647,9 +647,9 @@ def prepare_benchmark(self, t):
else:
host_config["host"] = host

self.load_worker_coordinator_hosts.append(host_config)
self.worker_ips.append(host_config)

self.target.prepare_workload([h["host"] for h in self.load_worker_coordinator_hosts], self.config, self.workload)
self.target.prepare_workload([h["host"] for h in self.worker_ips], self.config, self.workload)

def start_benchmark(self):
self.logger.info("Benchmark is about to start.")
Expand All @@ -670,7 +670,7 @@ def start_benchmark(self):
if allocator.clients < 128:
self.logger.info("Allocation matrix:\n%s", "\n".join([str(a) for a in self.allocations]))

worker_assignments = calculate_worker_assignments(self.load_worker_coordinator_hosts, allocator.clients)
worker_assignments = calculate_worker_assignments(self.worker_ips, allocator.clients)
worker_id = 0
for assignment in worker_assignments:
host = assignment["host"]
Expand Down
15 changes: 10 additions & 5 deletions tests/worker_coordinator/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3130,7 +3130,7 @@ class PutPipelineRunnerTests(TestCase):
async def test_create_pipeline(self, opensearch, on_client_request_start, on_client_request_end):
opensearch.ingest.put_pipeline.return_value = as_future()

r = runner.PutPipeline()
r = runner.CreateIngestPipeline()

params = {
"id": "rename",
Expand Down Expand Up @@ -3158,13 +3158,16 @@ async def test_create_pipeline(self, opensearch, on_client_request_start, on_cli
async def test_param_body_mandatory(self, opensearch, on_client_request_start, on_client_request_end):
opensearch.ingest.put_pipeline.return_value = as_future()

r = runner.PutPipeline()
r = runner.CreateIngestPipeline()

params = {
"id": "rename"
}
with self.assertRaisesRegex(exceptions.DataError,
"Parameter source for operation 'put-pipeline' did not provide the mandatory parameter 'body'. "
"Parameter source "
"for operation 'put-pipeline' "
"did not provide the "
"mandatory parameter 'body'. "
"Add it to your parameter source and try again."):
await r(opensearch, params)

Expand All @@ -3177,13 +3180,15 @@ async def test_param_body_mandatory(self, opensearch, on_client_request_start, o
async def test_param_id_mandatory(self, opensearch, on_client_request_start, on_client_request_end):
opensearch.ingest.put_pipeline.return_value = as_future()

r = runner.PutPipeline()
r = runner.CreateIngestPipeline()

params = {
"body": {}
}
with self.assertRaisesRegex(exceptions.DataError,
"Parameter source for operation 'put-pipeline' did not provide the mandatory parameter 'id'. "
"Parameter source for "
"operation 'put-pipeline' did"
" not provide the mandatory parameter 'id'. "
"Add it to your parameter source and try again."):
await r(opensearch, params)

Expand Down
4 changes: 2 additions & 2 deletions tests/worker_coordinator/worker_coordinator_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def setUp(self):
self.cfg.add(config.Scope.application, "client", "hosts",
WorkerCoordinatorTests.Holder(all_hosts={"default": ["localhost:9200"]}))
self.cfg.add(config.Scope.application, "client", "options", WorkerCoordinatorTests.Holder(all_client_options={"default": {}}))
self.cfg.add(config.Scope.application, "worker_coordinator", "load_worker_coordinator_hosts", ["localhost"])
self.cfg.add(config.Scope.application, "worker_coordinator", "worker_ips", ["localhost"])
self.cfg.add(config.Scope.application, "results_publishing", "datastore.type", "in-memory")

default_test_procedure = workload.TestProcedure("default", default=True, schedule=[
Expand All @@ -135,7 +135,7 @@ def create_test_worker_coordinator_target(self):
@mock.patch("osbenchmark.utils.net.resolve")
def test_start_benchmark_and_prepare_workload(self, resolve):
# override load worker_coordinator host
self.cfg.add(config.Scope.applicationOverride, "worker_coordinator", "load_worker_coordinator_hosts", ["10.5.5.1", "10.5.5.2"])
self.cfg.add(config.Scope.applicationOverride, "worker_coordinator", "worker_ips", ["10.5.5.1", "10.5.5.2"])
resolve.side_effect = ["10.5.5.1", "10.5.5.2"]

target = self.create_test_worker_coordinator_target()
Expand Down

0 comments on commit 51d0322

Please sign in to comment.