Skip to content

Commit

Permalink
add benchmarks and refactor per changes to schema etc
Browse files Browse the repository at this point in the history
  • Loading branch information
mattcember committed Jan 21, 2025
1 parent 77dc1ec commit 7ad1d1a
Show file tree
Hide file tree
Showing 16 changed files with 247 additions and 118 deletions.
25 changes: 17 additions & 8 deletions deltacat/benchmarking/benchmark_engine.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import sys
import time
from contextlib import contextmanager
from typing import Generator, Tuple, Iterator
from typing import Generator, Tuple

from deltacat.benchmarking.benchmark_report import BenchmarkMetric, BenchmarkStep
from deltacat.storage.rivulet.dataset import Dataset
from deltacat.storage.rivulet.reader.query_expression import QueryExpression



@contextmanager
def timed_step(description: str) -> Generator[BenchmarkStep, None, None]:
"""Convenience for computing elapsed time of a block of code as a metric.
Expand All @@ -24,19 +23,20 @@ def timed_step(description: str) -> Generator[BenchmarkStep, None, None]:


class BenchmarkEngine:

def __init__(self, dataset: Dataset):
self.dataset = dataset

def load_and_commit(self, field_group, generator, count) -> Tuple[str, BenchmarkStep]:
def load_and_commit(
self, schema_name, generator, count
) -> Tuple[str, BenchmarkStep]:
"""Load count number of rows from the generator and commit.
:param generator: row generator
:param count: the number of rows to load into the dataset
:return: tuple of the manifest URI and an operation measurement
"""
desc = f"load {count} from {generator}"
writer = self.dataset.writer(field_group)
writer = self.dataset.writer(schema_name)
with timed_step(desc) as step:
rows = [generator.generate() for _ in range(count)]
writer.write(rows)
Expand All @@ -45,21 +45,30 @@ def load_and_commit(self, field_group, generator, count) -> Tuple[str, Benchmark
return result, step

def scan(self) -> Tuple[set[any], BenchmarkStep]:
"""Scans the rows and prints some basic statistics about the manifest"""
"""
Scans every row in the dataset and records basic statistics (rows read, total size of row values in MB) on the returned benchmark step
:return: Tuple[set[any], BenchmarkStep] - a tuple containing a set of merge keys and a benchmark step with metrics
"""
keys = set()
object_count = 0
size_b = 0
# Note that we expect single-column merge keys so that we can return a key set;
# this will fail with a validation error if the dataset has multiple merge keys
merge_key_name = self.dataset.schemas["all"].get_merge_key()
with timed_step("full scan") as step:
for row in self.dataset.scan(QueryExpression()).to_pydict():
object_count += 1
size_b += sum([sys.getsizeof(x) for x in row.values()])
keys.add(row.get(self.dataset.schema.primary_key.name))
keys.add(row.get(merge_key_name))
# TODO replace with the actual metrics we want to measure
step.add(BenchmarkMetric("rows read", object_count))
step.add(BenchmarkMetric("size", size_b / (1024 * 1024), "MB"))
return keys, step

def run_queries(self, description, manifest_uri, queries: list[QueryExpression]) -> BenchmarkStep:
def run_queries(
self, description, manifest_uri, queries: list[QueryExpression]
) -> BenchmarkStep:
object_count = 0
size_b = 0
with timed_step(description) as step:
Expand Down
14 changes: 12 additions & 2 deletions deltacat/benchmarking/benchmark_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,22 @@ def __str__(self):
return
suite = self.runs[0].suite

headers = [f"{suite} Operation", "Metric", "Unit", *[r.description for r in self.runs]]
headers = [
f"{suite} Operation",
"Metric",
"Unit",
*[r.description for r in self.runs],
]
rows = []
for step_tranche in zip(*[r.steps for r in self.runs]):
# TODO zip by metric name instead of assuming all metrics are being measured
step_name = step_tranche[0].description
for metric_tuple in zip(*[x.list_metrics() for x in step_tranche]):
row = [step_name, metric_tuple[0].name, metric_tuple[0].unit, *[p.value for p in metric_tuple]]
row = [
step_name,
metric_tuple[0].name,
metric_tuple[0].unit,
*[p.value for p in metric_tuple],
]
rows.append(row)
return tabulate(rows, headers=headers, tablefmt="fancy_outline")
12 changes: 8 additions & 4 deletions deltacat/benchmarking/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,21 @@
)


@pytest.fixture(autouse=True, scope='function')
@pytest.fixture(autouse=True, scope="function")
def report(request):
report = BenchmarkReport(request.node.name)

def final_callback():
terminal_reporter: TerminalReporter = request.config.pluginmanager.get_plugin("terminalreporter")
capture_manager = request.config.pluginmanager.get_plugin('capturemanager')
terminal_reporter: TerminalReporter = request.config.pluginmanager.get_plugin(
"terminalreporter"
)
capture_manager = request.config.pluginmanager.get_plugin("capturemanager")
with capture_manager.global_and_fixture_disabled():
terminal_reporter.ensure_newline()
terminal_reporter.section(request.node.name, sep='-', blue=True, bold=True)
terminal_reporter.section(request.node.name, sep="-", blue=True, bold=True)
terminal_reporter.write(str(report))
terminal_reporter.ensure_newline()

request.addfinalizer(final_callback)
return report

Expand Down
30 changes: 18 additions & 12 deletions deltacat/benchmarking/data/random_row_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ class ImageStyle(Enum):
class RandomRowGenerator(RowGenerator):
"""Generate rows with 'images' that are just randomly-generated bytes"""

def __init__(self, seed=0, tmp_dir=None, style: ImageStyle = ImageStyle.RANDOM_BYTES):
def __init__(
self, seed=0, tmp_dir=None, style: ImageStyle = ImageStyle.RANDOM_BYTES
):
self.seed = seed
self.fake = faker.Faker()
self.fake.seed_instance(seed)
Expand All @@ -51,17 +53,19 @@ def _generate_image(self, width, height) -> bytes:
@staticmethod
def _generate_with_random_bytes(width, height) -> bytes:
"""Generate random bytes to simulate an image."""
target_size = math.floor(width * height / 50) # this isn't actually how file size relates to image size
target_size = math.floor(
width * height / 50
) # this isn't actually how file size relates to image size
# Assumption: we don't actually need images. It suffices to generate arbitrary-length bytes of random characters.
return os.urandom(target_size)

@staticmethod
def _generate_with_pillow(width, height) -> bytes:
"""Generate actual PNG files in-memory directly using Pillow"""
file = BytesIO()
image = Image.new('RGBA', size=(width, height), color=(155, 0, 0))
image.save(file, 'png')
file.name = 'test.png'
image = Image.new("RGBA", size=(width, height), color=(155, 0, 0))
image.save(file, "png")
file.name = "test.png"
file.seek(0)
return file.read()

Expand All @@ -72,17 +76,19 @@ def _generate_with_faker(self, width, height) -> bytes:
root_path=self.temp_dir,
rel_path="tmp",
),
size=(width, height))
size=(width, height),
)
file_name = f"{self.temp_dir}/{rel_name}"
with open(file_name, 'rb') as f:
with open(file_name, "rb") as f:
return f.read()


def generate(self) -> Dict[str,Any]:
def generate(self) -> Dict[str, Any]:
return {
"id": self.fake.random_int(0, 10_000_000),
"source": self.fake.image_url(),
"media": (self._generate_image(
self.fake.random_int(512, 2048),
self.fake.random_int(512, 4096)))
"media": (
self._generate_image(
self.fake.random_int(512, 2048), self.fake.random_int(512, 4096)
)
),
}
1 change: 1 addition & 0 deletions deltacat/benchmarking/data/row_generator.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Protocol, Iterator, Dict, Any


class RowGenerator(Protocol):
def generate(self) -> Dict[str, Any]:
...
Expand Down
69 changes: 47 additions & 22 deletions deltacat/benchmarking/test_benchmark_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,30 @@
import tempfile
from contextlib import contextmanager
from random import shuffle

import pytest

from deltacat.storage.rivulet.field_group import FileSystemFieldGroup, FieldGroup

pytestmark = pytest.mark.benchmark

#from rivulet import Datatype, Dataset
from deltacat.storage.rivulet.dataset import Dataset
from deltacat.storage.rivulet.schema.datatype import Datatype
from deltacat.storage.rivulet.fs.file_store import FileStore
from deltacat.storage.rivulet.reader.query_expression import QueryExpression
from deltacat.storage.rivulet.schema.schema import Schema, Field
from deltacat.storage.rivulet.schema.schema import Schema
from deltacat.benchmarking.benchmark_engine import BenchmarkEngine
from deltacat.benchmarking.benchmark_report import BenchmarkRun, BenchmarkReport
from deltacat.benchmarking.benchmark_suite import BenchmarkSuite
from deltacat.benchmarking.data.random_row_generator import RandomRowGenerator
from deltacat.benchmarking.data.row_generator import RowGenerator

pytestmark = pytest.mark.benchmark


@pytest.fixture
def schema():
return Schema({
"id": Field("id", Datatype.int32()),
"source": Field("source", Datatype.string()),
"media": Field("media", Datatype.image("png"))}, "id")
return Schema(
[
("id", Datatype.int32()),
("source", Datatype.string()),
("media", Datatype.image("png")),
],
"id",
)


@contextmanager
Expand All @@ -44,40 +42,52 @@ def make_tmpdir():
class LoadAndScanSuite(BenchmarkSuite):
"""Load some number of rows and scan"""

schema_name = "LoadAndScanSuite"

def __init__(self, dataset: Dataset, schema: Schema, generator, description=None):
self.suite = "ReadSuite"
self.dataset: Dataset = dataset
self.field_group = self.dataset.new_field_group(schema)
self.schema = schema
self.dataset.add_schema(schema, LoadAndScanSuite.schema_name)
self.generator: RowGenerator = generator
self.description: str = description or f"{self.dataset} x {self.generator}"

def run(self) -> BenchmarkRun:
container = BenchmarkEngine(self.dataset)
run = BenchmarkRun(self.suite, self.description)
# load a large number of rows
manifest_uri, step = container.load_and_commit(self.field_group, self.generator, 1000)
manifest_uri, step = container.load_and_commit(
LoadAndScanSuite.schema_name, self.generator, 1000
)
run.add(step)
# do a full scan of all rows (and eagerly load them)
keys, step = container.scan()
run.add(step)
# randomly retrieve all keys one-by-one from the dataset
random_keys = list(keys)
shuffle(random_keys)
step = container.run_queries("load all keys individually", manifest_uri,
[QueryExpression().with_primary_key(k) for k in random_keys])
step = container.run_queries(
"load all keys individually",
manifest_uri,
[QueryExpression().with_key(k) for k in random_keys],
)
run.add(step)
# split into 4 key ranges and get them individually
quartiles = self._generate_quartiles(keys)
expressions = [QueryExpression().with_primary_range(start, end) for (start, end) in quartiles]
step = container.run_queries("load key ranges by quartile", manifest_uri, expressions)
expressions = [
QueryExpression().with_range(start, end) for (start, end) in quartiles
]
step = container.run_queries(
"load key ranges by quartile", manifest_uri, expressions
)
run.add(step)
return run

@staticmethod
def _generate_quartiles(keys):
sorted_keys = sorted(keys)
size = len(keys)
starts = list(range(0, size, math.ceil(size/4)))
starts = list(range(0, size, math.ceil(size / 4)))
ends = list([x - 1 for x in starts[1:]])
ends.append(size - 1)
quartiles = list(zip(starts, ends))
Expand All @@ -87,7 +97,22 @@ def _generate_quartiles(keys):
def test_suite1(schema: Schema, report: BenchmarkReport):
with make_tmpdir() as temp_dir:
generator = RandomRowGenerator(123, temp_dir)
report.add(LoadAndScanSuite(Dataset(temp_dir), schema, generator, "SST (rand)").run())
report.add(
LoadAndScanSuite(
Dataset(dataset_name="test_suite1_ds1", metadata_uri=temp_dir),
schema,
generator,
"SST (rand)",
).run()
)

with make_tmpdir() as temp_dir:
generator = RandomRowGenerator(123, temp_dir)
report.add(LoadAndScanSuite(Dataset(temp_dir), schema, generator, "dupe").run())
report.add(
LoadAndScanSuite(
Dataset(dataset_name="test_suite1_ds2", metadata_uri=temp_dir),
schema,
generator,
"dupe",
).run()
)
4 changes: 2 additions & 2 deletions deltacat/storage/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ a **Name Resolution Directory** to map the object's mutable name or alias back t

**Name Mapping File**

The format of the **Name Mapping File** file is:
The format of a **Name Mapping File** is:
`<revision_number_padded_20_digits>_<txn_operation_type>_<txn_id>.<object_id>`
Where `object_id` is the name of the associated object's **Immutable ID** directory.
Where `object_id` is the name of the associated object's **Immutable ID** directory.
Note that (except **Immutable ID**) this is the same format used by **Metadata Revision Files**, and the same process is employed to `create`, `update`, and `delete` name mappings.

### Transaction Log Directory
Expand Down
Loading

0 comments on commit 7ad1d1a

Please sign in to comment.