Skip to content

Commit

Permalink
Merge branch 'main' into improvement/support-file-like-objects-in-rec…
Browse files Browse the repository at this point in the history
…ord-reader
  • Loading branch information
yunzheng authored Aug 28, 2023
2 parents 9ba1772 + 6358ba3 commit 6a8e0f4
Show file tree
Hide file tree
Showing 19 changed files with 455 additions and 106 deletions.
76 changes: 76 additions & 0 deletions .github/workflows/compatibility.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
name: Python Compatibility
on: [push, pull_request, workflow_dispatch]

jobs:
test-py37-py38:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
include:
- python-version: "3.7"
python-include: "python3.7"
tox-env: "py37"
allow_failure: false
- python-version: "3.8"
python-include: "python3.8"
tox-env: "py38"
allow_failure: false
steps:
- uses: actions/checkout@v3
with:
fetch-depth: 0
lfs: true
- uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
cache: "pip"
cache-dependency-path: "pyproject.toml"
- run: pip install "tox==4.2.4"
- if: ${{ github.ref_name == 'main' }}
run: sed -i 's/\[tool.setuptools_scm\]/\[tool.setuptools_scm\]\nlocal_scheme = "no-local-version"/' pyproject.toml
- env:
C_INCLUDE_PATH: ${{ env.pythonLocation }}/include/${{ matrix.python-include }}
run: tox run -e ${{ matrix.tox-env }}
- uses: codecov/codecov-action@v3
with:
env_vars: PYTHON
files: coverage.xml
flags: unittests
verbose: true
- uses: actions/upload-artifact@v3
with:
name: coverage
path: coverage.xml
test-windows:
runs-on: windows-latest
strategy:
fail-fast: false
matrix:
include:
- python-version: "3.9"
python-include: "python3.9"
tox-env: "py39"
allow_failure: false
- python-version: "3.10"
python-include: "python3.10"
tox-env: "py310"
allow_failure: false
- python-version: "3.11"
python-include: "python3.11"
tox-env: "py311"
allow_failure: true
steps:
- uses: actions/checkout@v3
with:
fetch-depth: 0
lfs: true
- uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
cache: "pip"
cache-dependency-path: "pyproject.toml"
- run: pip install "tox==4.2.4"
- env:
C_INCLUDE_PATH: ${{ env.pythonLocation }}/include/${{ matrix.python-include }}
run: tox run -e ${{ matrix.tox-env }}
2 changes: 1 addition & 1 deletion flow/record/adapter/elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def __init__(
index: str = "records",
http_compress: Union[str, bool] = True,
selector: Union[None, Selector, CompiledSelector] = None,
**kwargs
**kwargs,
) -> None:
self.index = index
self.uri = uri
Expand Down
5 changes: 3 additions & 2 deletions flow/record/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import re
import sys
import warnings
from datetime import datetime
from datetime import datetime, timezone
from itertools import zip_longest
from pathlib import Path
from typing import (
Expand Down Expand Up @@ -65,6 +65,7 @@
from .whitelist import WHITELIST, WHITELIST_TREE

log = logging.getLogger(__package__)
_utcnow = functools.partial(datetime.now, timezone.utc)

RECORD_VERSION = 1
RESERVED_FIELDS = OrderedDict(
Expand Down Expand Up @@ -449,7 +450,7 @@ def _generate_record_class(name: str, fields: Tuple[Tuple[str, str]]) -> type:
_globals = {
"Record": Record,
"RECORD_VERSION": RECORD_VERSION,
"_utcnow": datetime.utcnow,
"_utcnow": _utcnow,
"_zip_longest": zip_longest,
}
for field in all_fields.values():
Expand Down
114 changes: 84 additions & 30 deletions flow/record/fieldtypes/__init__.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,36 @@
from __future__ import annotations

import binascii
import math
import os
import pathlib
import re
import sys
import warnings
from binascii import a2b_hex, b2a_hex
from datetime import datetime as _dt
from datetime import timezone
from posixpath import basename, dirname
from typing import Any, Tuple
from typing import Any, Optional, Tuple
from urllib.parse import urlparse

try:
import urlparse
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
except ImportError:
import urllib.parse as urlparse

import warnings
from backports.zoneinfo import ZoneInfo, ZoneInfoNotFoundError

from flow.record.base import FieldType

RE_NORMALIZE_PATH = re.compile(r"[\\/]+")
RE_STRIP_NANOSECS = re.compile(r"(\.\d{6})\d+")
NATIVE_UNICODE = isinstance("", str)

UTC = timezone.utc
ISO_FORMAT = "%Y-%m-%dT%H:%M:%S%z"
ISO_FORMAT_WITH_MS = "%Y-%m-%dT%H:%M:%S.%f%z"

PY_311 = sys.version_info >= (3, 11, 0)

PATH_POSIX = 0
PATH_WINDOWS = 1

Expand All @@ -32,6 +41,31 @@
path_type = pathlib.PurePath


def flow_record_tz(*, default_tz: str = "UTC") -> Optional[ZoneInfo | UTC]:
"""Return a ``ZoneInfo`` object based on the ``FLOW_RECORD_TZ`` environment variable.
Args:
default_tz: Default timezone if ``FLOW_RECORD_TZ`` is not set (default: UTC).
Returns:
None if ``FLOW_RECORD_TZ=NONE`` otherwise ``ZoneInfo(FLOW_RECORD_TZ)`` or ``UTC`` if ZoneInfo is not found.
"""
tz = os.environ.get("FLOW_RECORD_TZ", default_tz)
if tz.upper() == "NONE":
return None
try:
return ZoneInfo(tz)
except ZoneInfoNotFoundError as exc:
warnings.warn(f"{exc!r}, falling back to timezone.utc")
return UTC


# The environment variable ``FLOW_RECORD_TZ`` affects the display of datetime fields.
#
# The timezone to use when displaying datetime fields. By default this is UTC.
DISPLAY_TZINFO = flow_record_tz(default_tz="UTC")


def defang(value: str) -> str:
"""Defangs the value to make URLs or ip addresses unclickable"""
value = re.sub("^http://", "hxxp://", value, flags=re.IGNORECASE)
Expand Down Expand Up @@ -238,24 +272,24 @@ def __new__(cls, *args, **kwargs):
# String constructor is used for example in JsonRecordAdapter
# Note: ISO 8601 is fully implemented in fromisoformat() from Python 3.11 and onwards.
# Until then, we need to manually detect timezone info and handle it.
if any(z in arg[19:] for z in ["Z", "+", "-"]):
if "." in arg[19:]:
try:
return cls.strptime(arg, "%Y-%m-%dT%H:%M:%S.%f%z")
except ValueError:
# Sometimes nanoseconds need to be stripped
return cls.strptime(re.sub(RE_STRIP_NANOSECS, "\\1", arg), "%Y-%m-%dT%H:%M:%S.%f%z")
return cls.strptime(arg, "%Y-%m-%dT%H:%M:%S%z")
if not PY_311 and any(z in arg[19:] for z in ["Z", "+", "-"]):
spec = ISO_FORMAT_WITH_MS if "." in arg[19:] else ISO_FORMAT
try:
obj = cls.strptime(arg, spec)
except ValueError:
# Sometimes nanoseconds need to be stripped
obj = cls.strptime(re.sub(RE_STRIP_NANOSECS, "\\1", arg), spec)
else:
try:
return cls.fromisoformat(arg)
obj = cls.fromisoformat(arg)
except ValueError:
# Sometimes nanoseconds need to be stripped
return cls.fromisoformat(re.sub(RE_STRIP_NANOSECS, "\\1", arg))
obj = cls.fromisoformat(re.sub(RE_STRIP_NANOSECS, "\\1", arg))
elif isinstance(arg, (int, float_type)):
return cls.utcfromtimestamp(arg)
obj = cls.fromtimestamp(arg, UTC)
elif isinstance(arg, (_dt,)):
return _dt.__new__(
tzinfo = arg.tzinfo or UTC
obj = _dt.__new__(
cls,
arg.year,
arg.month,
Expand All @@ -264,24 +298,27 @@ def __new__(cls, *args, **kwargs):
arg.minute,
arg.second,
arg.microsecond,
arg.tzinfo,
tzinfo,
)
else:
obj = _dt.__new__(cls, *args, **kwargs)

return _dt.__new__(cls, *args, **kwargs)

def __eq__(self, other):
# Avoid TypeError: can't compare offset-naive and offset-aware datetimes
# naive datetimes are treated as UTC in flow.record instead of local time
ts1 = self.timestamp() if self.tzinfo else self.replace(tzinfo=timezone.utc).timestamp()
ts2 = other.timestamp() if other.tzinfo else other.replace(tzinfo=timezone.utc).timestamp()
return ts1 == ts2
# Ensure we always return a timezone aware datetime. Treat naive datetimes as UTC
if obj.tzinfo is None:
obj = obj.replace(tzinfo=UTC)
return obj

def _pack(self):
return self

def __str__(self):
return self.astimezone(DISPLAY_TZINFO).isoformat(" ") if DISPLAY_TZINFO else self.isoformat(" ")

def __repr__(self):
result = str(self)
return result
return str(self)

def __hash__(self):
return _dt.__hash__(self)


class varint(varint_type, FieldType):
Expand Down Expand Up @@ -459,7 +496,7 @@ def _unpack(cls, data):

class uri(string, FieldType):
def __init__(self, value):
self._parsed = urlparse.urlparse(value)
self._parsed = urlparse(value)

@staticmethod
def normalize(path):
Expand Down Expand Up @@ -606,6 +643,14 @@ def __new__(cls, *args):

return cls._from_parts(args)

def __eq__(self, other: Any) -> bool:
if isinstance(other, str):
return str(self) == other or self == self.__class__(other)
return super().__eq__(other)

def __repr__(self) -> str:
return repr(str(self))

def _pack(self):
path_type = PATH_WINDOWS if isinstance(self, windows_path) else PATH_POSIX
return (str(self), path_type)
Expand Down Expand Up @@ -637,4 +682,13 @@ class posix_path(pathlib.PurePosixPath, path):


class windows_path(pathlib.PureWindowsPath, path):
pass
def __repr__(self) -> str:
s = str(self)
quote = "'"
if "'" in s:
if '"' in s:
s = s.replace("'", "\\'")
else:
quote = '"'

return f"{quote}{s}{quote}"
2 changes: 1 addition & 1 deletion flow/record/jsonpacker.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def pack_obj(self, obj):
}
return serial
if isinstance(obj, datetime):
serial = obj.strftime("%Y-%m-%dT%H:%M:%S.%f")
serial = obj.isoformat()
return serial
if isinstance(obj, fieldtypes.digest):
return {
Expand Down
15 changes: 9 additions & 6 deletions flow/record/packer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import datetime
import functools
import warnings
from datetime import datetime, timezone

import msgpack

Expand Down Expand Up @@ -29,6 +29,8 @@
RECORD_PACK_TYPE_VARINT = 0x11
RECORD_PACK_TYPE_GROUPEDRECORD = 0x12

UTC = timezone.utc


def identifier_to_str(identifier):
if isinstance(identifier, tuple) and len(identifier) == 2:
Expand Down Expand Up @@ -61,9 +63,11 @@ def register(self, desc, notify=False):
def pack_obj(self, obj, unversioned=False):
packed = None

if isinstance(obj, datetime.datetime):
t = obj.utctimetuple()[:6] + (obj.microsecond,)
packed = (RECORD_PACK_TYPE_DATETIME, t)
if isinstance(obj, datetime):
if obj.tzinfo is None or obj.tzinfo == UTC:
packed = (RECORD_PACK_TYPE_DATETIME, (*obj.timetuple()[:6], obj.microsecond))
else:
packed = (RECORD_PACK_TYPE_DATETIME, (obj.isoformat(),))

elif isinstance(obj, int):
neg = obj < 0
Expand Down Expand Up @@ -102,8 +106,7 @@ def unpack_obj(self, t, data):
subtype, value = self.unpack(data)

if subtype == RECORD_PACK_TYPE_DATETIME:
dt = fieldtypes.datetime(*value)
return dt
return fieldtypes.datetime(*value)

if subtype == RECORD_PACK_TYPE_VARINT:
neg, h = value
Expand Down
4 changes: 2 additions & 2 deletions flow/record/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ def __init__(self, path_template=None, name=None):

def rotate_existing_file(self, path):
if os.path.exists(path):
now = datetime.datetime.utcnow()
now = datetime.datetime.now(datetime.timezone.utc)
src = os.path.realpath(path)

src_dir = os.path.dirname(src)
Expand Down Expand Up @@ -224,7 +224,7 @@ def record_stream_for_path(self, path):
return self.writer

def write(self, record):
ts = record._generated or datetime.datetime.utcnow()
ts = record._generated or datetime.datetime.now(datetime.timezone.utc)
path = self.path_template.format(name=self.name, record=record, ts=ts)
rs = self.record_stream_for_path(path)
rs.write(record)
Expand Down
Loading

0 comments on commit 6a8e0f4

Please sign in to comment.