Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TUF Offline Functionality #2363

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions tests/test_updater_top_level_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
from tests.repository_simulator import RepositorySimulator
from tuf.api.exceptions import (
BadVersionNumberError,
DownloadError,
DownloadLengthMismatchError,
ExpiredMetadataError,
LengthOrHashMismatchError,
RepositoryError,
UnsignedMetadataError,
)
from tuf.api.metadata import (
Expand Down Expand Up @@ -739,6 +741,82 @@ def test_load_metadata_from_cache(self, wrapped_open: MagicMock) -> None:
expected_calls = [("root", 2), ("timestamp", None)]
self.assertListEqual(self.sim.fetch_tracker.metadata, expected_calls)

@patch.object(datetime, "datetime", wraps=datetime.datetime)
def test_refresh_with_offline(self, mock_time: Mock) -> None:
# make v1 metadata expire a little earlier
self.sim.timestamp.expires = self.sim.safe_expiry - datetime.timedelta(
days=7
)
self.sim.targets.expires = self.sim.safe_expiry - datetime.timedelta(
days=5
)
# offline is not set and there is no metadata
self.sim.fetch_tracker.metadata.clear()
with patch("datetime.datetime", mock_time):
updater = self._init_updater()
updater.config.offline = False
try:
updater.refresh()
except Exception as e:
self.assertRaises(
(OSError, RepositoryError, DownloadError),
f"unexpected error raised {e}",
)

# Make sure local metadata is available
updater = self._init_updater()
updater.refresh()
updater.config.offline = False

# Clean up fetch tracker data
self.sim.fetch_tracker.metadata.clear()
# Create timestamp v2 in repository
self.sim.timestamp.version += 1
self.sim.timestamp.expires = self.sim.safe_expiry

# Offline flag is set and local metadata is valid (should continue)
updater = self._init_updater()
updater.config.offline = True
updater.refresh()
self.assertListEqual(self.sim.fetch_tracker.metadata, [])
# Clean up fetch tracker data
self.sim.fetch_tracker.metadata.clear()
# create targets v2 in repository
self.sim.targets.version += 1
self.sim.targets.expires = self.sim.safe_expiry
self.sim.update_snapshot()

# Offline flag is set and local metadata is expired. New timestamp
# is available but should raise ExpiredMetaDataError.
mock_time.utcnow.return_value = (
self.sim.safe_expiry - datetime.timedelta(days=6)
)

with patch("datetime.datetime", mock_time):
updater = self._init_updater()
updater.config.offline = True
try:
updater.refresh()
except ExpiredMetadataError:
self.assertTrue(True)

# Clean up fetch tracker data
self.sim.fetch_tracker.metadata.clear()

# Offline flag is not set (vanilla refresh)
with patch("datetime.datetime", mock_time):
updater = self._init_updater()
updater.config.offline = False
updater.refresh()

expected_calls = [
("root", 2),
("timestamp", None),
("snapshot", 2),
("targets", 2),
]
self.assertListEqual(self.sim.fetch_tracker.metadata, expected_calls)

@patch.object(datetime, "datetime", wraps=datetime.datetime)
def test_expired_metadata(self, mock_time: Mock) -> None:
"""Verifies that expired local timestamp/snapshot can be used for
Expand Down
29 changes: 22 additions & 7 deletions tuf/ngclient/_internal/trusted_metadata_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,20 +78,22 @@ class TrustedMetadataSet(abc.Mapping):
to update the metadata with the caller making decisions on what is updated.
"""

def __init__(self, root_data: bytes):
def __init__(self, root_data: bytes, is_offline: bool = False):
"""Initialize ``TrustedMetadataSet`` by loading trusted root metadata.

Args:
root_data: Trusted root metadata as bytes. Note that this metadata
will only be verified by itself: it is the source of trust for
all metadata in the ``TrustedMetadataSet``
is_offline: Defines whether the client wants to be offline or not.

Raises:
RepositoryError: Metadata failed to load or verify. The actual
error type and content will contain more details.
"""
self._trusted_set: Dict[str, Metadata] = {}
self.reference_time = datetime.datetime.utcnow()
self.offline = is_offline

# Load and validate the local root metadata. Valid initial trusted root
# metadata is required
Expand Down Expand Up @@ -203,7 +205,10 @@ def update_timestamp(self, data: bytes) -> Metadata[Timestamp]:
raise RuntimeError("Cannot update timestamp after snapshot")

# client workflow 5.3.10: Make sure final root is not expired.
if self.root.signed.is_expired(self.reference_time):
if (
self.root.signed.is_expired(self.reference_time)
and not self.offline
):
raise exceptions.ExpiredMetadataError("Final root.json is expired")
# No need to check for 5.3.11 (fast forward attack recovery):
# timestamp/snapshot can not yet be loaded at this point
Expand Down Expand Up @@ -246,7 +251,8 @@ def update_timestamp(self, data: bytes) -> Metadata[Timestamp]:
logger.debug("Updated timestamp v%d", new_timestamp.signed.version)

# timestamp is loaded: raise if it is not valid _final_ timestamp
self._check_final_timestamp()
if not self.offline:
self._check_final_timestamp()

return new_timestamp

Expand All @@ -257,7 +263,9 @@ def _check_final_timestamp(self) -> None:
raise exceptions.ExpiredMetadataError("timestamp.json is expired")

def update_snapshot(
self, data: bytes, trusted: Optional[bool] = False
self,
data: bytes,
trusted: Optional[bool] = False,
) -> Metadata[Snapshot]:
"""Verify and load ``data`` as new snapshot metadata.

Expand Down Expand Up @@ -294,7 +302,8 @@ def update_snapshot(
logger.debug("Updating snapshot")

# Snapshot cannot be loaded if final timestamp is expired
self._check_final_timestamp()
if not self.offline:
self._check_final_timestamp()

snapshot_meta = self.timestamp.signed.snapshot_meta

Expand Down Expand Up @@ -347,7 +356,10 @@ def update_snapshot(
def _check_final_snapshot(self) -> None:
"""Raise if snapshot is expired or meta version does not match."""

if self.snapshot.signed.is_expired(self.reference_time):
if (
self.snapshot.signed.is_expired(self.reference_time)
and not self.offline
):
raise exceptions.ExpiredMetadataError("snapshot.json is expired")
snapshot_meta = self.timestamp.signed.snapshot_meta
if self.snapshot.signed.version != snapshot_meta.version:
Expand Down Expand Up @@ -426,7 +438,10 @@ def update_delegated_targets(
f"Expected {role_name} v{meta.version}, got v{version}."
)

if new_delegate.signed.is_expired(self.reference_time):
if (
new_delegate.signed.is_expired(self.reference_time)
and not self.offline
):
raise exceptions.ExpiredMetadataError(f"New {role_name} is expired")

self._trusted_set[role_name] = new_delegate
Expand Down
2 changes: 2 additions & 0 deletions tuf/ngclient/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@


@dataclass
# pylint: disable=too-many-instance-attributes
class UpdaterConfig:
"""Used to store ``Updater`` configuration.

Expand All @@ -33,3 +34,4 @@ class UpdaterConfig:
snapshot_max_length: int = 2000000 # bytes
targets_max_length: int = 5000000 # bytes
prefix_targets_with_hash: bool = True
offline: bool = False
22 changes: 21 additions & 1 deletion tuf/ngclient/updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,11 @@ def __init__(

# Read trusted local root metadata
data = self._load_local_metadata(Root.type)
self._trusted_set = trusted_metadata_set.TrustedMetadataSet(data)
self._fetcher = fetcher or requests_fetcher.RequestsFetcher()
self.config = config or UpdaterConfig()
self._trusted_set = trusted_metadata_set.TrustedMetadataSet(
data, self.config.offline
)

def refresh(self) -> None:
"""Refresh top-level metadata.
Expand All @@ -129,6 +131,17 @@ def refresh(self) -> None:
DownloadError: Download of a metadata file failed in some way
"""

if self.config.offline:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably check self._trusted_set.offline

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would that be necessary? That is just set as self.config,offline on line 106-107?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.config.offline can be modified by the user after the Updater has been initialized but before the other methods are called. This does not make a lot of sense, but technically it is possible. In other words, if we check check self.config.offline here it may well be False even though self._trusted_set has been initialized as offline.

The current test case seems to be a good example of this.

# Try loading only local data
data = self._load_local_metadata(Timestamp.type)
self._trusted_set.update_timestamp(data)
data = self._load_local_metadata(Snapshot.type)
self._trusted_set.update_snapshot(data, trusted=True)
data = self._load_local_metadata(Targets.type)
self._trusted_set.update_delegated_targets(
data, Targets.type, Root.type
)
return
self._load_root()
self._load_timestamp()
self._load_snapshot()
Expand Down Expand Up @@ -224,11 +237,15 @@ def download_target(
DownloadError: Download of the target file failed in some way
RepositoryError: Downloaded target failed to be verified in some way
OSError: Failed to write target to file
RuntimeError: Download of target file cannot occur because in offline mode

Returns:
Local path to downloaded file
"""

if self.config.offline:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably check self._trusted_set.offline

raise RuntimeError("Cannot download when offline")

if filepath is None:
filepath = self._generate_target_file_path(targetinfo)

Expand Down Expand Up @@ -386,6 +403,9 @@ def _load_targets(self, role: str, parent_role: str) -> Metadata[Targets]:
logger.debug("Local %s is valid: not downloading new one", role)
return delegated_targets
except (OSError, exceptions.RepositoryError) as e:
# fails if local data is unavalible and in offline mode
if self.config.offline:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably check self._trusted_set.offline

raise exceptions.DownloadError("Local metadata is missing; cannot download new metadata in offline mode")
# Local 'role' does not exist or is invalid: update from remote
logger.debug("Failed to load local %s: %s", role, e)

Expand Down