PublicItemUsageReport(er) + tests
aaxelb committed Sep 30, 2024
1 parent 884bdfc commit 980523c
Showing 5 changed files with 513 additions and 3 deletions.
10 changes: 7 additions & 3 deletions osf/metrics/counted_usage.py
@@ -104,10 +104,10 @@ def _fill_pageview_info(counted_usage):

def _fill_osfguid_info(counted_usage, guid_referent):
    counted_usage.item_public = _get_ispublic(guid_referent)
-   counted_usage.item_type = type(guid_referent).__name__.lower()
+   counted_usage.item_type = get_item_type(guid_referent)
    counted_usage.surrounding_guids = _get_surrounding_guids(guid_referent)
    if not counted_usage.provider_id:
-       counted_usage.provider_id = _get_provider_id(guid_referent)
+       counted_usage.provider_id = get_provider_id(guid_referent)


def _fill_document_id(counted_usage):
@@ -153,7 +153,7 @@ def _get_ispublic(guid_referent):
    return getattr(maybe_public, 'is_public', None)  # quacks like AbstractNode


-def _get_provider_id(guid_referent):
+def get_provider_id(guid_referent):
    provider = getattr(guid_referent, 'provider', None)
    if isinstance(provider, str):
        return provider  # quacks like BaseFileNode
@@ -162,6 +162,10 @@ def _get_provider_id(guid_referent):
    return 'osf'  # quacks like Node, Comment, WikiPage


+def get_item_type(guid_referent):
+   return type(guid_referent).__name__.lower()


def _get_immediate_wrapper(guid_referent):
    if hasattr(guid_referent, 'verified_publishable'):
        return None  # quacks like Preprint
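A quick sketch of what the now-importable helpers return, following the duck-typing comments above (the classes below are hypothetical stand-ins, not OSF models):

from osf.metrics.counted_usage import get_item_type, get_provider_id

# hypothetical referent stand-ins, for illustration only
class Registration:
    pass

class FakeFile:
    provider = 'osfstorage'  # quacks like BaseFileNode: provider is a str

assert get_item_type(Registration()) == 'registration'  # lowercased class name
assert get_item_type(FakeFile()) == 'fakefile'
assert get_provider_id(FakeFile()) == 'osfstorage'  # a str provider is returned as-is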
2 changes: 2 additions & 0 deletions osf/metrics/reporters/__init__.py
@@ -9,6 +9,7 @@
from .node_count import NodeCountReporter
from .osfstorage_file_count import OsfstorageFileCountReporter
from .preprint_count import PreprintCountReporter
+from .public_item_usage import PublicItemUsageReporter
from .user_count import UserCountReporter
from .spam_count import SpamCountReporter

@@ -28,3 +29,4 @@ class AllDailyReporters(enum.Enum):
class AllMonthlyReporters(enum.Enum):
    SPAM_COUNT = SpamCountReporter
    INSTITUTIONAL_USERS = InstitutionalUsersReporter
+   ITEM_USAGE = PublicItemUsageReporter
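With the enum entry added, the new reporter is discoverable alongside the other monthly reporters. A minimal sketch of a driver loop, assuming each member's value is a MonthlyReporter subclass whose report(yearmonth) yields report documents with an elasticsearch_dsl-style save() (the loop itself is hypothetical -- actual scheduling lives elsewhere):

from osf.metrics.reporters import AllMonthlyReporters
from osf.metrics.utils import YearMonth

def run_monthly_reporters(yearmonth: YearMonth):
    # instantiate each registered reporter and store whatever it reports
    for _member in AllMonthlyReporters:
        for _report in _member.value().report(yearmonth):
            _report.save()  # assumed: reports save like elasticsearch_dsl Documents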
251 changes: 251 additions & 0 deletions osf/metrics/reporters/public_item_usage.py
@@ -0,0 +1,251 @@
import elasticsearch_dsl as edsl

from osf.metrics.counted_usage import (
    CountedAuthUsage,
    get_item_type,
    get_provider_id,
)
from osf.metrics.reports import PublicItemUsageReport
from osf.metrics.utils import YearMonth
from osf import models as osfdb
from website import settings as website_settings
from ._base import MonthlyReporter


_CHUNK_SIZE = 500


class _SkipItem(Exception):
    pass


class PublicItemUsageReporter(MonthlyReporter):
    '''build a PublicItemUsageReport for each public item
    includes projects, project components, registrations, registration components, and preprints
    '''

    def report(self, yearmonth: YearMonth):
        _exact_and_combined_buckets = _zip_composite_aggs(
            self._exact_item_search(yearmonth), 'agg_osfid',
            self._contained_item_views_search(yearmonth), 'agg_outer_osfid',
        )
        for _exact_bucket, _contained_views_bucket in _exact_and_combined_buckets:
            try:
                _report = self._report_from_buckets(_exact_bucket, _contained_views_bucket)
                _report.view_session_count = self._get_view_session_count(yearmonth, _report.item_osfid)
                yield _report
            except _SkipItem:
                pass

    def _report_from_buckets(self, exact_bucket, contained_views_bucket):
        assert (exact_bucket is not None) or (contained_views_bucket is not None)
        _report = (
            self._init_report_from_exact_bucket(exact_bucket)
            if exact_bucket is not None
            else self._init_report_from_osfid(contained_views_bucket.key.osfid)
        )
        # view counts include views on contained items (components, files)
        if contained_views_bucket is not None:
            _report.view_count += contained_views_bucket.doc_count
        return _report

    def _init_report_from_exact_bucket(self, exact_bucket) -> PublicItemUsageReport:
        # in the (should-be common) case of an item that has been directly viewed in
        # this month, the stored metrics already have the data required
        _report = PublicItemUsageReport(
            item_osfid=exact_bucket.key.osfid,
            item_type=_agg_keys(exact_bucket.agg_item_type),
            provider_id=_agg_keys(exact_bucket.agg_provider_id),
            platform_iri=_agg_keys(exact_bucket.agg_platform_iri),
            # default counts to zero, will be updated if non-zero
            view_count=0,
            view_session_count=0,
            download_count=0,
            download_session_count=0,
        )
        for _actionbucket in exact_bucket.agg_action:
            if _actionbucket.key == CountedAuthUsage.ActionLabel.VIEW.value:
                _report.view_count = _actionbucket.doc_count
                # note: view_session_count computed separately to avoid double-counting
            elif _actionbucket.key == CountedAuthUsage.ActionLabel.DOWNLOAD.value:
                _report.download_count = _actionbucket.doc_count
                _report.download_session_count = _actionbucket.agg_session_count.value
        return _report

    def _init_report_from_osfid(self, osfid: str) -> PublicItemUsageReport:
        # for the (should-be unusual) case where the components/files contained by
        # an item have views in this month, but the item itself does not --
        # load necessary info via django models, instead
        _osfguid = osfdb.Guid.load(osfid)
        if _osfguid is None:
            raise _SkipItem
        return PublicItemUsageReport(
            item_osfid=osfid,
            item_type=[get_item_type(_osfguid.referent)],
            provider_id=[get_provider_id(_osfguid.referent)],
            platform_iri=[website_settings.DOMAIN],
            # default counts to zero, will be updated if non-zero
            view_count=0,
            view_session_count=0,
            download_count=0,
            download_session_count=0,
        )

    def _base_usage_search(self, yearmonth):
        return (
            CountedAuthUsage.search()
            .filter('term', item_public=True)
            .filter('range', timestamp={
                'gte': yearmonth.target_month(),
                'lt': yearmonth.next_month(),
            })
            .update_from_dict({'size': 0})  # only aggregations, no hits
        )

    def _exact_item_search(self, yearmonth) -> edsl.Search:
        '''aggregate views and downloads on each osfid (not including components/files)'''
        _search = self._base_usage_search(yearmonth)
        # the main agg: use a composite aggregation to page thru *every* item
        _agg_osfid = _search.aggs.bucket(
            'agg_osfid',
            'composite',
            sources=[{'osfid': {'terms': {'field': 'item_guid'}}}],
            size=_CHUNK_SIZE,
        )
        # nested agg: for each item, get platform_iri values
        _agg_osfid.bucket('agg_platform_iri', 'terms', field='platform_iri')
        # nested agg: for each item, get provider_id values
        _agg_osfid.bucket('agg_provider_id', 'terms', field='provider_id')
        # nested agg: for each item, get item_type values
        _agg_osfid.bucket('agg_item_type', 'terms', field='item_type')
        # nested agg: for each item, get view and download count
        _agg_action = _agg_osfid.bucket(
            'agg_action',
            'terms',
            field='action_labels',
            include=[
                CountedAuthUsage.ActionLabel.DOWNLOAD.value,
                CountedAuthUsage.ActionLabel.VIEW.value,
            ],
        )
        # nested nested agg: for each item-action pair, get a session count
        _agg_action.metric(
            'agg_session_count',
            'cardinality',
            field='session_id',
            precision_threshold=40000,  # maximum precision
        )
        return _search

    def _contained_item_views_search(self, yearmonth) -> edsl.Search:
        '''aggregate views (but not downloads) on components and files contained within each osfid'''
        _search = (
            self._base_usage_search(yearmonth)
            .filter('term', action_labels=CountedAuthUsage.ActionLabel.VIEW.value)
        )
        # the main agg: use a composite aggregation to page thru *every* item
        _search.aggs.bucket(
            'agg_outer_osfid',
            'composite',
            sources=[{'osfid': {'terms': {'field': 'surrounding_guids'}}}],
            size=_CHUNK_SIZE,
        )
        return _search

    def _get_view_session_count(self, yearmonth, osfid: str):
        '''compute view_session_count separately to avoid double-counting
        (the same session may be represented in both the composite agg on `item_guid`
        and that on `surrounding_guids`)
        '''
        _search = (
            self._base_usage_search(yearmonth)
            .query(
                'bool',
                filter=[
                    {'term': {'action_labels': CountedAuthUsage.ActionLabel.VIEW.value}},
                ],
                should=[
                    {'term': {'item_guid': osfid}},
                    {'term': {'surrounding_guids': osfid}},
                ],
                minimum_should_match=1,
            )
        )
        _search.aggs.metric(
            'agg_session_count',
            'cardinality',
            field='session_id',
            precision_threshold=40000,  # maximum precision
        )
        _response = _search.execute()
        return _response.aggregations.agg_session_count.value


###
# local helpers

def _agg_keys(bucket_agg_result) -> list:
    return [_bucket.key for _bucket in bucket_agg_result]


def _zip_composite_aggs(
    search_a: edsl.Search,
    composite_agg_name_a: str,
    search_b: edsl.Search,
    composite_agg_name_b: str,
):
    '''iterate thru two composite aggregations, yielding pairs of buckets matched by key
    the composite aggregations must have matching names in `sources`
    (relies on composite aggregations returning buckets in ascending key order,
    which makes this a simple merge of two sorted streams)
    '''
    _iter_a = _iter_composite_buckets(search_a, composite_agg_name_a)
    _iter_b = _iter_composite_buckets(search_b, composite_agg_name_b)
    _next_a = next(_iter_a, None)
    _next_b = next(_iter_b, None)
    while True:
        if _next_a is None and _next_b is None:
            return  # both done
        elif _next_a is None or _next_b is None:
            # one is done but not the other -- no matching needed
            yield (_next_a, _next_b)
            _next_a = next(_iter_a, None)
            _next_b = next(_iter_b, None)
        elif _next_a.key == _next_b.key:
            # match -- yield and increment both
            yield (_next_a, _next_b)
            _next_a = next(_iter_a, None)
            _next_b = next(_iter_b, None)
        elif _orderable_key(_next_a) < _orderable_key(_next_b):
            # mismatch -- yield and increment a (but not b)
            yield (_next_a, None)
            _next_a = next(_iter_a, None)
        else:
            # mismatch -- yield and increment b (but not a)
            yield (None, _next_b)
            _next_b = next(_iter_b, None)


def _iter_composite_buckets(search: edsl.Search, composite_agg_name: str):
    '''iterate thru *all* buckets of a composite aggregation, requesting new pages as needed
    assumes the given search has a composite aggregation of the given name
    updates the search in-place for subsequent pages
    '''
    while True:
        _page_response = search.execute(ignore_cache=True)
        _agg_result = _page_response.aggregations[composite_agg_name]
        yield from _agg_result.buckets
        # update the search for the next page
        try:
            _next_after = _agg_result.after_key
        except AttributeError:
            return  # all done
        else:
            search.aggs[composite_agg_name].after = _next_after


def _orderable_key(composite_bucket) -> tuple:
    return tuple(sorted(composite_bucket.key.to_dict().items()))
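_iter_composite_buckets above uses the standard composite-aggregation paging pattern: execute the search, yield that page's buckets, then feed the response's after_key back into the aggregation's after parameter until no after_key is returned. The same pattern in isolation, against an arbitrary index (index and field names invented for illustration):

import elasticsearch_dsl as edsl

def iter_all_buckets(index_name: str, keyword_field: str):
    # page thru every bucket of a composite aggregation, 500 keys at a time
    _search = edsl.Search(index=index_name).update_from_dict({'size': 0})
    _search.aggs.bucket(
        'agg_by_key',
        'composite',
        sources=[{'key': {'terms': {'field': keyword_field}}}],
        size=500,
    )
    while True:
        _response = _search.execute(ignore_cache=True)
        _agg_result = _response.aggregations['agg_by_key']
        yield from _agg_result.buckets
        try:
            _next_after = _agg_result.after_key  # absent on the last page
        except AttributeError:
            return
        _search.aggs['agg_by_key'].after = _next_after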
19 changes: 19 additions & 0 deletions osf/metrics/reports.py
@@ -270,3 +270,22 @@ class InstitutionalUserReport(MonthlyReport):
    published_preprint_count = metrics.Integer()
    public_file_count = metrics.Long()
    storage_byte_count = metrics.Long()


class PublicItemUsageReport(MonthlyReport):
    UNIQUE_TOGETHER_FIELDS = ('report_yearmonth', 'item_osfid')

    # where noted, fields correspond to defined terms from COUNTER
    # https://cop5.projectcounter.org/en/5.1/appendices/a-glossary-of-terms.html
    item_osfid = metrics.Keyword()  # counter:Item
    item_type = metrics.Keyword(multi=True)  # counter:Data-Type
    provider_id = metrics.Keyword(multi=True)  # counter:Database(?)
    platform_iri = metrics.Keyword(multi=True)  # counter:Platform

    # view counts include views on components or files contained by this item
    view_count = metrics.Long()  # counter:Total_Item_Investigations
    view_session_count = metrics.Long()  # counter:Unique_Item_Investigations

    # download counts of this item only (not including contained components or files)
    download_count = metrics.Long()  # counter:Total_Item_Requests
    download_session_count = metrics.Long()  # counter:Unique_Item_Requests
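For a concrete sense of the documents this schema describes, an illustrative instance with invented values (report_yearmonth, common to every MonthlyReport, is presumably filled in by the reporting machinery):

from osf.metrics.reports import PublicItemUsageReport

# invented values, for illustration only
_example = PublicItemUsageReport(
    item_osfid='abcd3',                # counter:Item
    item_type=['preprint'],            # counter:Data-Type
    provider_id=['osf'],               # counter:Database(?)
    platform_iri=['https://osf.io/'],  # counter:Platform
    view_count=12,                     # includes views on contained components/files
    view_session_count=5,              # distinct sessions with a view
    download_count=3,                  # downloads of this item only
    download_session_count=2,          # distinct sessions with a download
)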