
Merge pull request #34 from pinecone-io/reduce_memory_usage
Reduce dataset memory footprint
daverigby authored Feb 19, 2024
2 parents 01592df + 87de5ef commit 881f974
52 changes: 46 additions & 6 deletions locustfile.py
@@ -70,6 +70,10 @@ def _(parser):
                             " list full details of available datasets.")
     pc_options.add_argument("--pinecone-dataset-ignore-queries", action=argparse.BooleanOptionalAction,
                             help="Ignore and do not load the 'queries' table from the specified dataset.")
+    pc_options.add_argument("--pinecone-dataset-docs-sample-for-query", type=float, default=0.01,
+                            metavar="<fraction> (0.0 - 1.0)",
+                            help="Specify the fraction of docs which should be sampled when the documents vectorset "
+                                 "is used for queries (default: %(default)s).")
     pc_options.add_argument("--pinecone-populate-index", choices=["always", "never", "if-count-mismatch"],
                             default="if-count-mismatch",
                             help="Should the index be populated with the dataset before issuing requests. Choices: "
@@ -132,16 +136,23 @@ def setup_dataset(environment: Environment, skip_download_and_populate: bool = False):
         print()
         sys.exit(1)
 
-    logging.info(f"Loading Dataset {dataset_name} into memory for Worker...")
+    logging.info(f"Loading Dataset {dataset_name} into memory for Worker {os.getpid()}...")
     environment.dataset = Dataset(dataset_name, environment.parsed_options.pinecone_dataset_cache)
     ignore_queries = environment.parsed_options.pinecone_dataset_ignore_queries
-    environment.dataset.load(skip_download=skip_download_and_populate, load_queries=not ignore_queries)
+    sample_ratio = environment.parsed_options.pinecone_dataset_docs_sample_for_query
+    environment.dataset.load(skip_download=skip_download_and_populate,
+                             load_queries=not ignore_queries,
+                             doc_sample_fraction=sample_ratio)
     populate = environment.parsed_options.pinecone_populate_index
     if not skip_download_and_populate and populate != "never":
         logging.info(
             f"Populating index {environment.host} with {len(environment.dataset.documents)} vectors from dataset '{dataset_name}'")
         environment.dataset.upsert_into_index(environment.host,
                                               skip_if_count_identical=(populate == "if-count-mismatch"))
+    # We no longer need documents - if we were populating then that is
+    # finished, and if not populating then we don't need documents for
+    # anything else.
+    environment.dataset.prune_documents()
 
 
 @events.test_start.add_listener
@@ -249,7 +260,8 @@ def list():
             datasets.append(json.loads(m.download_as_string()))
         return datasets
 
-    def load(self, skip_download: bool = False, load_queries: bool = True):
+    def load(self, skip_download: bool = False, load_queries: bool = True,
+             doc_sample_fraction: float = 1.0):
         """
         Load the dataset, populating the 'documents' and 'queries' DataFrames.
         """
@@ -277,6 +289,11 @@ def load(self, skip_download: bool = False, load_queries: bool = True):
             self.queries = self.documents[["values"]].copy()
             self.queries.rename(columns={"values": "vector"}, inplace=True)
 
+            # Use a sampling of documents for queries (to avoid
+            # keeping a large complete dataset in memory for each
+            # worker process).
+            self.queries = self.queries.sample(frac=doc_sample_fraction, random_state=1)
+
     def upsert_into_index(self, index_host, skip_if_count_identical: bool = False):
         """
         Upsert the datasets' documents into the specified index.
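
A toy pandas sketch (synthetic data; names are illustrative) of what this sampling step does: with frac=0.01 roughly 1% of rows are kept, and the fixed random_state=1 makes the chosen subset reproducible.

    import pandas as pd

    queries = pd.DataFrame({"vector": [[float(i)] * 4 for i in range(100_000)]})
    sampled = queries.sample(frac=0.01, random_state=1)

    assert len(sampled) == 1_000  # ~1% of the rows survive
    # The same seed always selects the same rows.
    assert sampled.index.equals(queries.sample(frac=0.01, random_state=1).index)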
@@ -298,6 +315,14 @@ def upsert_into_index(self, index_host, skip_if_count_identical: bool = False):
                 f"Not all records upserted successfully. Dataset count:{len(self.documents)},"
                 f" upserted count:{upserted_count}")
 
+    def prune_documents(self):
+        """
+        Discard the contents of self.documents once it is no longer required
+        (it can consume a significant amount of memory).
+        """
+        del self.documents
+        logging.debug(f"After pruning, 'queries' memory usage:{self.queries.memory_usage()}")
+
     def _download_dataset_files(self):
         self.cache.mkdir(parents=True, exist_ok=True)
         logging.debug(f"Checking for existence of dataset '{self.name}' in dataset cache '{self.cache}'")
@@ -351,7 +376,13 @@ def _load_parquet_dataset(self, kind):
             metadata_column = "filter"
         else:
             raise ValueError(f"Unsupported kind '{kind}' - must be one of (documents, queries)")
-        df = dataset.read(columns=columns).to_pandas()
+        # Note: We need to specify pandas.ArrowDtype as the types mapper to use pyarrow datatypes in the
+        # resulting DataFrame. This is significant as (for reasons unknown) it allows subsequent
+        # sample() calls on the DataFrame to be "disconnected" from the original underlying pyarrow data,
+        # and hence significantly reduces memory usage when we later prune away the underlying
+        # pyarrow data (see prune_documents).
+        df = dataset.read(columns=columns).to_pandas(types_mapper=pandas.ArrowDtype)
+
         # And drop any columns which all values are missing - e.g. not all
         # datasets have sparse_values, but the parquet file may still have
         # the (empty) column present.
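
A standalone sketch (toy data; the file path is arbitrary) of the pyarrow-backed read: passing pandas.ArrowDtype as the types_mapper keeps columns as Arrow-backed extension dtypes, where the default conversion would produce NumPy object columns.

    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq

    pq.write_table(pa.table({"values": [[0.1, 0.2], [0.3, 0.4]]}), "/tmp/example.parquet")

    default = pq.read_table("/tmp/example.parquet").to_pandas()
    arrow_backed = pq.read_table("/tmp/example.parquet").to_pandas(types_mapper=pd.ArrowDtype)

    print(default["values"].dtype)       # object (NumPy-backed)
    print(arrow_backed["values"].dtype)  # list<item: double>[pyarrow]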
@@ -410,7 +441,16 @@ def split_dataframe(df, batch_size):
         pbar = tqdm(desc="Populating index", unit=" vectors", total=len(self.documents))
         upserted_count = 0
         for sub_frame in split_dataframe(self.documents, 10000):
-            resp = index.upsert_from_dataframe(sub_frame,
+            # The 'values' column in the DataFrame is a pyarrow type (list<item: double>[pyarrow])
+            # as it was read using the pandas.ArrowDtype types_mapper (see _load_parquet_dataset).
+            # This _can_ be automatically converted to a Python list object inside upsert_from_dataframe,
+            # but it is slow, as at that level the DataFrame is iterated row-by-row and the conversion
+            # happens one element at a time.
+            # However, converting the entire sub-frame's column back to a Python object before calling
+            # upsert_from_dataframe() is significantly faster, such that the overall upsert throughput
+            # (including the actual server-side work) is around 2x greater if we pre-convert.
+            converted = sub_frame.astype(dtype={'values': object})
+            resp = index.upsert_from_dataframe(converted,
                                                batch_size=200,
                                                show_progress=False)
             upserted_count += resp.upserted_count
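
A toy sketch (synthetic column) of the pre-conversion described above: a single vectorized astype() call materializes the whole Arrow-backed column as plain Python objects up front, instead of paying the conversion cost one element at a time during row iteration.

    import pandas as pd
    import pyarrow as pa

    values_dtype = pd.ArrowDtype(pa.list_(pa.float64()))
    df = pd.DataFrame({"values": pd.Series([[0.1, 0.2], [0.3, 0.4]], dtype=values_dtype)})

    converted = df.astype(dtype={"values": object})
    print(df["values"].dtype)         # list<item: double>[pyarrow]
    print(converted["values"].dtype)  # object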
@@ -495,7 +535,7 @@ def _query_vector(self):
         """
         if not self.environment.dataset.queries.empty:
             record = self.environment.dataset.queries.sample(n=1).iloc[0]
-            return record['vector'].tolist()
+            return record['vector']
         return ((np.random.random_sample(self.dimensions) * 2.0) - 1.0).tolist()
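
And a quick check (dimensions assumed to be 4 here; the real value comes from the index) of the fallback branch's arithmetic: random_sample() draws from [0.0, 1.0), so scaling by 2 and subtracting 1 yields components in [-1.0, 1.0).

    import numpy as np

    dimensions = 4  # assumed for illustration
    vec = ((np.random.random_sample(dimensions) * 2.0) - 1.0).tolist()

    assert len(vec) == dimensions
    assert all(-1.0 <= v < 1.0 for v in vec)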


