diff --git a/locustfile.py b/locustfile.py
index 2197225..38d4795 100644
--- a/locustfile.py
+++ b/locustfile.py
@@ -75,7 +75,23 @@ def _(parser):
 
 
 @events.init.add_listener
-def check_for_dataset(environment: Environment, **kwargs):
+def on_locust_init(environment, **_kwargs):
+    if not isinstance(environment.runner, WorkerRunner):
+        # For Worker runners, dataset setup is deferred until the test starts,
+        # to avoid multiple processes trying to download at the same time.
+        setup_dataset(environment)
+
+
+def setup_dataset(environment: Environment, skip_download_and_populate: bool = False):
+    """
+    Sets up the dataset specified via the --pinecone_dataset argument:
+    - downloads it if not already present in local cache
+    - reads 'documents' data (if needed for population)
+    - reads 'queries' data if present.
+    - populates the index with the documents data (if requested).
+    The Dataset is assigned to `environment.dataset` for later use by Users
+    making requests.
+    """
     dataset_name = environment.parsed_options.pinecone_dataset
     if not dataset_name:
         environment.dataset = Dataset()
@@ -93,15 +109,25 @@ def check_for_dataset(environment: Environment, **kwargs):
         sys.exit(1)
     environment.dataset = Dataset(dataset_name,
                                   environment.parsed_options.pinecone_dataset_cache)
-    environment.dataset.load()
+    environment.dataset.load(skip_download=skip_download_and_populate)
     populate = environment.parsed_options.pinecone_populate_index
-    if populate != "never":
+    if not skip_download_and_populate and populate != "never":
         logging.info(
             f"Populating index {environment.host} with {len(environment.dataset.documents)} vectors from dataset '{dataset_name}'")
         environment.dataset.upsert_into_index(environment.host,
                                               skip_if_count_identical=(populate == "if-count-mismatch"))
 
 
+@events.test_start.add_listener
+def setup_worker_dataset(environment, **_kwargs):
+    # happens only once in headless runs, but can happen multiple times in web UI runs
+    # in a distributed run, the master does not typically need any test data
+    if isinstance(environment.runner, WorkerRunner):
+        # Make the Dataset available for WorkerRunners (non-Worker will have
+        # already set up the dataset via on_locust_init).
+        setup_dataset(environment, skip_download_and_populate=True)
+
+
 @events.test_start.add_listener
 def set_up_iteration_limit(environment: Environment, **kwargs):
     options = environment.parsed_options
@@ -168,11 +194,12 @@ def list():
             datasets.append(json.loads(m.download_as_string()))
         return datasets
 
-    def load(self):
+    def load(self, skip_download: bool = False):
         """
         Load the dataset, populating the 'documents' and 'queries' DataFrames.
         """
-        self._download_dataset_files()
+        if not skip_download:
+            self._download_dataset_files()
 
         # Load all the parquet dataset (made up of one or more parquet files),
         # to use for documents into a pandas dataframe.
diff --git a/tests/integration/test_requests.py b/tests/integration/test_requests.py
index 71971d9..0921c99 100644
--- a/tests/integration/test_requests.py
+++ b/tests/integration/test_requests.py
@@ -51,13 +51,24 @@ def do_request(index_host, mode, tag, expected_name, timeout=4, extra_args=[]):
                                               extra_args=["--tags", tag] + extra_args)
         # Check that stderr contains the expected output, and no errors.
         assert '"Traceback' not in stderr
-        assert 'All users spawned: {"PineconeUser": 1}' in stderr
         # Check stats output shows expected requests occurred and they
         # succeeded.
-        stats = json.loads(stdout)[0]
-        assert expected_name in stats['name']
-        assert stats['num_requests'] == 1
-        assert stats['num_failures'] == 0
+        # With multiple processes (--processes) we see one JSON array for
+        # each process, so must handle multiple JSON objects.
+        stats = []
+        while stdout:
+            try:
+                stats.append(json.loads(stdout)[0])
+                break
+            except json.JSONDecodeError as e:
+                stdout = stdout[e.pos:]
+        for s in stats:
+            # Ignore empty stat sets (e.g. the master runner emits empty
+            # stats)
+            if s:
+                assert expected_name in s['name']
+                assert s['num_requests'] == 1
+                assert s['num_failures'] == 0
         assert proc.returncode == 0
@@ -81,7 +92,20 @@ def test_dataset_load(self, index_host):
         # has a non-zero queries set.
         test_dataset = "ANN_MNIST_d784_euclidean"
         self.do_request(index_host, "sdk", 'query', 'Vector (Query only)',
-                        timeout=60, extra_args=["--pinecone-dataset", test_dataset])
+                        timeout=60,
+                        extra_args=["--pinecone-dataset", test_dataset,
+                                    "--pinecone-populate-index", "always"])
+
+
+    def test_dataset_load_multiprocess(self, index_host):
+        # Choosing a small dataset ("only" 60,000 documents) which also
+        # has a non-zero queries set.
+        test_dataset = "ANN_MNIST_d784_euclidean"
+        self.do_request(index_host, "sdk", 'query', 'Vector (Query only)',
+                        timeout=60,
+                        extra_args=["--pinecone-dataset", test_dataset,
+                                    "--pinecone-populate-index", "always",
+                                    "--processes", "1"])
 
 
 @pytest.mark.parametrize("mode", ["rest", "sdk", "sdk+grpc"])
@@ -89,6 +113,9 @@ class TestPineconeModes(TestPineconeBase):
     def test_pinecone_query(self, index_host, mode):
         self.do_request(index_host, mode, 'query', 'Vector (Query only)')
 
+    def test_pinecone_query_multiprocess(self, index_host, mode):
+        self.do_request(index_host, mode, 'query', 'Vector (Query only)', timeout=20, extra_args=["--processes=1"])
+
     def test_pinecone_query_meta(self, index_host, mode):
         self.do_request(index_host, mode, 'query_meta', 'Vector + Metadata')
 
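Note on the stats handling in do_request: with --processes, each runner prints its own JSON stats array to stdout, so the captured output is several concatenated JSON documents rather than a single one; the patch deals with this by re-parsing from the position reported in the JSONDecodeError. A minimal, illustrative sketch of splitting such concatenated arrays with json.JSONDecoder.raw_decode is shown below; the helper name parse_stats_arrays is hypothetical and this is not part of the patch.

import json

def parse_stats_arrays(stdout: str) -> list:
    # Hypothetical helper (not part of the patch): split a string containing
    # one or more concatenated JSON documents - e.g. one stats array per
    # locust process - into the individual decoded values.
    decoder = json.JSONDecoder()
    values = []
    idx = 0
    while idx < len(stdout):
        # Skip whitespace/newlines separating the concatenated documents.
        if stdout[idx].isspace():
            idx += 1
            continue
        value, idx = decoder.raw_decode(stdout, idx)
        values.append(value)
    return values

# Example: an empty stats array (as a master runner might emit) followed by
# a worker's stats array.
assert parse_stats_arrays('[]\n[{"name": "q"}]') == [[], [{"name": "q"}]]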