Skip to content

Commit

Permalink
Use positional arguments when calling kll update (#1545)
Browse files Browse the repository at this point in the history
## Description
Fixes for long-running instances of the rolling logger:

* Use positional arguments when calling kll update
* Add a load test to rolling logger
* Update usage stats: emit an event for segment_on_column and update the
condition_validator event name

```
poetry run pytest -o log_level=INFO -o log_cli=true tests/api/logger/test_rolling.py::test_rolling_logger_load_test --load
```

- [x] I have reviewed the [Guidelines for Contributing](CONTRIBUTING.md)
and the [Code of Conduct](CODE_OF_CONDUCT.md).
  • Loading branch information
jamie256 authored Jul 18, 2024
1 parent b6a6dae commit 13e089c
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 3 deletions.
47 changes: 47 additions & 0 deletions python/tests/api/logger/test_rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import math
import os
from datetime import datetime, timezone
from logging import getLogger
from os import listdir
from os.path import isfile
from typing import Any, Optional, Tuple
Expand All @@ -22,6 +23,8 @@
from whylogs.core.schema import DatasetSchema
from whylogs.core.segmentation_partition import segment_on_column

TEST_LOGGER = getLogger(__name__)


class TimerContext:
def __enter__(self) -> "TimerContext":
Expand Down Expand Up @@ -265,6 +268,50 @@ def test_rolling_row_messages_with_segments(tmp_path: Any) -> None:
assert rolling_callback.call_count == 2


@pytest.mark.load
def test_rolling_logger_load_test(tmp_path: Any) -> None:
    """Hammer a fast-rolling logger and report the allocations that survive GC.

    A rolling logger configured with ``interval=1`` / ``when="S"`` is fed
    ``test_iterations * num_messages`` rows, then the top five
    ``tracemalloc`` statistics (grouped by line) are written to the test
    logger. Nothing is asserted on the memory figures yet; the output is
    meant to be inspected manually.

    Args:
        tmp_path: directory handed to the logger's "local" writer as ``base_dir``.
    """
    import gc
    import tracemalloc

    num_messages = 10
    messages = [{"col1": i % 2, "col2": i * i * 1.2, "col3": "a"} for i in range(num_messages)]
    # parameterize the load test, 10,000 iterations with 10 messages per loop -> 100k log calls
    test_iterations = 10000

    def loop_test(rolling_logger, messages, test_iterations):
        # Kept as a nested function so the loop's locals are out of scope
        # by the time gc.collect() runs below.
        for i in range(test_iterations):
            if i % 1000 == 0:
                print(f"Iteration {i} out of {test_iterations}")
            for message in messages:
                rolling_logger.log(message)

    tracemalloc.start()
    try:
        # Here we create an aggressively rolling logger to try and test for memory pressure related to
        # long running rolling logger instances over time. Don't do this in actual integrations.
        rolling_logger = why.logger(
            mode="rolling",
            interval=1,
            when="S",
            base_name="test_base_name",
        )
        try:
            rolling_logger.append_writer("local", base_dir=tmp_path)

            loop_test(rolling_logger, messages, test_iterations)
            gc.collect()
            snapshot = tracemalloc.take_snapshot()
            top_stats = snapshot.statistics("lineno")

            # Before the memory fix, this would show hundreds of megabytes surviving garbage
            # collection; the expected results here are under a few megabytes for any of the
            # top lines.
            # TODO: use an assert to catch catastrophic regressions.
            TEST_LOGGER.info("Top memory-consuming lines:")
            for stat in top_stats[:5]:
                TEST_LOGGER.info(stat)

            TEST_LOGGER.info(f"load test with {test_iterations} iterations each using {num_messages}")
        finally:
            # Always close the logger, even if logging or the snapshot fails.
            rolling_logger.close()
    finally:
        # Stop tracing so the allocation-tracking overhead does not leak
        # into the rest of the test session.
        tracemalloc.stop()


def test_rolling_do_rollover():
import pandas as pd

Expand Down
4 changes: 2 additions & 2 deletions python/whylogs/core/metrics/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ def columnar_update(self, view: PreprocessedColumn) -> OperationResult:
if view.numpy.len > 0:
for arr in [view.numpy.floats, view.numpy.ints]:
if arr is not None:
self.kll.value.update(arr)
self.kll.value.update(array=arr)
n_b = len(arr)
if n_b > 1:
n_b = len(arr)
Expand All @@ -281,7 +281,7 @@ def columnar_update(self, view: PreprocessedColumn) -> OperationResult:

for lst in [view.list.ints, view.list.floats]:
if lst is not None and len(lst) > 0:
self.kll.value.update_list(lst)
self.kll.value.update_list(num_items=lst)
n_b = len(lst)
if n_b > 1:
mean_b = statistics.mean(lst)
Expand Down
3 changes: 3 additions & 0 deletions python/whylogs/core/segmentation_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from dataclasses import dataclass, field
from typing import Callable, List, Mapping, Optional

from whylogs.api.usage_stats import emit_usage
from whylogs.core.projectors import FieldProjector

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -69,4 +70,6 @@ def __hash__(self):


def segment_on_column(column_name: str) -> Mapping[str, SegmentationPartition]:
    """Build a one-entry segmentation mapping keyed by ``column_name``.

    Emits a ``"segment_on_column"`` usage event, then returns a mapping from
    ``column_name`` to a ``SegmentationPartition`` of the same name whose
    mapper is a ``ColumnMapperFunction`` over that single column.
    """
    emit_usage("segment_on_column")

    column_mapper = ColumnMapperFunction(col_names=[column_name])
    partition = SegmentationPartition(name=column_name, mapper=column_mapper)
    return {column_name: partition}
2 changes: 1 addition & 1 deletion python/whylogs/core/validators/condition_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class ConditionValidator(Validator):
def __post_init__(self):
from whylogs.api.usage_stats import emit_usage

emit_usage("condition_validators")
emit_usage("condition_validator")
for cond_name in self.conditions.keys():
if cond_name not in self.failures:
self.failures[cond_name] = 0
Expand Down

0 comments on commit 13e089c

Please sign in to comment.