diff --git a/osf/metrics/counted_usage.py b/osf/metrics/counted_usage.py index e6a3abf9cd5..393bd0558c0 100644 --- a/osf/metrics/counted_usage.py +++ b/osf/metrics/counted_usage.py @@ -104,10 +104,10 @@ def _fill_pageview_info(counted_usage): def _fill_osfguid_info(counted_usage, guid_referent): counted_usage.item_public = _get_ispublic(guid_referent) - counted_usage.item_type = type(guid_referent).__name__.lower() + counted_usage.item_type = get_item_type(guid_referent) counted_usage.surrounding_guids = _get_surrounding_guids(guid_referent) if not counted_usage.provider_id: - counted_usage.provider_id = _get_provider_id(guid_referent) + counted_usage.provider_id = get_provider_id(guid_referent) def _fill_document_id(counted_usage): @@ -153,7 +153,7 @@ def _get_ispublic(guid_referent): return getattr(maybe_public, 'is_public', None) # quacks like AbstractNode -def _get_provider_id(guid_referent): +def get_provider_id(guid_referent): provider = getattr(guid_referent, 'provider', None) if isinstance(provider, str): return provider # quacks like BaseFileNode @@ -162,6 +162,10 @@ def _get_provider_id(guid_referent): return 'osf' # quacks like Node, Comment, WikiPage +def get_item_type(guid_referent): + return type(guid_referent).__name__.lower() + + def _get_immediate_wrapper(guid_referent): if hasattr(guid_referent, 'verified_publishable'): return None # quacks like Preprint diff --git a/osf/metrics/reporters/__init__.py b/osf/metrics/reporters/__init__.py index 26738fc0418..41dae7f506b 100644 --- a/osf/metrics/reporters/__init__.py +++ b/osf/metrics/reporters/__init__.py @@ -9,6 +9,7 @@ from .node_count import NodeCountReporter from .osfstorage_file_count import OsfstorageFileCountReporter from .preprint_count import PreprintCountReporter +from .public_item_usage import PublicItemUsageReporter from .user_count import UserCountReporter from .spam_count import SpamCountReporter @@ -28,3 +29,4 @@ class AllDailyReporters(enum.Enum): class AllMonthlyReporters(enum.Enum): 
SPAM_COUNT = SpamCountReporter INSTITUTIONAL_USERS = InstitutionalUsersReporter + ITEM_USAGE = PublicItemUsageReporter diff --git a/osf/metrics/reporters/public_item_usage.py b/osf/metrics/reporters/public_item_usage.py new file mode 100644 index 00000000000..f2780bb6196 --- /dev/null +++ b/osf/metrics/reporters/public_item_usage.py @@ -0,0 +1,251 @@ +import elasticsearch_dsl as edsl + +from osf.metrics.counted_usage import ( + CountedAuthUsage, + get_item_type, + get_provider_id, +) +from osf.metrics.reports import PublicItemUsageReport +from osf.metrics.utils import YearMonth +from osf import models as osfdb +from website import settings as website_settings +from ._base import MonthlyReporter + + +_CHUNK_SIZE = 500 + + +class _SkipItem(Exception): + pass + + +class PublicItemUsageReporter(MonthlyReporter): + '''build a PublicItemUsageReport for each public item + + includes projects, project components, registrations, registration components, and preprints + ''' + + def report(self, yearmonth: YearMonth): + _exact_and_combined_buckets = _zip_composite_aggs( + self._exact_item_search(yearmonth), 'agg_osfid', + self._contained_item_views_search(yearmonth), 'agg_outer_osfid', + ) + for _exact_bucket, _contained_views_bucket in _exact_and_combined_buckets: + try: + _report = self._report_from_buckets(_exact_bucket, _contained_views_bucket) + _report.view_session_count = self._get_view_session_count(yearmonth, _report.item_osfid) + yield _report + except _SkipItem: + pass + + def _report_from_buckets(self, exact_bucket, contained_views_bucket): + assert (exact_bucket is not None) or (contained_views_bucket is not None) + _report = ( + self._init_report_from_exact_bucket(exact_bucket) + if exact_bucket is not None + else self._init_report_from_osfid(contained_views_bucket.key.osfid) + ) + # view counts include views on contained items (components, files) + if contained_views_bucket is not None: + _report.view_count += contained_views_bucket.doc_count + return _report + 
+ def _init_report_from_exact_bucket(self, exact_bucket) -> PublicItemUsageReport: + # in the (should-be common) case of an item that has been directly viewed in + # this month, the stored metrics already have the data required + _report = PublicItemUsageReport( + item_osfid=exact_bucket.key.osfid, + item_type=_agg_keys(exact_bucket.agg_item_type), + provider_id=_agg_keys(exact_bucket.agg_provider_id), + platform_iri=_agg_keys(exact_bucket.agg_platform_iri), + # default counts to zero, will be updated if non-zero + view_count=0, + view_session_count=0, + download_count=0, + download_session_count=0, + ) + for _actionbucket in exact_bucket.agg_action: + if _actionbucket.key == CountedAuthUsage.ActionLabel.VIEW.value: + _report.view_count = _actionbucket.doc_count + # note: view_session_count computed separately to avoid double-counting + elif _actionbucket.key == CountedAuthUsage.ActionLabel.DOWNLOAD.value: + _report.download_count = _actionbucket.doc_count + _report.download_session_count = _actionbucket.agg_session_count.value + return _report + + def _init_report_from_osfid(self, osfid: str) -> PublicItemUsageReport: + # for the (should-be unusual) case where the components/files contained by + # an item have views in this month, but the item itself does not -- + # load necessary info via django models, instead + _osfguid = osfdb.Guid.load(osfid) + if _osfguid is None: + raise _SkipItem + return PublicItemUsageReport( + item_osfid=osfid, + item_type=[get_item_type(_osfguid.referent)], + provider_id=[get_provider_id(_osfguid.referent)], + platform_iri=[website_settings.DOMAIN], + # default counts to zero, will be updated if non-zero + view_count=0, + view_session_count=0, + download_count=0, + download_session_count=0, + ) + + def _base_usage_search(self, yearmonth): + return ( + CountedAuthUsage.search() + .filter('term', item_public=True) + .filter('range', timestamp={ + 'gte': yearmonth.target_month(), + 'lt': yearmonth.next_month(), + }) + 
.update_from_dict({'size': 0}) # only aggregations, no hits + ) + + def _exact_item_search(self, yearmonth) -> edsl.Search: + '''aggregate views and downloads on each osfid (not including components/files)''' + _search = self._base_usage_search(yearmonth) + # the main agg: use a composite aggregation to page thru *every* item + _agg_osfid = _search.aggs.bucket( + 'agg_osfid', + 'composite', + sources=[{'osfid': {'terms': {'field': 'item_guid'}}}], + size=_CHUNK_SIZE, + ) + # nested agg: for each item, get platform_iri values + _agg_osfid.bucket('agg_platform_iri', 'terms', field='platform_iri') + # nested agg: for each item, get provider_id values + _agg_osfid.bucket('agg_provider_id', 'terms', field='provider_id') + # nested agg: for each item, get item_type values + _agg_osfid.bucket('agg_item_type', 'terms', field='item_type') + # nested agg: for each item, get view and download count + _agg_action = _agg_osfid.bucket( + 'agg_action', + 'terms', + field='action_labels', + include=[ + CountedAuthUsage.ActionLabel.DOWNLOAD.value, + CountedAuthUsage.ActionLabel.VIEW.value, + ], + ) + # nested nested agg: for each item-action pair, get a session count + _agg_action.metric( + 'agg_session_count', + 'cardinality', + field='session_id', + precision_threshold=40000, # maximum precision + ) + return _search + + def _contained_item_views_search(self, yearmonth) -> edsl.Search: + '''aggregate views (but not downloads) on components and files contained within each osfid''' + _search = ( + self._base_usage_search(yearmonth) + .filter('term', action_labels=CountedAuthUsage.ActionLabel.VIEW.value) + ) + # the main agg: use a composite aggregation to page thru *every* item + _search.aggs.bucket( + 'agg_outer_osfid', + 'composite', + sources=[{'osfid': {'terms': {'field': 'surrounding_guids'}}}], + size=_CHUNK_SIZE, + ) + return _search + + def _get_view_session_count(self, yearmonth, osfid: str): + '''compute view_session_count separately to avoid double-counting + + (the same 
session may be represented in both the composite agg on `item_guid` + and that on `surrounding_guids`) + ''' + _search = ( + self._base_usage_search(yearmonth) + .query( + 'bool', + filter=[ + {'term': {'action_labels': CountedAuthUsage.ActionLabel.VIEW.value}}, + ], + should=[ + {'term': {'item_guid': osfid}}, + {'term': {'surrounding_guids': osfid}}, + ], + minimum_should_match=1, + ) + ) + _search.aggs.metric( + 'agg_session_count', + 'cardinality', + field='session_id', + precision_threshold=40000, # maximum precision + ) + _response = _search.execute() + return _response.aggregations.agg_session_count.value + + +### +# local helpers + +def _agg_keys(bucket_agg_result) -> list: + return [_bucket.key for _bucket in bucket_agg_result] + + +def _zip_composite_aggs( + search_a: edsl.Search, + composite_agg_name_a: str, + search_b: edsl.Search, + composite_agg_name_b: str, +): + '''iterate thru two composite aggregations, yielding pairs of buckets matched by key + + the composite aggregations must have matching names in `sources` + ''' + _iter_a = _iter_composite_buckets(search_a, composite_agg_name_a) + _iter_b = _iter_composite_buckets(search_b, composite_agg_name_b) + _next_a = next(_iter_a, None) + _next_b = next(_iter_b, None) + while True: + if _next_a is None and _next_b is None: + return # both done + elif _next_a is None or _next_b is None: + # one is done but not the other -- no matching needed + yield (_next_a, _next_b) + _next_a = next(_iter_a, None) + _next_b = next(_iter_b, None) + elif _next_a.key == _next_b.key: + # match -- yield and increment both + yield (_next_a, _next_b) + _next_a = next(_iter_a, None) + _next_b = next(_iter_b, None) + elif _orderable_key(_next_a) < _orderable_key(_next_b): + # mismatch -- yield and increment a (but not b) + yield (_next_a, None) + _next_a = next(_iter_a, None) + else: + # mismatch -- yield and increment b (but not a) + yield (None, _next_b) + _next_b = next(_iter_b, None) + + +def 
_iter_composite_buckets(search: edsl.Search, composite_agg_name: str): + '''iterate thru *all* buckets of a composite aggregation, requesting new pages as needed + + assumes the given search has a composite aggregation of the given name + + updates the search in-place for subsequent pages + ''' + while True: + _page_response = search.execute(ignore_cache=True) + _agg_result = _page_response.aggregations[composite_agg_name] + yield from _agg_result.buckets + # update the search for the next page + try: + _next_after = _agg_result.after_key + except AttributeError: + return # all done + else: + search.aggs[composite_agg_name].after = _next_after + + +def _orderable_key(composite_bucket) -> tuple: + return tuple(sorted(composite_bucket.key.to_dict().items())) diff --git a/osf/metrics/reports.py b/osf/metrics/reports.py index cee4efc7c02..3a5f4b1b449 100644 --- a/osf/metrics/reports.py +++ b/osf/metrics/reports.py @@ -270,3 +270,22 @@ class InstitutionalUserReport(MonthlyReport): published_preprint_count = metrics.Integer() public_file_count = metrics.Long() storage_byte_count = metrics.Long() + + +class PublicItemUsageReport(MonthlyReport): + UNIQUE_TOGETHER_FIELDS = ('report_yearmonth', 'item_osfid') + + # where noted, fields correspond to defined terms from COUNTER + # https://cop5.projectcounter.org/en/5.1/appendices/a-glossary-of-terms.html + item_osfid = metrics.Keyword() # counter:Item + item_type = metrics.Keyword(multi=True) # counter:Data-Type + provider_id = metrics.Keyword(multi=True) # counter:Database(?) 
+ platform_iri = metrics.Keyword(multi=True) # counter:Platform + + # view counts include views on components or files contained by this item + view_count = metrics.Long() # counter:Total_Item_Investigations + view_session_count = metrics.Long() # counter:Unique_Item_Investigations + + # download counts of this item only (not including contained components or files) + download_count = metrics.Long() # counter:Total_Item_Requests + download_session_count = metrics.Long() # counter:Unique_Item_Requests diff --git a/osf_tests/metrics/reporters/test_public_item_usage_reporter.py b/osf_tests/metrics/reporters/test_public_item_usage_reporter.py new file mode 100644 index 00000000000..44948474a7d --- /dev/null +++ b/osf_tests/metrics/reporters/test_public_item_usage_reporter.py @@ -0,0 +1,234 @@ +from datetime import timedelta +from operator import attrgetter +from unittest import mock + +import pytest + +from osf.metrics.counted_usage import CountedAuthUsage +from osf.metrics.reporters.public_item_usage import PublicItemUsageReporter +from osf.metrics.reports import PublicItemUsageReport +from osf.metrics.utils import YearMonth + + +@pytest.mark.es_metrics +class TestPublicItemUsageReporter: + @pytest.fixture(autouse=True) + def _mocks(self): + with ( + # set a tiny page size to force aggregation pagination: + mock.patch('osf.metrics.reporters.public_item_usage._CHUNK_SIZE', 1), + # HACK: skip auto-filling fields from the database: + mock.patch('osf.models.base.Guid.load', return_value=None), + ): + yield + + @pytest.fixture + def ym_empty(self) -> YearMonth: + return YearMonth(2012, 7) + + @pytest.fixture + def ym_sparse(self) -> YearMonth: + return YearMonth(2017, 7) + + @pytest.fixture + def ym_busy(self) -> YearMonth: + return YearMonth(2023, 7) + + @pytest.fixture + def sparse_month_usage(self, ym_sparse): + # "sparse" month: + # item0: 3 views, 0 downloads, 2 sessions + # item1: 1+1 views, 1 download, 2 sessions + # item2: 1 views, 0 downloads, 1 session + 
_month_start = ym_sparse.target_month() + _save_usage( + timestamp=_month_start, + item_guid='item0', + session_id='sesh0', + action_labels=['view'], + ) + _save_usage( + timestamp=_month_start, + item_guid='item1', + session_id='sesh0', + action_labels=['view'], + ) + _save_usage( + timestamp=_month_start + timedelta(minutes=2), + item_guid='item0', + session_id='sesh0', + action_labels=['view'], + ) + _save_usage( + timestamp=_month_start + timedelta(minutes=3), + item_guid='item1', + session_id='sesh0', + action_labels=['download'], + ) + _save_usage( + timestamp=_month_start + timedelta(days=17), + item_guid='item0', + session_id='sesh1', + action_labels=['view'], + ) + _save_usage( + timestamp=_month_start + timedelta(days=17, minutes=5), + item_guid='item2', + surrounding_guids=['item1'], + session_id='sesh1', + action_labels=['view'], + ) + _save_usage( + timestamp=_month_start + timedelta(days=17, minutes=11), + item_guid='item2', + surrounding_guids=['item1'], + session_id='sesh1', + action_labels=['download'], + ) + + @pytest.fixture + def busy_month_item0(self, ym_busy): + # item0: 4 sessions, 4*7 views, 4*5 downloads + _month_start = ym_busy.target_month() + for _sesh in range(0, 4): + _sesh_start = _month_start + timedelta(days=_sesh) + for _minute in range(0, 7): + _save_usage( + timestamp=_sesh_start + timedelta(minutes=_minute), + item_guid='item0', + session_id=f'sesh0{_sesh}', + action_labels=['view'], + ) + for _minute in range(10, 15): + _save_usage( + timestamp=_sesh_start + timedelta(minutes=_minute), + item_guid='item0', + session_id=f'sesh0{_sesh}', + action_labels=['download'], + ) + + @pytest.fixture + def busy_month_item1(self, ym_busy): + # item1: 10 sessions, 6*9 views, 5*7 downloads, 2 providers + # (plus 11 views in 11 sessions from child item2) + _month_start = ym_busy.target_month() + for _sesh in range(0, 6): + _sesh_start = _month_start + timedelta(days=_sesh) + for _minute in range(0, 9): + _save_usage( + timestamp=_sesh_start + 
timedelta(minutes=_minute), + item_guid='item1', + session_id=f'sesh1{_sesh}', + action_labels=['view'], + ) + for _sesh in range(5, 10): + _sesh_start = _month_start + timedelta(days=_sesh) + for _minute in range(10, 17): + _save_usage( + timestamp=_sesh_start + timedelta(minutes=_minute), + item_guid='item1', + session_id=f'sesh1{_sesh}', + action_labels=['download'], + provider_id='prov1', # additional provider_id + ) + + @pytest.fixture + def busy_month_item2(self, ym_busy): + # item2: 11 sessions, 11 views, 11 downloads (child of item1) + _month_start = ym_busy.target_month() + for _sesh in range(1, 12): + _save_usage( + timestamp=_month_start + timedelta(days=_sesh), + item_guid='item2', + surrounding_guids=['item1'], + session_id=f'sesh2{_sesh}', + action_labels=['view'], + ) + _save_usage( + timestamp=_month_start + timedelta(days=_sesh, hours=_sesh), + item_guid='item2', + surrounding_guids=['item1'], + session_id=f'sesh2{_sesh}', + action_labels=['download'], + ) + + def test_reporter(self, ym_empty, ym_sparse, ym_busy, sparse_month_usage, busy_month_item0, busy_month_item1, busy_month_item2): + _reporter = PublicItemUsageReporter() + _empty = list(_reporter.report(ym_empty)) + _sparse = list(_reporter.report(ym_sparse)) + _busy = list(_reporter.report(ym_busy)) + + # empty month: + assert _empty == [] + + # sparse month: + assert len(_sparse) == 3 + _sparse_item0, _sparse_item1, _sparse_item2 = sorted(_sparse, key=attrgetter('item_osfid')) + # sparse-month item0 + assert isinstance(_sparse_item0, PublicItemUsageReport) + assert _sparse_item0.item_osfid == 'item0' + assert _sparse_item0.provider_id == ['prov0'] + assert _sparse_item0.platform_iri == ['http://osf.example'] + assert _sparse_item0.view_count == 3 + assert _sparse_item0.view_session_count == 2 + assert _sparse_item0.download_count == 0 + assert _sparse_item0.download_session_count == 0 + # sparse-month item1 + assert isinstance(_sparse_item1, PublicItemUsageReport) + assert 
_sparse_item1.item_osfid == 'item1' + assert _sparse_item1.provider_id == ['prov0'] + assert _sparse_item1.platform_iri == ['http://osf.example'] + assert _sparse_item1.view_count == 2 # including item2 + assert _sparse_item1.view_session_count == 2 # including item2 + assert _sparse_item1.download_count == 1 # NOT including item2 + assert _sparse_item1.download_session_count == 1 # NOT including item2 + # sparse-month item2 + assert isinstance(_sparse_item2, PublicItemUsageReport) + assert _sparse_item2.item_osfid == 'item2' + assert _sparse_item2.provider_id == ['prov0'] + assert _sparse_item2.platform_iri == ['http://osf.example'] + assert _sparse_item2.view_count == 1 + assert _sparse_item2.view_session_count == 1 + assert _sparse_item2.download_count == 1 + assert _sparse_item2.download_session_count == 1 + + # busy month: + assert len(_busy) == 3 + _busy_item0, _busy_item1, _busy_item2 = sorted(_busy, key=attrgetter('item_osfid')) + # busy-month item0 + assert isinstance(_busy_item0, PublicItemUsageReport) + assert _busy_item0.item_osfid == 'item0' + assert _busy_item0.provider_id == ['prov0'] + assert _busy_item0.platform_iri == ['http://osf.example'] + assert _busy_item0.view_count == 4 * 7 + assert _busy_item0.view_session_count == 4 + assert _busy_item0.download_count == 4 * 5 + assert _busy_item0.download_session_count == 4 + # busy-month item1 + assert isinstance(_busy_item1, PublicItemUsageReport) + assert _busy_item1.item_osfid == 'item1' + assert _busy_item1.provider_id == ['prov0', 'prov1'] + assert _busy_item1.platform_iri == ['http://osf.example'] + assert _busy_item1.view_count == 6 * 9 + 11 + assert _busy_item1.view_session_count == 6 + 11 + assert _busy_item1.download_count == 5 * 7 + assert _busy_item1.download_session_count == 5 + # busy-month item2 + assert isinstance(_busy_item2, PublicItemUsageReport) + assert _busy_item2.item_osfid == 'item2' + assert _busy_item2.provider_id == ['prov0'] + assert _busy_item2.platform_iri == 
['http://osf.example'] + assert _busy_item2.view_count == 11 + assert _busy_item2.view_session_count == 11 + assert _busy_item2.download_count == 11 + assert _busy_item2.download_session_count == 11 + + +def _save_usage(**kwargs): + _kwargs = { # overridable defaults: + 'platform_iri': 'http://osf.example', + 'item_public': True, + 'provider_id': 'prov0', + **kwargs, + } + CountedAuthUsage(**_kwargs).save(refresh=True)