Skip to content

Commit

Permalink
offline mode added to TrustedMetadataSet
Browse files Browse the repository at this point in the history
creation of TrustedMetadataSet object can take additional argument
that describes whether client wants to be in offline mode. Code linted
according to tox -e lint.
  • Loading branch information
emilejbm committed May 4, 2023
1 parent 9a35ae3 commit a35b614
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 41 deletions.
12 changes: 4 additions & 8 deletions tests/test_updater_top_level_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -756,17 +756,15 @@ def test_refresh_with_offline(self, mock_time: Mock) -> None:
try:
updater.refresh()
except ExpiredMetadataError:
self.assertTrue()
self.assertTrue(True)

# Make sure local metadata is available
updater = self._init_updater()
updater.refresh()

updater.config.offline = False

# Clean up fetch tracker data
self.sim.fetch_tracker.metadata.clear()

# Create timestamp v2 in repository
self.sim.timestamp.version += 1
self.sim.timestamp.expires = self.sim.safe_expiry
Expand All @@ -776,17 +774,15 @@ def test_refresh_with_offline(self, mock_time: Mock) -> None:
updater.config.offline = True
updater.refresh()
self.assertListEqual(self.sim.fetch_tracker.metadata, [])

# Clean up fetch tracker data
self.sim.fetch_tracker.metadata.clear()

# create targets v2 in repository
self.sim.targets.version += 1
self.sim.targets.expires = self.sim.safe_expiry
self.sim.update_snapshot()

# Offline flag is set and local metadata is expired. New timestamp
# is available but should raise MetaDataError.
# is available but should raise MetaDataError.
mock_time.utcnow.return_value = (
self.sim.safe_expiry - datetime.timedelta(days=6)
)
Expand All @@ -797,7 +793,7 @@ def test_refresh_with_offline(self, mock_time: Mock) -> None:
try:
updater.refresh()
except ExpiredMetadataError:
self.assertFalse()
self.assertFalse(False)

# Clean up fetch tracker data
self.sim.fetch_tracker.metadata.clear()
Expand All @@ -815,7 +811,7 @@ def test_refresh_with_offline(self, mock_time: Mock) -> None:
("targets", 2),
]
self.assertListEqual(self.sim.fetch_tracker.metadata, expected_calls)

@patch.object(datetime, "datetime", wraps=datetime.datetime)
def test_expired_metadata(self, mock_time: Mock) -> None:
"""Verifies that expired local timestamp/snapshot can be used for
Expand Down
2 changes: 1 addition & 1 deletion tuf/api/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class DownloadLengthMismatchError(DownloadError):
class SlowRetrievalError(DownloadError):
"""Indicate that downloading a file took an unreasonably long time."""


class DownloadHTTPError(DownloadError):
"""
Returned by FetcherInterface implementations for HTTP errors.
Expand Down
51 changes: 24 additions & 27 deletions tuf/ngclient/_internal/trusted_metadata_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,20 +78,22 @@ class TrustedMetadataSet(abc.Mapping):
to update the metadata with the caller making decisions on what is updated.
"""

def __init__(self, root_data: bytes):
def __init__(self, root_data: bytes, is_offline: bool = False):
"""Initialize ``TrustedMetadataSet`` by loading trusted root metadata.
Args:
root_data: Trusted root metadata as bytes. Note that this metadata
will only be verified by itself: it is the source of trust for
all metadata in the ``TrustedMetadataSet``
is_offline: Defines whether the client wants to be offline or not.
Raises:
RepositoryError: Metadata failed to load or verify. The actual
error type and content will contain more details.
"""
self._trusted_set: Dict[str, Metadata] = {}
self.reference_time = datetime.datetime.utcnow()
self.offline = is_offline

# Load and validate the local root metadata. Valid initial trusted root
# metadata is required
Expand Down Expand Up @@ -177,7 +179,7 @@ def update_root(self, data: bytes) -> Metadata[Root]:

return new_root

def update_timestamp(self, data: bytes, offline=False) -> Metadata[Timestamp]:
def update_timestamp(self, data: bytes) -> Metadata[Timestamp]:
"""Verify and load ``data`` as new timestamp metadata.
Note that an intermediate timestamp is allowed to be expired:
Expand All @@ -203,7 +205,10 @@ def update_timestamp(self, data: bytes, offline=False) -> Metadata[Timestamp]:
raise RuntimeError("Cannot update timestamp after snapshot")

# client workflow 5.3.10: Make sure final root is not expired.
if self.root.signed.is_expired(self.reference_time) and not offline:
if (
self.root.signed.is_expired(self.reference_time)
and not self.offline
):
raise exceptions.ExpiredMetadataError("Final root.json is expired")
# No need to check for 5.3.11 (fast forward attack recovery):
# timestamp/snapshot can not yet be loaded at this point
Expand Down Expand Up @@ -246,7 +251,7 @@ def update_timestamp(self, data: bytes, offline=False) -> Metadata[Timestamp]:
logger.debug("Updated timestamp v%d", new_timestamp.signed.version)

