Merge pull request #23 from pinecone-io/load_data_multi_process
Add support for dataset load with multiple processes
daverigby authored Feb 14, 2024
2 parents 18d5bd3 + 85847c5 commit 0a347f3
Showing 2 changed files with 65 additions and 11 deletions.
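For context, a rough illustration of how the new multi-process dataset loading might be exercised is sketched below. This is not part of the commit: the index host is a placeholder, the `--pinecone-dataset` and `--pinecone-populate-index` flags are taken from the test invocations in this diff, and `--processes` is Locust's flag for spawning multiple worker processes.

```python
# Hypothetical driver, for illustration only. Flag spellings follow the
# integration tests in this commit; the host URL is a placeholder.
import subprocess

cmd = [
    "locust", "--headless",
    "--host", "https://example-index.svc.pinecone.io",   # placeholder index host
    "--pinecone-dataset", "ANN_MNIST_d784_euclidean",
    "--pinecone-populate-index", "always",
    "--processes", "2",                                   # fork two worker processes
]
subprocess.run(cmd, check=True)
```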
37 changes: 32 additions & 5 deletions locustfile.py
@@ -75,7 +75,23 @@ def _(parser):


@events.init.add_listener
def check_for_dataset(environment: Environment, **kwargs):
def on_locust_init(environment, **_kwargs):
    if not isinstance(environment.runner, WorkerRunner):
        # For Worker runners dataset setup is deferred until the test starts,
        # to avoid multiple processes trying to download at the same time.
        setup_dataset(environment)


def setup_dataset(environment: Environment, skip_download_and_populate: bool = False):
    """
    Sets up the dataset specified via the --pinecone_dataset argument:
    - downloads it if not already present in the local cache
    - reads 'documents' data (if needed for population)
    - reads 'queries' data if present.
    - populates the index with the documents data (if requested).
    The Dataset is assigned to `environment.dataset` for later use by Users
    making requests.
    """
    dataset_name = environment.parsed_options.pinecone_dataset
    if not dataset_name:
        environment.dataset = Dataset()
@@ -93,15 +109,25 @@ def check_for_dataset(environment: Environment, **kwargs):
        sys.exit(1)

    environment.dataset = Dataset(dataset_name, environment.parsed_options.pinecone_dataset_cache)
    environment.dataset.load()
    environment.dataset.load(skip_download=skip_download_and_populate)
    populate = environment.parsed_options.pinecone_populate_index
    if populate != "never":
    if not skip_download_and_populate and populate != "never":
        logging.info(
            f"Populating index {environment.host} with {len(environment.dataset.documents)} vectors from dataset '{dataset_name}'")
        environment.dataset.upsert_into_index(environment.host,
                                              skip_if_count_identical=(populate == "if-count-mismatch"))


@events.test_start.add_listener
def setup_worker_dataset(environment, **_kwargs):
    # Happens only once in headless runs, but can happen multiple times in web UI runs.
    # In a distributed run, the master does not typically need any test data.
    if isinstance(environment.runner, WorkerRunner):
        # Make the Dataset available for WorkerRunners (non-Workers will have
        # already set up the dataset via on_locust_init).
        setup_dataset(environment, skip_download_and_populate=True)


@events.test_start.add_listener
def set_up_iteration_limit(environment: Environment, **kwargs):
    options = environment.parsed_options
@@ -168,11 +194,12 @@ def list():
            datasets.append(json.loads(m.download_as_string()))
        return datasets

    def load(self):
    def load(self, skip_download: bool = False):
        """
        Load the dataset, populating the 'documents' and 'queries' DataFrames.
        """
        self._download_dataset_files()
        if not skip_download:
            self._download_dataset_files()

        # Load all the parquet dataset (made up of one or more parquet files),
        # to use for documents into a pandas dataframe.
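Taken together, the net effect of the locustfile.py hunks above is the following split between the init and test_start listeners, shown here as a condensed view for readability (setup_dataset is the function introduced in the diff above):

```python
# Condensed view of the coordination logic from the hunks above.
from locust import events
from locust.runners import WorkerRunner


@events.init.add_listener
def on_locust_init(environment, **_kwargs):
    # Master / local runners download (and optionally populate) the dataset
    # once, at init time.
    if not isinstance(environment.runner, WorkerRunner):
        setup_dataset(environment)


@events.test_start.add_listener
def setup_worker_dataset(environment, **_kwargs):
    # Worker processes wait until the test starts and reuse the already
    # downloaded/populated dataset, avoiding concurrent downloads.
    if isinstance(environment.runner, WorkerRunner):
        setup_dataset(environment, skip_download_and_populate=True)
```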
39 changes: 33 additions & 6 deletions tests/integration/test_requests.py
@@ -51,13 +51,24 @@ def do_request(index_host, mode, tag, expected_name, timeout=4, extra_args=[]):
extra_args=["--tags", tag] + extra_args)
# Check that stderr contains the expected output, and no errors.
assert '"Traceback' not in stderr
assert 'All users spawned: {"PineconeUser": 1}' in stderr
# Check stats output shows expected requests occurred and they
# succeeded.
stats = json.loads(stdout)[0]
assert expected_name in stats['name']
assert stats['num_requests'] == 1
assert stats['num_failures'] == 0
# With multiple processes (--processes) we see one JSON array for
# each process, so must handle multiple JSON objects.
stats = []
while stdout:
try:
stats.append(json.loads(stdout)[0])
break
except json.JSONDecodeError as e:
stdout = stdout[e.pos:]
for s in stats:
# Ignore empty stat sets (e.g. the master runner emits empty
# stats)
if s:
assert expected_name in s['name']
assert s['num_requests'] == 1
assert s['num_failures'] == 0
assert proc.returncode == 0
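
The `e.pos` slicing above leans on a detail of `json.JSONDecodeError` that may not be obvious: when the input contains one complete JSON document followed by extra data, `json.loads()` raises "Extra data" with `pos` pointing at the first character after that document. A minimal, standalone illustration (not part of the test):

```python
import json

# Two concatenated JSON arrays, similar to stdout when locust runs with
# --processes (one stats array per process).
combined = '[{"name": "query", "num_requests": 1}][{"name": "query", "num_requests": 1}]'

try:
    json.loads(combined)
except json.JSONDecodeError as e:
    # e.pos indexes the first character after the first complete document,
    # so slicing from it drops that document and keeps the remainder.
    remainder = combined[e.pos:]
    assert json.loads(remainder)[0]["num_requests"] == 1
```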


@@ -81,14 +92,30 @@ def test_dataset_load(self, index_host):
        # has a non-zero queries set.
        test_dataset = "ANN_MNIST_d784_euclidean"
        self.do_request(index_host, "sdk", 'query', 'Vector (Query only)',
                        timeout=60, extra_args=["--pinecone-dataset", test_dataset])
                        timeout=60,
                        extra_args=["--pinecone-dataset", test_dataset,
                                    "--pinecone-populate-index", "always"])


    def test_dataset_load_multiprocess(self, index_host):
        # Choosing a small dataset ("only" 60,000 documents) which also
        # has a non-zero queries set.
        test_dataset = "ANN_MNIST_d784_euclidean"
        self.do_request(index_host, "sdk", 'query', 'Vector (Query only)',
                        timeout=60,
                        extra_args=["--pinecone-dataset", test_dataset,
                                    "--pinecone-populate-index", "always",
                                    "--processes", "1"])


@pytest.mark.parametrize("mode", ["rest", "sdk", "sdk+grpc"])
class TestPineconeModes(TestPineconeBase):
    def test_pinecone_query(self, index_host, mode):
        self.do_request(index_host, mode, 'query', 'Vector (Query only)')

    def test_pinecone_query_multiprocess(self, index_host, mode):
        self.do_request(index_host, mode, 'query', 'Vector (Query only)', timeout=20, extra_args=["--processes=1"])

    def test_pinecone_query_meta(self, index_host, mode):
        self.do_request(index_host, mode, 'query_meta', 'Vector + Metadata')

