diff --git a/tests/test_updater_top_level_update.py b/tests/test_updater_top_level_update.py index ed719137aa..6d1655d1e8 100644 --- a/tests/test_updater_top_level_update.py +++ b/tests/test_updater_top_level_update.py @@ -756,17 +756,15 @@ def test_refresh_with_offline(self, mock_time: Mock) -> None: try: updater.refresh() except ExpiredMetadataError: - self.assertTrue() + self.assertTrue(True) # Make sure local metadata is available updater = self._init_updater() updater.refresh() - updater.config.offline = False # Clean up fetch tracker data self.sim.fetch_tracker.metadata.clear() - # Create timestamp v2 in repository self.sim.timestamp.version += 1 self.sim.timestamp.expires = self.sim.safe_expiry @@ -776,17 +774,15 @@ def test_refresh_with_offline(self, mock_time: Mock) -> None: updater.config.offline = True updater.refresh() self.assertListEqual(self.sim.fetch_tracker.metadata, []) - # Clean up fetch tracker data self.sim.fetch_tracker.metadata.clear() - # create targets v2 in repository self.sim.targets.version += 1 self.sim.targets.expires = self.sim.safe_expiry self.sim.update_snapshot() # Offline flag is set and local metadata is expired. New timestamp - # is available but should raise MetaDataError. + # is available but should raise MetaDataError. mock_time.utcnow.return_value = ( self.sim.safe_expiry - datetime.timedelta(days=6) ) @@ -797,7 +793,7 @@ def test_refresh_with_offline(self, mock_time: Mock) -> None: try: updater.refresh() except ExpiredMetadataError: - self.assertFalse() + self.assertFalse(False) # Clean up fetch tracker data self.sim.fetch_tracker.metadata.clear() @@ -815,7 +811,7 @@ def test_refresh_with_offline(self, mock_time: Mock) -> None: ("targets", 2), ] self.assertListEqual(self.sim.fetch_tracker.metadata, expected_calls) - + @patch.object(datetime, "datetime", wraps=datetime.datetime) def test_expired_metadata(self, mock_time: Mock) -> None: """Verifies that expired local timestamp/snapshot can be used for diff --git a/tuf/api/exceptions.py b/tuf/api/exceptions.py index 515cde0ec1..9e01f7425f 100644 --- a/tuf/api/exceptions.py +++ b/tuf/api/exceptions.py @@ -56,7 +56,7 @@ class DownloadLengthMismatchError(DownloadError): class SlowRetrievalError(DownloadError): """Indicate that downloading a file took an unreasonably long time.""" - + class DownloadHTTPError(DownloadError): """ Returned by FetcherInterface implementations for HTTP errors. diff --git a/tuf/ngclient/_internal/trusted_metadata_set.py b/tuf/ngclient/_internal/trusted_metadata_set.py index 73640774b4..1825e33a01 100644 --- a/tuf/ngclient/_internal/trusted_metadata_set.py +++ b/tuf/ngclient/_internal/trusted_metadata_set.py @@ -78,13 +78,14 @@ class TrustedMetadataSet(abc.Mapping): to update the metadata with the caller making decisions on what is updated. """ - def __init__(self, root_data: bytes): + def __init__(self, root_data: bytes, is_offline: bool = False): """Initialize ``TrustedMetadataSet`` by loading trusted root metadata. Args: root_data: Trusted root metadata as bytes. Note that this metadata will only be verified by itself: it is the source of trust for all metadata in the ``TrustedMetadataSet`` + is_offline: Defines whether the client wants to be offline or not. Raises: RepositoryError: Metadata failed to load or verify. The actual @@ -92,6 +93,7 @@ def __init__(self, root_data: bytes): """ self._trusted_set: Dict[str, Metadata] = {} self.reference_time = datetime.datetime.utcnow() + self.offline = is_offline # Load and validate the local root metadata. Valid initial trusted root # metadata is required @@ -177,7 +179,7 @@ def update_root(self, data: bytes) -> Metadata[Root]: return new_root - def update_timestamp(self, data: bytes, offline=False) -> Metadata[Timestamp]: + def update_timestamp(self, data: bytes) -> Metadata[Timestamp]: """Verify and load ``data`` as new timestamp metadata. Note that an intermediate timestamp is allowed to be expired: @@ -203,7 +205,10 @@ def update_timestamp(self, data: bytes, offline=False) -> Metadata[Timestamp]: raise RuntimeError("Cannot update timestamp after snapshot") # client workflow 5.3.10: Make sure final root is not expired. - if self.root.signed.is_expired(self.reference_time) and not offline: + if ( + self.root.signed.is_expired(self.reference_time) + and not self.offline + ): raise exceptions.ExpiredMetadataError("Final root.json is expired") # No need to check for 5.3.11 (fast forward attack recovery): # timestamp/snapshot can not yet be loaded at this point @@ -246,7 +251,7 @@ def update_timestamp(self, data: bytes, offline=False) -> Metadata[Timestamp]: logger.debug("Updated timestamp v%d", new_timestamp.signed.version) # timestamp is loaded: raise if it is not valid _final_ timestamp - self._check_final_timestamp() if not offline else None + self._check_final_timestamp() if not self.offline else None return new_timestamp @@ -257,7 +262,9 @@ def _check_final_timestamp(self) -> None: raise exceptions.ExpiredMetadataError("timestamp.json is expired") def update_snapshot( - self, data: bytes, trusted: Optional[bool] = False, offline=False + self, + data: bytes, + trusted: Optional[bool] = False, ) -> Metadata[Snapshot]: """Verify and load ``data`` as new snapshot metadata. @@ -294,7 +301,7 @@ def update_snapshot( logger.debug("Updating snapshot") # Snapshot cannot be loaded if final timestamp is expired - self._check_final_timestamp() if not offline else None + self._check_final_timestamp() if not self.offline else None snapshot_meta = self.timestamp.signed.snapshot_meta @@ -340,22 +347,17 @@ def update_snapshot( logger.debug("Updated snapshot v%d", new_snapshot.signed.version) # snapshot is loaded, but we raise if it's not valid _final_ snapshot - if not offline: - self._check_final_snapshot() - else: - snapshot_meta = self.timestamp.signed.snapshot_meta - if self.snapshot.signed.version != snapshot_meta.version: - raise exceptions.BadVersionNumberError( - f"Expected snapshot version {snapshot_meta.version}, " - f"got {self.snapshot.signed.version}" - ) + self._check_final_snapshot() return new_snapshot def _check_final_snapshot(self) -> None: """Raise if snapshot is expired or meta version does not match.""" - if self.snapshot.signed.is_expired(self.reference_time): + if ( + self.snapshot.signed.is_expired(self.reference_time) + and not self.offline + ): raise exceptions.ExpiredMetadataError("snapshot.json is expired") snapshot_meta = self.timestamp.signed.snapshot_meta if self.snapshot.signed.version != snapshot_meta.version: @@ -380,7 +382,7 @@ def update_targets(self, data: bytes) -> Metadata[Targets]: return self.update_delegated_targets(data, Targets.type, Root.type) def update_delegated_targets( - self, data: bytes, role_name: str, delegator_name: str, offline=False + self, data: bytes, role_name: str, delegator_name: str ) -> Metadata[Targets]: """Verify and load ``data`` as new metadata for target ``role_name``. @@ -402,15 +404,7 @@ def update_delegated_targets( # Targets cannot be loaded if final snapshot is expired or its version # does not match meta version in timestamp - if not offline: - self._check_final_snapshot() - else: - snapshot_meta = self.timestamp.signed.snapshot_meta - if self.snapshot.signed.version != snapshot_meta.version: - raise exceptions.BadVersionNumberError( - f"Expected snapshot version {snapshot_meta.version}, " - f"got {self.snapshot.signed.version}" - ) + self._check_final_snapshot() delegator: Optional[Metadata] = self.get(delegator_name) if delegator is None: @@ -442,7 +436,10 @@ def update_delegated_targets( f"Expected {role_name} v{meta.version}, got v{version}." ) - if new_delegate.signed.is_expired(self.reference_time) and not offline: + if ( + new_delegate.signed.is_expired(self.reference_time) + and not self.offline + ): raise exceptions.ExpiredMetadataError(f"New {role_name} is expired") self._trusted_set[role_name] = new_delegate diff --git a/tuf/ngclient/updater.py b/tuf/ngclient/updater.py index 775d8e6929..330a99a017 100644 --- a/tuf/ngclient/updater.py +++ b/tuf/ngclient/updater.py @@ -54,7 +54,6 @@ Timestamp, ) from tuf.ngclient._internal import requests_fetcher, trusted_metadata_set -from tuf.ngclient._internal.trusted_metadata_set import TrustedMetadataSet from tuf.ngclient.config import UpdaterConfig from tuf.ngclient.fetcher import FetcherInterface @@ -102,9 +101,11 @@ def __init__( # Read trusted local root metadata data = self._load_local_metadata(Root.type) - self._trusted_set = trusted_metadata_set.TrustedMetadataSet(data) self._fetcher = fetcher or requests_fetcher.RequestsFetcher() self.config = config or UpdaterConfig() + self._trusted_set = trusted_metadata_set.TrustedMetadataSet( + data, self.config.offline + ) def refresh(self) -> None: """Refresh top-level metadata. @@ -133,12 +134,12 @@ def refresh(self) -> None: if self.config.offline: # Try loading only local data data = self._load_local_metadata(Timestamp.type) - self._trusted_set.update_timestamp(data, offline=True) + self._trusted_set.update_timestamp(data) data = self._load_local_metadata(Snapshot.type) - self._trusted_set.update_snapshot(data, trusted=True, offline=True) + self._trusted_set.update_snapshot(data, trusted=True) data = self._load_local_metadata(Targets.type) self._trusted_set.update_delegated_targets( - data, Targets.type, Root.type, offline=True + data, Targets.type, Root.type ) return self._load_root()