Skip to content

Commit

Permalink
Merge branch 'dev_2.0.0' into TH2-5202_fix_cache_file_removal_bug
Browse files Browse the repository at this point in the history
  • Loading branch information
davitmamrikishvili authored May 20, 2024
2 parents 30db269 + 3bc6991 commit 60046de
Show file tree
Hide file tree
Showing 20 changed files with 1,929 additions and 261 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config-windows.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ repos:
always_run: true
# Don't know how to solve the libs problem other way.
additional_dependencies: [pytest, treelib, flatdict,
orjson, th2-data-services-lwdp==2.0.3.0]
orjson, th2-data-services-lwdp==2.0.3.0, ciso8601]

- id: check_copyright
name: check_copyright
Expand Down
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ repos:
always_run: true
# Don't know how to solve the libs problem other way.
additional_dependencies: [pytest, treelib, flatdict,
orjson, th2-data-services-lwdp==2.0.3.0]
orjson, th2-data-services-lwdp==2.0.3.0, ciso8601]

- id: check_copyright
name: check_copyright
Expand Down
65 changes: 45 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ This example shows basic usage of library's features.
[The following example as a file](examples/get_started_example.py).
<!-- start get_started_example.py -->
```python
from typing import Tuple, List, Optional, Generator
from datetime import datetime
Expand Down Expand Up @@ -215,23 +216,27 @@ events = Data(
######################################
# [1.1] Filter.
filtered_events: Data = events.filter(lambda e: e["body"] != []) # Filter events with empty body.
filtered_events: Data = events.filter(
lambda e: e["body"] != []) # Filter events with empty body.
# [1.2] Map.
def transform_function(record):
return {"eventName": record["eventName"], "successful": record["successful"]}
return {"eventName": record["eventName"],
"successful": record["successful"]}
filtered_and_mapped_events = filtered_events.map(transform_function)
# [1.3] Data pipeline.
# Instead of doing data transformations step by step you can do it in one line.
filtered_and_mapped_events_by_pipeline = events.filter(lambda e: e["body"] != []).map(
filtered_and_mapped_events_by_pipeline = events.filter(
lambda e: e["body"] != []).map(
transform_function
)
# Content of these two Data objects should be equal.
assert list(filtered_and_mapped_events) == list(filtered_and_mapped_events_by_pipeline)
assert list(filtered_and_mapped_events) == list(
filtered_and_mapped_events_by_pipeline)
# [1.4] Sift. Skip the first few items or limit them.
data = Data([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15])
Expand Down Expand Up @@ -269,9 +274,11 @@ events_filtered: Data = events.filter(lambda record: record.get("batchId"))
# After that, the Data object will create its own cache file.
events_filtered.use_cache()
list(events_filtered) # Just to iterate Data object (cache file will be created).
list(
events_filtered) # Just to iterate Data object (cache file will be created).
filtered_events_types = events_filtered.map(lambda record: {"eventType": record.get("eventType")})
filtered_events_types = events_filtered.map(
lambda record: {"eventType": record.get("eventType")})
events_without_types_with_batch = filtered_events_types.filter(
lambda record: not record.get("eventType")
Expand Down Expand Up @@ -338,7 +345,8 @@ data_obj_from_cache.clear_cache()
# [2.1] DatetimeConverter.
# DatetimeConverter takes datetime.datetime object as input.
datetime_obj = datetime(year=2023, month=1, day=5, hour=14, minute=38, second=25, microsecond=1460)
datetime_obj = datetime(year=2023, month=1, day=5, hour=14, minute=38,
second=25, microsecond=1460)
# It has methods that return the datetime in different formats:
Expand All @@ -365,10 +373,14 @@ datetime_from_string = DatetimeStringConverter.to_datetime(date_string)
protobuf_timestamp = {"epochSecond": 1672929505, "nano": 1_460_000}
date_ms_from_timestamp = ProtobufTimestampConverter.to_milliseconds(protobuf_timestamp)
date_us_from_timestamp = ProtobufTimestampConverter.to_microseconds(protobuf_timestamp)
date_ns_from_timestamp = ProtobufTimestampConverter.to_nanoseconds(protobuf_timestamp)
datetime_from_timestamp = ProtobufTimestampConverter.to_datetime(protobuf_timestamp)
date_ms_from_timestamp = ProtobufTimestampConverter.to_milliseconds(
protobuf_timestamp)
date_us_from_timestamp = ProtobufTimestampConverter.to_microseconds(
protobuf_timestamp)
date_ns_from_timestamp = ProtobufTimestampConverter.to_nanoseconds(
protobuf_timestamp)
datetime_from_timestamp = ProtobufTimestampConverter.to_datetime(
protobuf_timestamp)
######################################
# [3] Working with EventTree and EventTreeCollection.
Expand All @@ -378,17 +390,21 @@ datetime_from_timestamp = ProtobufTimestampConverter.to_datetime(protobuf_timest
# [3.1] Build a custom EventTree
# To create an EventTree object you need to provide name, id and data of the root event.
tree = EventTree(event_name="root event", event_id="root_id", data={"data": [1, 2, 3, 4, 5]})
tree = EventTree(event_name="root event", event_id="root_id",
data={"data": [1, 2, 3, 4, 5]})
# To add new node use append_event. parent_id is necessary, data is optional.
tree.append_event(event_name="A", event_id="A_id", data=None, parent_id="root_id")
tree.append_event(event_name="A", event_id="A_id", data=None,
parent_id="root_id")
# [3.3] Building the EventTreeCollection.
data_source: IDataSource # You should init DataSource object. E.g. from LwDP module.
data_source = DummyDataSource() # Note! We use fake DS here.
# ETCDriver here is a stub, actually the lib doesn't have such a class.
# You can take it in LwDP module or create your own class if you have some special events structure.
from th2_data_services.data_source.lwdp.event_tree import HttpETCDriver as ETCDriver
from th2_data_services.data_source.lwdp.event_tree import HttpETCDriver as ETCDriver
# If you don't specify data_source for the driver then it won't recover detached events.
driver: IETCDriver # You should init ETCDriver object. E.g. from LwDP module or your custom class.
Expand Down Expand Up @@ -429,7 +445,8 @@ roots: List[str] = etc.get_roots_ids()
# ['demo_book_1:th2-scope:20230105135705560873000:d61e930a-8d00-11ed-aa1a-d34a6155152d_1']
# [3.3.3] Find an event in all trees.
find_event: Optional[dict] = etc.find(lambda event: "Send message" in event["eventType"])
find_event: Optional[dict] = etc.find(
lambda event: "Send message" in event["eventType"])
# [3.3.4] Find all events in all trees. There is also iterable version 'findall_iter'.
find_events: List[dict] = etc.findall(lambda event: event["successful"] is True)
Expand Down Expand Up @@ -525,7 +542,9 @@ data_source: IDataSource # You should init DataSource object. E.g. from LwDP mo
data_source = DummyDataSource() # Note! We use fake DS here.
# ETCDriver here is a stub, actually the lib doesn't have such a class.
# You can take it in LwDP module or create your own class if you have some special events structure.
from th2_data_services.data_source.lwdp.event_tree import HttpETCDriver as ETCDriver
from th2_data_services.data_source.lwdp.event_tree import HttpETCDriver as ETCDriver
driver = ETCDriver(data_source=data_source)
petc = ParentEventTreeCollection(driver)
Expand All @@ -544,13 +563,16 @@ from th2_data_services.data_source import (
lwdp,
) # lwdp data_source initialize th2_data_services.config during import.
from th2_data_services.config import options as o_
from th2_data_services.data_source.lwdp.stub_builder import http_message_stub_builder
from th2_data_services.data_source.lwdp.stub_builder import http_message_stub_builder
fake_data = [
http_message_stub_builder.build({"messageId": "a", "messageType": "Root"}),
http_message_stub_builder.build({"messageId": "b", "messageType": "New"}),
http_message_stub_builder.build({"messageId": "c", "messageType": "Amend"}),
http_message_stub_builder.build({"messageId": "d", "messageType": "Cancel"}),
http_message_stub_builder.build(
{"messageId": "d", "messageType": "Cancel"}),
]
fake_data_obj = Data(fake_data)
Expand Down Expand Up @@ -581,14 +603,17 @@ for m in fake_data_obj.map(o_.mfr.expand_message):
######################################
# [5] Using utility functions.
######################################
from th2_data_services.utils.event_utils.frequencies import get_category_frequencies2
from th2_data_services.utils.event_utils.frequencies import get_category_frequencies2
from th2_data_services.utils.event_utils.totals import get_category_totals2
from th2_data_services.utils.category import Category
from th2_data_services.utils.event_utils.event_utils import is_sorted
# [5.1] Get the quantities of events for different categories.
metrics = [
Category("date", lambda m: Th2TimestampConverter.to_datetime(m["startTimestamp"]).date()),
Category("date", lambda m: Th2TimestampConverter.to_datetime(
m["startTimestamp"]).date()),
Category("status", lambda m: m["successful"]),
]
category_totals = get_category_totals2(events, metrics)
Expand Down
14 changes: 6 additions & 8 deletions documentation/api/interfaces.utils.converter.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,9 @@ If your timestamp has nanoseconds, they will be just cut (not rounded).

- <b>`datetime`</b>: Timestamp in python datetime format.

Speed test: AMD Ryzen 7 6800H with Radeon Graphics 3.20 GHz ~ 987 ns per iteration ~= 1000000 iterations per second

---

<a href="../../th2_data_services/interfaces/utils/converter.py#L144"><img align="right" style="float:right;" src="https://img.shields.io/badge/-source-cccccc?style=flat-square"></a>
<a href="../../th2_data_services/interfaces/utils/converter.py#L137"><img align="right" style="float:right;" src="https://img.shields.io/badge/-source-cccccc?style=flat-square"></a>

### <kbd>classmethod</kbd> `to_datetime_str`

Expand Down Expand Up @@ -113,7 +111,7 @@ Format example:

---

<a href="../../th2_data_services/interfaces/utils/converter.py#L116"><img align="right" style="float:right;" src="https://img.shields.io/badge/-source-cccccc?style=flat-square"></a>
<a href="../../th2_data_services/interfaces/utils/converter.py#L109"><img align="right" style="float:right;" src="https://img.shields.io/badge/-source-cccccc?style=flat-square"></a>

### <kbd>classmethod</kbd> `to_microseconds`

Expand All @@ -139,7 +137,7 @@ If your timestamp has nanoseconds, they will be just cut (not rounding).

---

<a href="../../th2_data_services/interfaces/utils/converter.py#L101"><img align="right" style="float:right;" src="https://img.shields.io/badge/-source-cccccc?style=flat-square"></a>
<a href="../../th2_data_services/interfaces/utils/converter.py#L94"><img align="right" style="float:right;" src="https://img.shields.io/badge/-source-cccccc?style=flat-square"></a>

### <kbd>classmethod</kbd> `to_milliseconds`

Expand All @@ -165,7 +163,7 @@ If your timestamp has nanoseconds, they will be just cut (not rounding).

---

<a href="../../th2_data_services/interfaces/utils/converter.py#L131"><img align="right" style="float:right;" src="https://img.shields.io/badge/-source-cccccc?style=flat-square"></a>
<a href="../../th2_data_services/interfaces/utils/converter.py#L124"><img align="right" style="float:right;" src="https://img.shields.io/badge/-source-cccccc?style=flat-square"></a>

### <kbd>classmethod</kbd> `to_nanoseconds`

Expand All @@ -189,7 +187,7 @@ Converts timestamp to nanoseconds.

---

<a href="../../th2_data_services/interfaces/utils/converter.py#L86"><img align="right" style="float:right;" src="https://img.shields.io/badge/-source-cccccc?style=flat-square"></a>
<a href="../../th2_data_services/interfaces/utils/converter.py#L79"><img align="right" style="float:right;" src="https://img.shields.io/badge/-source-cccccc?style=flat-square"></a>

### <kbd>classmethod</kbd> `to_seconds`

Expand All @@ -215,7 +213,7 @@ If your timestamp has nanoseconds, they will be just cut (not rounding).

---

<a href="../../th2_data_services/interfaces/utils/converter.py#L162"><img align="right" style="float:right;" src="https://img.shields.io/badge/-source-cccccc?style=flat-square"></a>
<a href="../../th2_data_services/interfaces/utils/converter.py#L155"><img align="right" style="float:right;" src="https://img.shields.io/badge/-source-cccccc?style=flat-square"></a>

### <kbd>classmethod</kbd> `to_th2_timestamp`

Expand Down
27 changes: 27 additions & 0 deletions release_notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,7 @@ If you want to use RDP you have to specify dependency in square brackets `[ ]`
6. [TH2-5190] Fixed Data.to_json
7. [TH2-5193] orjson versions 3.7.0 through 3.9.14 library has vulnerability
https://devhub.checkmarx.com/cve-details/CVE-2024-27454/.
8. [TH2-5201] Fixed DatetimeStringConverter.to_th2_timestamp() bug which occurred for inputs not ending with 'Z'.
## Improvements
Expand All @@ -576,3 +577,29 @@ If you want to use RDP you have to specify dependency in square brackets `[ ]`
ProtobufTimestampConverter functions: to_microseconds, to_milliseconds,
to_nanoseconds.
7. [TH2-5081] `Data.__str__` was changed --> use `Data.show()` instead of `print(data)`
8. [TH2-5201] Performance improvements have been made to converters:
Benchmark.
- 1mln iterations per test
- input: 2022-03-05T23:56:44.123456789Z
| Converter | Method | Before (seconds) | After (seconds) | Improvement (rate) |
|----------------------------------|------------------|------------------|-----------------|--------------------|
| DatetimeStringConverter | parse_timestamp | 7.1721964 | 1.4974268 | x4.78 |
| | to_datetime | 8.9945099 | 0.1266325 | x71.02 |
| | to_seconds | 8.6180093 | 1.5360991 | x5.62 |
| | to_microseconds | 7.9066440 | 1.7628856 | x4.48 |
| | to_nanoseconds | 7.6787507 | 1.7114960 | x4.48 |
| | to_milliseconds | 7.6059985 | 1.7688387 | x4.29 |
| | to_datetime_str | 8.3861742 | 2.3781561 | x3.52 |
| | to_th2_timestamp | 7.7702033 | 1.5942235 | x4.87 |
| UniversalDatetimeStringConverter | parse_timestamp | 7.4161371 | 1.5752227 | x4.7 |
| | to_datetime | 8.2108218 | 0.1267797 | x64.76 |
| | to_seconds | 7.7745484 | 1.6453126 | x4.72 |
| | to_microseconds | 7.7569293 | 1.8240784 | x4.25 |
| | to_nanoseconds | 7.7879700 | 1.7930200 | x4.34 |
| | to_milliseconds | 7.8168710 | 1.8308856 | x4.26 |
| | to_datetime_str | 8.7388529 | 2.4592992 | x3.55 |
| | to_th2_timestamp | 7.8972679 | 1.6856898 | x4.68 |
Other converters also have some not big speed improvements.
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
ciso8601
deprecated
flatdict~=4.0
# orjson versions 3.7.0 through 3.9.14 library has
Expand Down
61 changes: 8 additions & 53 deletions tests/tests_unit/test_data/test_performance/test_many_files.py
Original file line number Diff line number Diff line change
@@ -1,64 +1,19 @@
import os
import timeit
from pathlib import Path
from typing import Iterable, Any

from tests.tests_unit.test_data.test_performance.util import (
Multiply,
reads_all_json_files_from_the_folder,
data_template,
)

# from profilehooks import profile

from th2_data_services.data import Data
from th2_data_services.interfaces import IStreamAdapter


class Multiply(IStreamAdapter):
def __init__(self, multiplier):
self.multiplier = multiplier

def handle(self, stream: Iterable) -> Any:
m: dict
for m in stream:
for x in range(self.multiplier):
new = m.copy()
new["eventId"] += str(x)
new["eventName"] += str(x)
new["eventName"] += str(x)
yield new


TEST_FILES_PATH = "perf_files"


def reads_all_Pickle_files_from_the_folder(path, return_list=False) -> Data:
# TODO -- add files templates e.g. only *.py files
datas = []
for (dirpath, dirnames, filenames) in os.walk(path):
for filename in filenames:
file_path = os.path.join(dirpath, filename)
datas.append(Data.from_json(file_path, gzip=True, buffer_limit=250))

if return_list:
return datas
if len(datas) == 1:
return datas[0]
else:
return Data(datas)


data_template = {
"batchId": None,
"eventId": "84db48fc-d1b4-11eb-b0fb-199708acc7bc",
"eventName": "some event name that will have +1 every new one",
"eventType": "Test event",
"isBatched": False,
"parentEventId": None,
"body": {
"a": 1,
"b": [1, 2, 3],
"c": {"x": 1, "y": 2},
"d": 4,
},
}


def create_files():
"""Creates 100 json-lines files"""
files_path = Path(TEST_FILES_PATH).resolve().absolute()
Expand Down Expand Up @@ -91,8 +46,8 @@ def many_files():
# create_1_file()
pass

data_many_files = reads_all_Pickle_files_from_the_folder(TEST_FILES_PATH)
data_many_files_datas_list = reads_all_Pickle_files_from_the_folder(
data_many_files = reads_all_json_files_from_the_folder(TEST_FILES_PATH)
data_many_files_datas_list = reads_all_json_files_from_the_folder(
TEST_FILES_PATH, return_list=True
)
print("test")
Expand Down
Loading

0 comments on commit 60046de

Please sign in to comment.