# timestamp is loaded: raise if it is not valid _final_ timestamp
self._check_final_timestamp() if not offline else None
self._check_final_timestamp() if not self.offline else None

return new_timestamp

Expand All @@ -257,7 +262,9 @@ def _check_final_timestamp(self) -> None:
raise exceptions.ExpiredMetadataError("timestamp.json is expired")

def update_snapshot(
self, data: bytes, trusted: Optional[bool] = False, offline=False
self,
data: bytes,
trusted: Optional[bool] = False,
) -> Metadata[Snapshot]:
"""Verify and load ``data`` as new snapshot metadata.
Expand Down Expand Up @@ -294,7 +301,7 @@ def update_snapshot(
logger.debug("Updating snapshot")

# Snapshot cannot be loaded if final timestamp is expired
self._check_final_timestamp() if not offline else None
self._check_final_timestamp() if not self.offline else None

snapshot_meta = self.timestamp.signed.snapshot_meta

Expand Down Expand Up @@ -340,22 +347,17 @@ def update_snapshot(
logger.debug("Updated snapshot v%d", new_snapshot.signed.version)

# snapshot is loaded, but we raise if it's not valid _final_ snapshot
if not offline:
self._check_final_snapshot()
else:
snapshot_meta = self.timestamp.signed.snapshot_meta
if self.snapshot.signed.version != snapshot_meta.version:
raise exceptions.BadVersionNumberError(
f"Expected snapshot version {snapshot_meta.version}, "
f"got {self.snapshot.signed.version}"
)
self._check_final_snapshot()

return new_snapshot

def _check_final_snapshot(self) -> None:
"""Raise if snapshot is expired or meta version does not match."""

if self.snapshot.signed.is_expired(self.reference_time):
if (
self.snapshot.signed.is_expired(self.reference_time)
and not self.offline
):
raise exceptions.ExpiredMetadataError("snapshot.json is expired")
snapshot_meta = self.timestamp.signed.snapshot_meta
if self.snapshot.signed.version != snapshot_meta.version:
Expand All @@ -380,7 +382,7 @@ def update_targets(self, data: bytes) -> Metadata[Targets]:
return self.update_delegated_targets(data, Targets.type, Root.type)

def update_delegated_targets(
self, data: bytes, role_name: str, delegator_name: str, offline=False
self, data: bytes, role_name: str, delegator_name: str
) -> Metadata[Targets]:
"""Verify and load ``data`` as new metadata for target ``role_name``.
Expand All @@ -402,15 +404,7 @@ def update_delegated_targets(

# Targets cannot be loaded if final snapshot is expired or its version
# does not match meta version in timestamp
if not offline:
self._check_final_snapshot()
else:
snapshot_meta = self.timestamp.signed.snapshot_meta
if self.snapshot.signed.version != snapshot_meta.version:
raise exceptions.BadVersionNumberError(
f"Expected snapshot version {snapshot_meta.version}, "
f"got {self.snapshot.signed.version}"
)
self._check_final_snapshot()

delegator: Optional[Metadata] = self.get(delegator_name)
if delegator is None:
Expand Down Expand Up @@ -442,7 +436,10 @@ def update_delegated_targets(
f"Expected {role_name} v{meta.version}, got v{version}."
)

if new_delegate.signed.is_expired(self.reference_time) and not offline:
if (
new_delegate.signed.is_expired(self.reference_time)
and not self.offline
):
raise exceptions.ExpiredMetadataError(f"New {role_name} is expired")

self._trusted_set[role_name] = new_delegate
Expand Down
11 changes: 6 additions & 5 deletions tuf/ngclient/updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
Timestamp,
)
from tuf.ngclient._internal import requests_fetcher, trusted_metadata_set
from tuf.ngclient._internal.trusted_metadata_set import TrustedMetadataSet
from tuf.ngclient.config import UpdaterConfig
from tuf.ngclient.fetcher import FetcherInterface

Expand Down Expand Up @@ -102,9 +101,11 @@ def __init__(

# Read trusted local root metadata
data = self._load_local_metadata(Root.type)
self._trusted_set = trusted_metadata_set.TrustedMetadataSet(data)
self._fetcher = fetcher or requests_fetcher.RequestsFetcher()
self.config = config or UpdaterConfig()
self._trusted_set = trusted_metadata_set.TrustedMetadataSet(
data, self.config.offline
)

def refresh(self) -> None:
"""Refresh top-level metadata.
Expand Down Expand Up @@ -133,12 +134,12 @@ def refresh(self) -> None:
if self.config.offline:
# Try loading only local data
data = self._load_local_metadata(Timestamp.type)
self._trusted_set.update_timestamp(data, offline=True)
self._trusted_set.update_timestamp(data)
data = self._load_local_metadata(Snapshot.type)
self._trusted_set.update_snapshot(data, trusted=True, offline=True)
self._trusted_set.update_snapshot(data, trusted=True)
data = self._load_local_metadata(Targets.type)
self._trusted_set.update_delegated_targets(
data, Targets.type, Root.type, offline=True
data, Targets.type, Root.type
)
return
self._load_root()
Expand Down

0 comments on commit a35b614

Please sign in to comment.