Skip to content

Commit

Permalink
fix: improvements to verify_no_overlap, added tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ErikBjare committed May 22, 2023
1 parent 79d8498 commit fb2fa45
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 141 deletions.
34 changes: 19 additions & 15 deletions aw_research/classify.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,32 @@
import typing
import logging
from typing import List, Dict, Optional, Tuple, Set

import argparse
import re
import json
from urllib.parse import urlparse
import logging
import re
import typing
from collections import Counter
from datetime import datetime, timedelta, timezone
from functools import wraps
from typing import (
Dict,
List,
Optional,
Set,
Tuple,
)
from urllib.parse import urlparse

import toml
import pytz
import pydash
import joblib
import matplotlib.pyplot as plt
import pandas as pd
import joblib

from aw_core.models import Event
from aw_transform import flood, filter_period_intersect, union_no_overlap
import pydash
import pytz
import toml
from aw_client import ActivityWatchClient
from aw_core.models import Event
from aw_transform import filter_period_intersect, flood, union_no_overlap

from .plot_sunburst import sunburst


logger = logging.getLogger(__name__)
memory = joblib.Memory("./.cache/joblib")

Expand Down Expand Up @@ -358,7 +361,8 @@ def _get_events_toggl(since: datetime, filepath: str) -> List[Event]:


def _get_events_smartertime(since: datetime, filepath: str = "auto") -> List[Event]:
# TODO: Use aw_research.importers.smartertime to generate json file if filepath is smartertime export (.csv)
# TODO: Use quantifiedme.load.smartertime to generate json file if filepath is smartertime export (.csv)
# NOTE: deprecated, use methods in quantifiedme instead
if filepath == "auto":
from glob import glob

Expand Down
115 changes: 0 additions & 115 deletions aw_research/importers/smartertime.py

This file was deleted.

98 changes: 87 additions & 11 deletions aw_research/util.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
from datetime import (
datetime,
time,
timedelta,
timezone,
)
from typing import List, Tuple
from datetime import datetime, time, timedelta, timezone

import pandas as pd

from aw_core import Event


Expand Down Expand Up @@ -119,27 +123,99 @@ def test_split_into_days() -> None:
assert len(split) == 4


def verify_no_overlap(events: List[Event]) -> None:
    """Print a warning if any event is still running when the next one starts.

    The events are sorted by timestamp (the caller's list is not modified)
    and each event is compared with its direct successor. If at least one
    event ends after its successor starts, the overlap statistics are
    obtained from ``compute_total_overlap`` and a ``[WARNING]`` line is
    printed. Nothing is raised and nothing is returned.

    Args:
        events: Events exposing ``timestamp`` and ``duration``.
    """
    events = sorted(events, key=lambda e: e.timestamp)

    # A plain conditional is used rather than ``assert`` + ``except
    # AssertionError``: assert statements are removed when Python runs with
    # -O, which would silently disable the check.
    has_overlap = any(
        e1.timestamp + e1.duration > e2.timestamp
        for e1, e2 in zip(events[:-1], events[1:])
    )
    if has_overlap:
        n_overlaps, total_overlap = compute_total_overlap(events)
        print(
            f"[WARNING] Found {n_overlaps} events overlapping, totalling: {total_overlap}"
        )


def compute_total_overlap(events: List[Event]) -> Tuple[int, timedelta]:
    """Count overlapping event pairs and sum the length of their overlaps.

    The events are sorted by timestamp (the caller's list is not modified)
    and scanned with two indices: an anchor event ``i`` and a later event
    ``j``. While the anchor is still running when event ``j`` starts, the
    pair is counted, the length of their intersection is added to the total,
    and ``j`` advances. Otherwise the anchor moves forward.

    Only the pairs visited by this scan are counted, which is not every
    pairwise overlap: two events that overlap each other while both starting
    inside a longer, earlier event are each compared against that earlier
    event only.

    Args:
        events: Events exposing ``timestamp`` and ``duration``.

    Returns:
        A ``(n_overlaps, total_overlap)`` tuple. With fewer than two events
        there is nothing to compare and ``(0, timedelta())`` is returned.
    """
    events = sorted(events, key=lambda e: e.timestamp)
    n_overlaps = 0
    total_overlap = timedelta()
    i, j = 0, 1
    while j < len(events):
        e1, e2 = events[i], events[j]
        e1_end = e1.timestamp + e1.duration
        if e1_end > e2.timestamp:
            # e1 is still running when e2 starts: count the intersection
            # and keep the anchor, since it may overlap later events too.
            n_overlaps += 1
            overlap_start = max(e1.timestamp, e2.timestamp)
            overlap_end = min(e1_end, e2.timestamp + e2.duration)
            total_overlap += overlap_end - overlap_start
            j += 1
        elif j - i > 1:
            # j isn't directly ahead of i: advance only the anchor so the
            # events between i and j get compared against j as well.
            i += 1
        else:
            # j is directly ahead of i: step both forward.
            i += 1
            j += 1
    return n_overlaps, total_overlap


def test_compute_total_overlap() -> None:
    """Check compute_total_overlap against three small hand-built timelines."""

    def _event(hour: int, minute: int, length: timedelta) -> Event:
        # All fixtures live on 2019-01-01 in UTC; only the time of day varies.
        start = datetime(2019, 1, 1, hour, minute, tzinfo=timezone.utc)
        return Event(timestamp=start, duration=length)

    # Simple test: two one-hour events offset by half an hour
    simple = [
        _event(12, 0, timedelta(hours=1)),
        _event(12, 30, timedelta(hours=1)),
    ]
    assert compute_total_overlap(simple) == (1, timedelta(minutes=30))

    # Test with multiple overlaps in sequence after long event
    sequential = [
        _event(12, 0, timedelta(hours=2)),
        _event(12, 30, timedelta(hours=1)),
        _event(13, 30, timedelta(hours=1)),
    ]
    assert compute_total_overlap(sequential) == (2, timedelta(minutes=90))

    # Test with multiple overlaps in sequence after long event, with inter-overlap overlap
    nested = [
        _event(12, 0, timedelta(hours=2)),
        _event(12, 30, timedelta(hours=1)),
        _event(13, 15, timedelta(minutes=15)),
    ]
    assert compute_total_overlap(nested) == (2, timedelta(minutes=75))


# TODO: Write test that ensures timezone localization is handled correctly
def categorytime_per_day(events, category):
events = [e for e in events if category in e.data["$category_hierarchy"]]
Expand Down

0 comments on commit fb2fa45

Please sign in to comment.