From d523ecbd4bd12475e68806948d7ecc2c2af87b30 Mon Sep 17 00:00:00 2001 From: masklinn Date: Mon, 26 Feb 2024 20:12:31 +0100 Subject: [PATCH] Split Parser and reorganise package Parser turns out not to really make sense as a superclass / ABC: it really only has one useful method, and because parsers use delegation there's no real way to override the utility methods / shortcuts, so they're only useful on the caller / client side but they constrain the implementor (who has to extend the ABC and then possibly deal with multiple-inheritance shenanigans). Making the core object just a callable protocol instead makes the implementation somewhat simpler and more flexible (e.g. a plain function or higher-order function can be a "parser"). However, the convenient utility methods *are* important for end users and should not be discounted. For that, keep a wrapper `Parser` object which can be wrapped around a "parser" in order to provide the additional convenience (similar to the free functions at the root). Importantly, `Parser` methods can also be used as free functions by passing a "parser" as `self`; the two usages are intended to be compatible. It doesn't work super well from the typechecking perspective, but it works well enough. Consideration was given to making the free functions at the package root parametric on the parser, e.g. def parse(ua: str, resolver: Optional[Resolver] = None, /) -> ParseResult: if resolver is None: from . import parser as resolver return resolver(ua, Domain.ALL).complete() but that feels like it would be pretty error-prone, in the sense that it would be too easy to forget to pass in the resolver, compared to consistently resolving via a bespoke parser, or just installing a parser globally. 
Also move things around a bit: - move matcher utility functions out of the core, un-prefix them since we're using `__all__` for visibility anyway - move eager matchers out of the core, similar to the lazy matchers Fixes #189 --- setup.py | 2 +- src/ua_parser/__init__.py | 98 +++++++++---- src/ua_parser/_matchers.pyi | 2 +- src/ua_parser/basic.py | 9 +- src/ua_parser/bench.py | 20 +-- src/ua_parser/caching.py | 20 +-- src/ua_parser/core.py | 235 +------------------------------ src/ua_parser/hitrates.py | 33 +++-- src/ua_parser/lazy.py | 27 ++-- src/ua_parser/loaders.py | 22 +-- src/ua_parser/matchers.py | 169 ++++++++++++++++++++++ src/ua_parser/re2.py | 5 +- src/ua_parser/threaded.py | 14 +- src/ua_parser/utils.py | 30 ++++ tests/test_caches.py | 33 ++--- tests/test_convenience_parser.py | 13 ++ tests/test_core.py | 29 ++-- tests/test_parsers_basics.py | 12 +- 18 files changed, 404 insertions(+), 369 deletions(-) create mode 100644 src/ua_parser/matchers.py create mode 100644 src/ua_parser/utils.py create mode 100644 tests/test_convenience_parser.py diff --git a/setup.py b/setup.py index 5730379..c694778 100644 --- a/setup.py +++ b/setup.py @@ -182,7 +182,7 @@ class EagerWriter(Writer): __all__ = ["MATCHERS"] from typing import Tuple, List -from .core import UserAgentMatcher, OSMatcher, DeviceMatcher +from .matchers import UserAgentMatcher, OSMatcher, DeviceMatcher MATCHERS: Tuple[List[UserAgentMatcher], List[OSMatcher], List[DeviceMatcher]] = ([ """ diff --git a/src/ua_parser/__init__.py b/src/ua_parser/__init__.py index dcc06b5..2c6121d 100644 --- a/src/ua_parser/__init__.py +++ b/src/ua_parser/__init__.py @@ -16,29 +16,25 @@ This way importing anything but the top-level package should not be necessary unless you want to *implement* a parser. 
""" +from __future__ import annotations __all__ = [ - "BasicParser", - "CachingParser", + "BasicResolver", + "CachingResolver", "Clearing", "DefaultedParseResult", "Device", - "DeviceMatcher", "Domain", "LRU", "Locking", "Matchers", "OS", - "OSMatcher", "ParseResult", - "Parser", + "Resolver", "PartialParseResult", "UserAgent", - "UserAgentMatcher", "load_builtins", "load_lazy_builtins", - "load_data", - "load_yaml", "parse", "parse_device", "parse_os", @@ -48,43 +44,89 @@ import contextlib from typing import Callable, Optional -from .basic import Parser as BasicParser -from .caching import CachingParser, Clearing, Locking, LRU +from .basic import Resolver as BasicResolver +from .caching import CachingResolver, Clearing, Locking, LRU from .core import ( DefaultedParseResult, Device, - DeviceMatcher, Domain, Matchers, OS, - OSMatcher, - Parser, ParseResult, PartialParseResult, + Resolver, UserAgent, - UserAgentMatcher, ) -from .loaders import load_builtins, load_data, load_lazy_builtins, load_yaml +from .loaders import load_builtins, load_lazy_builtins -Re2Parser: Optional[Callable[[Matchers], Parser]] = None +Re2Resolver: Optional[Callable[[Matchers], Resolver]] = None with contextlib.suppress(ImportError): - from .re2 import Parser as Re2Parser + from .re2 import Resolver as Re2Resolver VERSION = (1, 0, 0) + + +class Parser: + @classmethod + def from_matchers(cls, m: Matchers, /) -> Parser: + if Re2Resolver is not None: + return cls(Re2Resolver(m)) + else: + return cls( + CachingResolver( + BasicResolver(m), + Locking(LRU(200)), + ) + ) + + def __init__(self, resolver: Resolver) -> None: + self.resolver = resolver + + def __call__(self, ua: str, domains: Domain, /) -> PartialParseResult: + """Parses the ``ua`` string, returning a parse result with *at least* + the requested :class:`domains ` resolved (whether to success or + failure). + + A parser may resolve more :class:`domains ` than + requested, but it *must not* resolve less. 
+ """ + return self.resolver(ua, domains) + + def parse(self, ua: str) -> ParseResult: + """Convenience method for parsing all domains, and falling back to + default values for all failures. + """ + return self(ua, Domain.ALL).complete() + + def parse_user_agent(self, ua: str) -> Optional[UserAgent]: + """Convenience method for parsing the :class:`UserAgent` domain, + falling back to the default value in case of failure. + """ + return self(ua, Domain.USER_AGENT).user_agent + + def parse_os(self, ua: str) -> Optional[OS]: + """Convenience method for parsing the :class:`OS` domain, falling back + to the default value in case of failure. + """ + return self(ua, Domain.OS).os + + def parse_device(self, ua: str) -> Optional[Device]: + """Convenience method for parsing the :class:`Device` domain, falling + back to the default value in case of failure. + """ + return self(ua, Domain.DEVICE).device + + parser: Parser def __getattr__(name: str) -> Parser: global parser if name == "parser": - if Re2Parser is not None: - parser = Re2Parser(load_lazy_builtins()) - else: - parser = CachingParser( - BasicParser(load_builtins()), - Locking(LRU(200)), - ) + parser = Parser.from_matchers( + load_builtins() if Re2Resolver is None else load_lazy_builtins() + ) return parser raise AttributeError(f"module {__name__!r} has no attribute {name!r}") @@ -105,7 +147,7 @@ def parse(ua: str) -> ParseResult: # parser, a `global` access fails to and we get a NameError from . import parser - return parser.parse(ua) + return parser(ua, Domain.ALL).complete() def parse_user_agent(ua: str) -> Optional[UserAgent]: @@ -114,7 +156,7 @@ def parse_user_agent(ua: str) -> Optional[UserAgent]: """ from . import parser - return parser.parse_user_agent(ua) + return parser(ua, Domain.USER_AGENT).user_agent def parse_os(ua: str) -> Optional[OS]: @@ -123,7 +165,7 @@ def parse_os(ua: str) -> Optional[OS]: """ from . 
import parser - return parser.parse_os(ua) + return parser(ua, Domain.OS).os def parse_device(ua: str) -> Optional[Device]: @@ -132,4 +174,4 @@ def parse_device(ua: str) -> Optional[Device]: """ from . import parser - return parser.parse_device(ua) + return parser(ua, Domain.DEVICE).device diff --git a/src/ua_parser/_matchers.pyi b/src/ua_parser/_matchers.pyi index 7c4388a..2269fb4 100644 --- a/src/ua_parser/_matchers.pyi +++ b/src/ua_parser/_matchers.pyi @@ -2,7 +2,7 @@ __all__ = ["MATCHERS"] from typing import List, Tuple -from .core import DeviceMatcher, OSMatcher, UserAgentMatcher +from .matchers import DeviceMatcher, OSMatcher, UserAgentMatcher MATCHERS: Tuple[ List[UserAgentMatcher], diff --git a/src/ua_parser/basic.py b/src/ua_parser/basic.py index 58b4f6d..4575ac0 100644 --- a/src/ua_parser/basic.py +++ b/src/ua_parser/basic.py @@ -1,3 +1,5 @@ +__all__ = ["Resolver"] + from operator import methodcaller from typing import List @@ -7,13 +9,12 @@ Matcher, Matchers, OS, - Parser as AbstractParser, PartialParseResult, UserAgent, ) -class Parser(AbstractParser): +class Resolver: """A simple pure-python parser based around trying a numer of regular expressions in sequence for each domain, and returning a result when one matches. @@ -27,9 +28,7 @@ def __init__( self, matchers: Matchers, ) -> None: - self.user_agent_matchers = matchers[0] - self.os_matchers = matchers[1] - self.device_matchers = matchers[2] + self.user_agent_matchers, self.os_matchers, self.device_matchers = matchers def __call__(self, ua: str, domains: Domain, /) -> PartialParseResult: parse = methodcaller("__call__", ua) diff --git a/src/ua_parser/bench.py b/src/ua_parser/bench.py index b84bba3..e63ed2a 100644 --- a/src/ua_parser/bench.py +++ b/src/ua_parser/bench.py @@ -7,18 +7,18 @@ from typing import Any, Callable, Iterable, List, Optional from . 
import ( - BasicParser, - CachingParser, + BasicResolver, + CachingResolver, Clearing, Locking, LRU, Matchers, Parser, - load_builtins, - load_yaml, + Resolver, ) from .caching import Cache -from .re2 import Parser as Re2Parser +from .loaders import load_builtins, load_yaml +from .re2 import Resolver as Re2Resolver from .user_agent_parser import Parse CACHEABLE = { @@ -222,19 +222,19 @@ def run_csv(args: argparse.Namespace) -> None: def get_parser( parser: str, cache: str, cachesize: int, rules: Matchers ) -> Callable[[str], Any]: - p: Parser + r: Resolver if parser == "legacy": return Parse elif parser == "basic": - p = BasicParser(rules) + r = BasicResolver(rules) elif parser == "re2": - p = Re2Parser(rules) + r = Re2Resolver(rules) else: sys.exit(f"unknown parser {parser!r}") c: Callable[[int], Cache] if cache == "none": - return p.parse + return Parser(r).parse elif cache == "clearing": c = Clearing elif cache == "lru": @@ -244,7 +244,7 @@ def get_parser( else: sys.exit(f"unknown cache algorithm {cache!r}") - return CachingParser(p, c(cachesize)).parse + return Parser(CachingResolver(r, c(cachesize))).parse def run( diff --git a/src/ua_parser/caching.py b/src/ua_parser/caching.py index 358b574..f5667f4 100644 --- a/src/ua_parser/caching.py +++ b/src/ua_parser/caching.py @@ -1,12 +1,12 @@ import abc import threading from collections import OrderedDict -from typing import Dict, Optional +from typing import Dict, Optional, Protocol -from .core import Domain, Parser, PartialParseResult +from .core import Domain, PartialParseResult, Resolver __all__ = [ - "CachingParser", + "CachingResolver", "Cache", "Clearing", "Locking", @@ -14,7 +14,7 @@ ] -class Cache(abc.ABC): +class Cache(Protocol): """Cache abstract protocol. The :class:`CachingParser` will look values up, merge what was returned (possibly nothing) with what it got from its actual parser, and *re-set the result*. @@ -33,7 +33,7 @@ def __getitem__(self, key: str) -> Optional[PartialParseResult]: ... 
-class Clearing(Cache): +class Clearing: """A clearing cache, if the cache is full, just remove all the entries and re-fill from scratch. @@ -62,7 +62,7 @@ def __setitem__(self, key: str, value: PartialParseResult) -> None: self.cache[key] = value -class LRU(Cache): +class LRU: """Cache following a least-recently used replacement policy: when there is no more room in the cache, whichever entry was last seen the least recently is removed. @@ -103,7 +103,7 @@ def __setitem__(self, key: str, value: PartialParseResult) -> None: self.cache.popitem(last=False) -class Locking(Cache): +class Locking: """Locking cache decorator. Takes a non-thread-safe cache and ensures retrieving and setting entries is protected by a mutex. @@ -122,7 +122,7 @@ def __setitem__(self, key: str, value: PartialParseResult) -> None: self.cache[key] = value -class CachingParser(Parser): +class CachingResolver: """A wrapping parser which takes an underlying concrete :class:`Cache` for the actual caching and cache strategy. @@ -134,8 +134,8 @@ class CachingParser(Parser): really, they're immutable). 
""" - def __init__(self, parser: Parser, cache: Cache): - self.parser: Parser = parser + def __init__(self, parser: Resolver, cache: Cache): + self.parser: Resolver = parser self.cache: Cache = cache def __call__(self, ua: str, domains: Domain, /) -> PartialParseResult: diff --git a/src/ua_parser/core.py b/src/ua_parser/core.py index 54837e4..ca2fc75 100644 --- a/src/ua_parser/core.py +++ b/src/ua_parser/core.py @@ -1,22 +1,18 @@ import abc -import re from dataclasses import dataclass from enum import Flag, auto -from typing import Generic, List, Literal, Match, Optional, Pattern, Tuple, TypeVar +from typing import Callable, Generic, List, Optional, Tuple, TypeVar __all__ = [ "DefaultedParseResult", "Device", - "DeviceMatcher", "Domain", "Matchers", "OS", - "OSMatcher", "ParseResult", - "Parser", "PartialParseResult", + "Resolver", "UserAgent", - "UserAgentMatcher", ] @@ -155,70 +151,7 @@ def complete(self) -> ParseResult: ) -class Parser(abc.ABC): - @abc.abstractmethod - def __call__(self, ua: str, domains: Domain, /) -> PartialParseResult: - """Parses the ``ua`` string, returning a parse result with *at least* - the requested :class:`domains ` resolved (whether to success or - failure). - - A parser may resolve more :class:`domains ` than - requested, but it *must not* resolve less. - """ - ... - - def parse(self, ua: str) -> ParseResult: - """Convenience method for parsing all domains, and falling back to - default values for all failures. - """ - return self(ua, Domain.ALL).complete() - - def parse_user_agent(self, ua: str) -> Optional[UserAgent]: - """Convenience method for parsing the :class:`UserAgent` domain, - falling back to the default value in case of failure. - """ - return self(ua, Domain.USER_AGENT).user_agent - - def parse_os(self, ua: str) -> Optional[OS]: - """Convenience method for parsing the :class:`OS` domain, falling back - to the default value in case of failure. 
- """ - return self(ua, Domain.OS).os - - def parse_device(self, ua: str) -> Optional[Device]: - """Convenience method for parsing the :class:`Device` domain, falling - back to the default value in case of failure. - """ - return self(ua, Domain.DEVICE).device - - -def _get(m: Match[str], idx: int) -> Optional[str]: - return (m[idx] or None) if 0 < idx <= m.re.groups else None - - -def _replacer(repl: str, m: Match[str]) -> Optional[str]: - """The replacement rules are frustratingly subtle and innimical to - standard python fallback semantics: - - - if there is a non-null replacement pattern, then it must be used with - match groups as template parameters (at indices 1+) - - the result is stripped - - if it is an empty string, then it's replaced by a null - - otherwise fallback to a (possibly optional) match group - - or null (device brand has no fallback) - - Replacement rules only apply to OS and Device matchers, the UA - matcher has bespoke replacement semantics for the family (just - $1), and no replacement for the other fields, either there is a - static replacement or it falls back to the corresponding - (optional) match group. 
- - """ - if not repl: - return None - - return re.sub(r"\$(\d)", lambda n: _get(m, int(n[1])) or "", repl).strip() or None - +Resolver = Callable[[str, Domain], PartialParseResult] T = TypeVar("T") @@ -238,168 +171,6 @@ def flags(self) -> int: return 0 -class UserAgentMatcher(Matcher[UserAgent]): - regex: Pattern[str] - family: str - major: Optional[str] - minor: Optional[str] - patch: Optional[str] - patch_minor: Optional[str] - - def __init__( - self, - regex: str, - family: Optional[str] = None, - major: Optional[str] = None, - minor: Optional[str] = None, - patch: Optional[str] = None, - patch_minor: Optional[str] = None, - ) -> None: - self.regex = re.compile(regex) - self.family = family or "$1" - self.major = major - self.minor = minor - self.patch = patch - self.patch_minor = patch_minor - - def __call__(self, ua: str) -> Optional[UserAgent]: - if m := self.regex.search(ua): - return UserAgent( - family=( - self.family.replace("$1", m[1]) - if "$1" in self.family - else self.family - ), - major=self.major or _get(m, 2), - minor=self.minor or _get(m, 3), - patch=self.patch or _get(m, 4), - patch_minor=self.patch_minor or _get(m, 5), - ) - return None - - @property - def pattern(self) -> str: - return self.regex.pattern - - def __repr__(self) -> str: - fields = [ - ("family", self.family if self.family != "$1" else None), - ("major", self.major), - ("minor", self.minor), - ("patch", self.patch), - ("patch_minor", self.patch_minor), - ] - args = "".join(f", {k}={v!r}" for k, v in fields if v is not None) - - return f"UserAgentMatcher({self.pattern!r}{args})" - - -class OSMatcher(Matcher[OS]): - regex: Pattern[str] - family: str - major: str - minor: str - patch: str - patch_minor: str - - def __init__( - self, - regex: str, - family: Optional[str] = None, - major: Optional[str] = None, - minor: Optional[str] = None, - patch: Optional[str] = None, - patch_minor: Optional[str] = None, - ) -> None: - self.regex = re.compile(regex) - self.family = family or "$1" 
- self.major = major or "$2" - self.minor = minor or "$3" - self.patch = patch or "$4" - self.patch_minor = patch_minor or "$5" - - def __call__(self, ua: str) -> Optional[OS]: - if m := self.regex.search(ua): - family = _replacer(self.family, m) - if family is None: - raise ValueError(f"Unable to find OS family in {ua}") - return OS( - family=family, - major=_replacer(self.major, m), - minor=_replacer(self.minor, m), - patch=_replacer(self.patch, m), - patch_minor=_replacer(self.patch_minor, m), - ) - return None - - @property - def pattern(self) -> str: - return self.regex.pattern - - def __repr__(self) -> str: - fields = [ - ("family", self.family if self.family != "$1" else None), - ("major", self.major if self.major != "$2" else None), - ("minor", self.minor if self.minor != "$3" else None), - ("patch", self.patch if self.patch != "$4" else None), - ("patch_minor", self.patch_minor if self.patch_minor != "$5" else None), - ] - args = "".join(f", {k}={v!r}" for k, v in fields if v is not None) - - return f"OSMatcher({self.pattern!r}{args})" - - -class DeviceMatcher(Matcher[Device]): - regex: Pattern[str] - family: str - brand: str - model: str - - def __init__( - self, - regex: str, - regex_flag: Optional[Literal["i"]] = None, - family: Optional[str] = None, - brand: Optional[str] = None, - model: Optional[str] = None, - ) -> None: - self.regex = re.compile(regex, flags=re.IGNORECASE if regex_flag == "i" else 0) - self.family = family or "$1" - self.brand = brand or "" - self.model = model or "$1" - - def __call__(self, ua: str) -> Optional[Device]: - if m := self.regex.search(ua): - family = _replacer(self.family, m) - if family is None: - raise ValueError(f"Unable to find device family in {ua}") - return Device( - family=family, - brand=_replacer(self.brand, m), - model=_replacer(self.model, m), - ) - return None - - @property - def pattern(self) -> str: - return self.regex.pattern - - @property - def flags(self) -> int: - return self.regex.flags - - def 
__repr__(self) -> str: - fields = [ - ("family", self.family if self.family != "$1" else None), - ("brand", self.brand or None), - ("model", self.model if self.model != "$1" else None), - ] - iflag = ', "i"' if self.flags & re.IGNORECASE else "" - args = iflag + "".join(f", {k}={v!r}" for k, v in fields if v is not None) - - return f"DeviceMatcher({self.pattern!r}{args})" - - Matchers = Tuple[ List[Matcher[UserAgent]], List[Matcher[OS]], diff --git a/src/ua_parser/hitrates.py b/src/ua_parser/hitrates.py index a5739d5..61e19cd 100644 --- a/src/ua_parser/hitrates.py +++ b/src/ua_parser/hitrates.py @@ -1,29 +1,31 @@ import argparse import itertools +from typing import Callable, List from . import ( - CachingParser, + CachingResolver, Clearing, Domain, LRU, Parser, PartialParseResult, + Resolver, ) +from .caching import Cache -class Noop(Parser): - def __call__(self, ua: str, domains: Domain, /) -> PartialParseResult: - return PartialParseResult( - domains=domains, - string=ua, - user_agent=None, - os=None, - device=None, - ) +def Noop(ua: str, domains: Domain, /) -> PartialParseResult: + return PartialParseResult( + domains=domains, + string=ua, + user_agent=None, + os=None, + device=None, + ) -class Counter(Parser): - def __init__(self, parser: Parser) -> None: +class Counter: + def __init__(self, parser: Resolver) -> None: self.count = 0 self.parser = parser @@ -60,12 +62,13 @@ def main() -> None: print(total, "lines", uniques, "uniques") print(f"ideal hit rate: {(total - uniques)/total:.0%}") print() + caches: List[Callable[[int], Cache]] = [Clearing, LRU] for cache, cache_size in itertools.product( - [Clearing, LRU], + caches, args.cachesizes, ): - misses = Counter(Noop()) - parser = CachingParser(misses, cache(cache_size)) + misses = Counter(Noop) + parser = Parser(CachingResolver(misses, cache(cache_size))) for line in lines: parser.parse(line) diff --git a/src/ua_parser/lazy.py b/src/ua_parser/lazy.py index d9e0219..7311252 100644 --- a/src/ua_parser/lazy.py 
+++ b/src/ua_parser/lazy.py @@ -4,7 +4,8 @@ from functools import cached_property from typing import Literal, Optional, Pattern -from .core import Device, Matcher, OS, UserAgent, _get, _replacer +from .core import Device, Matcher, OS, UserAgent +from .utils import get, replacer class UserAgentMatcher(Matcher[UserAgent]): @@ -39,10 +40,10 @@ def __call__(self, ua: str) -> Optional[UserAgent]: if "$1" in self.family else self.family ), - major=self.major or _get(m, 2), - minor=self.minor or _get(m, 3), - patch=self.patch or _get(m, 4), - patch_minor=self.patch_minor or _get(m, 5), + major=self.major or get(m, 2), + minor=self.minor or get(m, 3), + patch=self.patch or get(m, 4), + patch_minor=self.patch_minor or get(m, 5), ) return None @@ -89,15 +90,15 @@ def __init__( def __call__(self, ua: str) -> Optional[OS]: if m := self.regex.search(ua): - family = _replacer(self.family, m) + family = replacer(self.family, m) if family is None: raise ValueError(f"Unable to find OS family in {ua}") return OS( family=family, - major=_replacer(self.major, m), - minor=_replacer(self.minor, m), - patch=_replacer(self.patch, m), - patch_minor=_replacer(self.patch_minor, m), + major=replacer(self.major, m), + minor=replacer(self.minor, m), + patch=replacer(self.patch, m), + patch_minor=replacer(self.patch_minor, m), ) return None @@ -141,13 +142,13 @@ def __init__( def __call__(self, ua: str) -> Optional[Device]: if m := self.regex.search(ua): - family = _replacer(self.family, m) + family = replacer(self.family, m) if family is None: raise ValueError(f"Unable to find device family in {ua}") return Device( family=family, - brand=_replacer(self.brand, m), - model=_replacer(self.model, m), + brand=replacer(self.brand, m), + model=replacer(self.model, m), ) return None diff --git a/src/ua_parser/loaders.py b/src/ua_parser/loaders.py index 66a294c..ab0ae34 100644 --- a/src/ua_parser/loaders.py +++ b/src/ua_parser/loaders.py @@ -1,14 +1,16 @@ from __future__ import annotations __all__ = [ + 
"DeviceDict", + "MatchersData", + "OSDict", + "UserAgentDict", "load_builtins", - "load_lazy_builtins", "load_data", + "load_json", + "load_lazy", + "load_lazy_builtins", "load_yaml", - "MatchersData", - "UserAgentDict", - "OSDict", - "DeviceDict", ] import io @@ -28,8 +30,8 @@ cast, ) -from . import lazy -from .core import DeviceMatcher, Matchers, OSMatcher, UserAgentMatcher +from . import lazy, matchers +from .core import Matchers if TYPE_CHECKING: PathOrFile = Union[str, os.PathLike[str], io.IOBase] @@ -93,7 +95,7 @@ class DeviceDict(_RegexDict, total=False): def load_data(d: MatchersData) -> Matchers: return ( [ - UserAgentMatcher( + matchers.UserAgentMatcher( p["regex"], p.get("family_replacement"), p.get("v1_replacement"), @@ -104,7 +106,7 @@ def load_data(d: MatchersData) -> Matchers: for p in d[0] ], [ - OSMatcher( + matchers.OSMatcher( p["regex"], p.get("os_replacement"), p.get("os_v1_replacement"), @@ -115,7 +117,7 @@ def load_data(d: MatchersData) -> Matchers: for p in d[1] ], [ - DeviceMatcher( + matchers.DeviceMatcher( p["regex"], p.get("regex_flag"), p.get("device_replacement"), diff --git a/src/ua_parser/matchers.py b/src/ua_parser/matchers.py new file mode 100644 index 0000000..6104da0 --- /dev/null +++ b/src/ua_parser/matchers.py @@ -0,0 +1,169 @@ +__all__ = ["UserAgentMatcher", "OSMatcher", "DeviceMatcher"] + +import re +from typing import Literal, Optional, Pattern + +from .core import Device, Matcher, OS, UserAgent +from .utils import get, replacer + + +class UserAgentMatcher(Matcher[UserAgent]): + regex: Pattern[str] + family: str + major: Optional[str] + minor: Optional[str] + patch: Optional[str] + patch_minor: Optional[str] + + def __init__( + self, + regex: str, + family: Optional[str] = None, + major: Optional[str] = None, + minor: Optional[str] = None, + patch: Optional[str] = None, + patch_minor: Optional[str] = None, + ) -> None: + self.regex = re.compile(regex) + self.family = family or "$1" + self.major = major + self.minor = minor + 
self.patch = patch + self.patch_minor = patch_minor + + def __call__(self, ua: str) -> Optional[UserAgent]: + if m := self.regex.search(ua): + return UserAgent( + family=( + self.family.replace("$1", m[1]) + if "$1" in self.family + else self.family + ), + major=self.major or get(m, 2), + minor=self.minor or get(m, 3), + patch=self.patch or get(m, 4), + patch_minor=self.patch_minor or get(m, 5), + ) + return None + + @property + def pattern(self) -> str: + return self.regex.pattern + + def __repr__(self) -> str: + fields = [ + ("family", self.family if self.family != "$1" else None), + ("major", self.major), + ("minor", self.minor), + ("patch", self.patch), + ("patch_minor", self.patch_minor), + ] + args = "".join(f", {k}={v!r}" for k, v in fields if v is not None) + + return f"UserAgentMatcher({self.pattern!r}{args})" + + +class OSMatcher(Matcher[OS]): + regex: Pattern[str] + family: str + major: str + minor: str + patch: str + patch_minor: str + + def __init__( + self, + regex: str, + family: Optional[str] = None, + major: Optional[str] = None, + minor: Optional[str] = None, + patch: Optional[str] = None, + patch_minor: Optional[str] = None, + ) -> None: + self.regex = re.compile(regex) + self.family = family or "$1" + self.major = major or "$2" + self.minor = minor or "$3" + self.patch = patch or "$4" + self.patch_minor = patch_minor or "$5" + + def __call__(self, ua: str) -> Optional[OS]: + if m := self.regex.search(ua): + family = replacer(self.family, m) + if family is None: + raise ValueError(f"Unable to find OS family in {ua}") + return OS( + family=family, + major=replacer(self.major, m), + minor=replacer(self.minor, m), + patch=replacer(self.patch, m), + patch_minor=replacer(self.patch_minor, m), + ) + return None + + @property + def pattern(self) -> str: + return self.regex.pattern + + def __repr__(self) -> str: + fields = [ + ("family", self.family if self.family != "$1" else None), + ("major", self.major if self.major != "$2" else None), + ("minor", 
self.minor if self.minor != "$3" else None), + ("patch", self.patch if self.patch != "$4" else None), + ("patch_minor", self.patch_minor if self.patch_minor != "$5" else None), + ] + args = "".join(f", {k}={v!r}" for k, v in fields if v is not None) + + return f"OSMatcher({self.pattern!r}{args})" + + +class DeviceMatcher(Matcher[Device]): + regex: Pattern[str] + family: str + brand: str + model: str + + def __init__( + self, + regex: str, + regex_flag: Optional[Literal["i"]] = None, + family: Optional[str] = None, + brand: Optional[str] = None, + model: Optional[str] = None, + ) -> None: + self.regex = re.compile(regex, flags=re.IGNORECASE if regex_flag == "i" else 0) + self.family = family or "$1" + self.brand = brand or "" + self.model = model or "$1" + + def __call__(self, ua: str) -> Optional[Device]: + if m := self.regex.search(ua): + family = replacer(self.family, m) + if family is None: + raise ValueError(f"Unable to find device family in {ua}") + return Device( + family=family, + brand=replacer(self.brand, m), + model=replacer(self.model, m), + ) + return None + + @property + def pattern(self) -> str: + return self.regex.pattern + + @property + def flags(self) -> int: + return self.regex.flags + + def __repr__(self) -> str: + fields = [ + ("family", self.family if self.family != "$1" else None), + ("brand", self.brand or None), + ("model", self.model if self.model != "$1" else None), + ] + iflag = ', "i"' if self.flags & re.IGNORECASE else "" + args = iflag + "".join(f", {k}={v!r}" for k, v in fields if v is not None) + + return f"DeviceMatcher({self.pattern!r}{args})" diff --git a/src/ua_parser/re2.py b/src/ua_parser/re2.py index 559879b..c8cdd0b 100644 --- a/src/ua_parser/re2.py +++ b/src/ua_parser/re2.py @@ -1,4 +1,4 @@ -from __future__ import annotations +__all__ = ["Resolver"] import re from typing import List @@ -11,13 +11,12 @@ Matcher, Matchers, OS, - Parser as AbstractParser, PartialParseResult, UserAgent, ) -class Parser(AbstractParser): +class 
Resolver: ua: re2.Filter user_agent_matchers: List[Matcher[UserAgent]] os: re2.Filter diff --git a/src/ua_parser/threaded.py b/src/ua_parser/threaded.py index 15b2390..a0a3d13 100644 --- a/src/ua_parser/threaded.py +++ b/src/ua_parser/threaded.py @@ -6,15 +6,15 @@ from typing import Iterable from . import ( - BasicParser, - CachingParser, + BasicResolver, + CachingResolver, Clearing, Locking, LRU, Parser, load_builtins, ) -from .re2 import Parser as Re2Parser +from .re2 import Resolver as Re2Resolver def worker( @@ -54,11 +54,11 @@ def main() -> None: args = ap.parse_args() lines = list(args.file) - basic = BasicParser(load_builtins()) + basic = BasicResolver(load_builtins()) for name, parser in [ - ("clearing", CachingParser(basic, Clearing(CACHESIZE))), - ("LRU", CachingParser(basic, Locking(LRU(CACHESIZE)))), - ("re2", Re2Parser(load_builtins())), + ("clearing", CachingResolver(basic, Clearing(CACHESIZE))), + ("LRU", CachingResolver(basic, Locking(LRU(CACHESIZE)))), + ("re2", Re2Resolver(load_builtins())), ]: # randomize the dataset for each thread, predictably, to # simulate distributed load (not great but better than diff --git a/src/ua_parser/utils.py b/src/ua_parser/utils.py new file mode 100644 index 0000000..f3afa48 --- /dev/null +++ b/src/ua_parser/utils.py @@ -0,0 +1,30 @@ +import re +from typing import Match, Optional + + +def get(m: Match[str], idx: int) -> Optional[str]: + return (m[idx] or None) if 0 < idx <= m.re.groups else None + + +def replacer(repl: str, m: Match[str]) -> Optional[str]: + """The replacement rules are frustratingly subtle and innimical to + standard python fallback semantics: + + - if there is a non-null replacement pattern, then it must be used with + match groups as template parameters (at indices 1+) + - the result is stripped + - if it is an empty string, then it's replaced by a null + - otherwise fallback to a (possibly optional) match group + - or null (device brand has no fallback) + + Replacement rules only apply to OS 
and Device matchers, the UA + matcher has bespoke replacement semantics for the family (just + $1), and no replacement for the other fields, either there is a + static replacement or it falls back to the corresponding + (optional) match group. + + """ + if not repl: + return None + + return re.sub(r"\$(\d)", lambda n: get(m, int(n[1])) or "", repl).strip() or None diff --git a/tests/test_caches.py b/tests/test_caches.py index 5969e46..e41d978 100644 --- a/tests/test_caches.py +++ b/tests/test_caches.py @@ -1,19 +1,18 @@ from collections import OrderedDict from ua_parser import ( - BasicParser, - CachingParser, + BasicResolver, + CachingResolver, Clearing, Device, - DeviceMatcher, Domain, LRU, OS, - OSMatcher, + Parser, PartialParseResult, UserAgent, - UserAgentMatcher, ) +from ua_parser.matchers import DeviceMatcher, OSMatcher, UserAgentMatcher def test_clearing(): @@ -21,7 +20,7 @@ def test_clearing(): entries. """ cache = Clearing(2) - p = CachingParser(BasicParser(([], [], [])), cache) + p = Parser(CachingResolver(BasicResolver(([], [], [])), cache)) p.parse("a") p.parse("b") @@ -42,7 +41,7 @@ def test_lru(): popped LRU-first. """ cache = LRU(2) - p = CachingParser(BasicParser(([], [], [])), cache) + p = Parser(CachingResolver(BasicResolver(([], [], [])), cache)) p.parse("a") p.parse("b") @@ -69,15 +68,17 @@ def test_backfill(): existing entry when new parts get parsed. 
""" cache = Clearing(2) - p = CachingParser( - BasicParser( - ( - [UserAgentMatcher("(a)")], - [OSMatcher("(a)")], - [DeviceMatcher("(a)")], - ) - ), - cache, + p = Parser( + CachingResolver( + BasicResolver( + ( + [UserAgentMatcher("(a)")], + [OSMatcher("(a)")], + [DeviceMatcher("(a)")], + ) + ), + cache, + ) ) p.parse_user_agent("a") diff --git a/tests/test_convenience_parser.py b/tests/test_convenience_parser.py new file mode 100644 index 0000000..2d0668a --- /dev/null +++ b/tests/test_convenience_parser.py @@ -0,0 +1,13 @@ +from ua_parser import Parser, ParseResult, PartialParseResult + + +def test_parser_utility() -> None: + """Tests that ``Parser``'s methods to behave as procedural + helpers, for users who may not wish to instantiate a parser or + something. + + Sadly the typing doesn't really play nicely with that. + + """ + r = Parser.parse(lambda s, d: PartialParseResult(d, None, None, None, s), "a") + assert r == ParseResult(None, None, None, "a") diff --git a/tests/test_core.py b/tests/test_core.py index 3a73faf..5d8eca8 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -26,34 +26,39 @@ from yaml import SafeLoader, load from ua_parser import ( - BasicParser, + BasicResolver, Device, OS, + Parser, ParseResult, UserAgent, - UserAgentMatcher, caching, load_builtins, load_lazy_builtins, ) +from ua_parser.matchers import UserAgentMatcher CORE_DIR = (pathlib.Path(__name__).parent.parent / "uap-core").resolve() PARSERS = [ - pytest.param(BasicParser(load_builtins()), id="basic"), - pytest.param(BasicParser(load_lazy_builtins()), id="lazy"), + pytest.param(Parser(BasicResolver(load_builtins())), id="basic"), + pytest.param(Parser(BasicResolver(load_lazy_builtins())), id="lazy"), pytest.param( - caching.CachingParser( - BasicParser(load_builtins()), - caching.Clearing(10), + Parser( + caching.CachingResolver( + BasicResolver(load_builtins()), + caching.Clearing(10), + ) ), id="clearing", ), pytest.param( - caching.CachingParser( - 
BasicParser(load_builtins()), - caching.LRU(10), + Parser( + caching.CachingResolver( + BasicResolver(load_builtins()), + caching.LRU(10), + ) ), id="lru", ), @@ -61,7 +66,7 @@ with contextlib.suppress(ImportError): from ua_parser import re2 - PARSERS.append(pytest.param(re2.Parser(load_builtins()), id="re2")) + PARSERS.append(pytest.param(Parser(re2.Resolver(load_builtins())), id="re2")) UA_FIELDS = {f.name for f in dataclasses.fields(UserAgent)} @@ -134,7 +139,7 @@ def test_devices(parser, test_file): def test_results(): - p = BasicParser(([UserAgentMatcher("(x)")], [], [])) + p = Parser(BasicResolver(([UserAgentMatcher("(x)")], [], []))) assert p.parse_user_agent("x") == UserAgent("x") assert p.parse_user_agent("y") is None diff --git a/tests/test_parsers_basics.py b/tests/test_parsers_basics.py index 9252745..895e89a 100644 --- a/tests/test_parsers_basics.py +++ b/tests/test_parsers_basics.py @@ -1,17 +1,17 @@ import io from ua_parser import ( - BasicParser, + BasicResolver, Domain, PartialParseResult, UserAgent, - UserAgentMatcher, - load_yaml, ) +from ua_parser.loaders import load_yaml +from ua_parser.matchers import UserAgentMatcher def test_trivial_matching(): - p = BasicParser(([UserAgentMatcher("(a)")], [], [])) + p = BasicResolver(([UserAgentMatcher("(a)")], [], [])) assert p("x", Domain.ALL) == PartialParseResult( string="x", @@ -31,7 +31,7 @@ def test_trivial_matching(): def test_partial(): - p = BasicParser(([UserAgentMatcher("(a)")], [], [])) + p = BasicResolver(([UserAgentMatcher("(a)")], [], [])) assert p("x", Domain.USER_AGENT) == PartialParseResult( string="x", @@ -60,7 +60,7 @@ def test_init_yaml(): device_parsers: [] """ ) - p = BasicParser(load_yaml(f)) + p = BasicResolver(load_yaml(f)) assert p("x", Domain.USER_AGENT) == PartialParseResult( string="x",