Skip to content

Commit

Permalink
Add a script to verify Github attestations
Browse files Browse the repository at this point in the history
  • Loading branch information
almet committed Jan 22, 2025
1 parent 9aa84a2 commit e0e1458
Show file tree
Hide file tree
Showing 2 changed files with 263 additions and 0 deletions.
240 changes: 240 additions & 0 deletions dev_scripts/registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
#!/usr/bin/python

import hashlib
import re
import shutil
import subprocess
from tempfile import NamedTemporaryFile

import click
import requests

DEFAULT_REPO = "freedomofpress/dangerzone"
SIGSTORE_BUNDLE = "application/vnd.dev.sigstore.bundle.v0.3+json"
DOCKER_MANIFEST_DISTRIBUTION = "application/vnd.docker.distribution.manifest.v2+json"
DOCKER_MANIFEST_INDEX = "application/vnd.oci.image.index.v1+json"
OCI_IMAGE_MANIFEST = "application/vnd.oci.image.manifest.v1+json"


class RegistryClient:
def __init__(self, registry, org, image):
self._registry = registry
self._org = org
self._image = image
self._auth_token = None
self._base_url = f"https://{registry}"
self._image_url = f"{self._base_url}/v2/{self._org}/{self._image}"

@property
def image(self):
return f"{self._registry}/{self._org}/{self._image}"

def get_auth_token(self):
if not self._auth_token:
auth_url = f"{self._base_url}/token"
response = requests.get(
auth_url,
params={
"service": f"{self._registry}",
"scope": f"repository:{self._org}/{self._image}:pull",
},
)
response.raise_for_status()
self._auth_token = response.json()["token"]
return self._auth_token

def get_auth_header(self):
return {"Authorization": f"Bearer {self.get_auth_token()}"}

def list_tags(self):
url = f"{self._image_url}/tags/list"
response = requests.get(url, headers=self.get_auth_header())
response.raise_for_status()
tags = response.json().get("tags", [])
return tags

def get_manifest(self, tag, extra_headers=None):
"""Get manifest information for a specific tag"""
manifest_url = f"{self._image_url}/manifests/{tag}"
headers = {
"Accept": DOCKER_MANIFEST_DISTRIBUTION,
"Authorization": f"Bearer {self.get_auth_token()}",
}
if extra_headers:
headers.update(extra_headers)

response = requests.get(manifest_url, headers=headers)
response.raise_for_status()
return response

def list_manifests(self, tag):
return (
self.get_manifest(
tag,
{
"Accept": DOCKER_MANIFEST_INDEX,
},
)
.json()
.get("manifests")
)

def get_blob(self, hash):
url = f"{self._image_url}/blobs/{hash}"
response = requests.get(
url,
headers={
"Authorization": f"Bearer {self.get_auth_token()}",
},
)
response.raise_for_status()
return response

def get_attestation(self, tag):
"""
Retrieve an attestation from a given tag.
The attestation needs to be attached using the Cosign Bundle
Specification defined at:
https://github.com/sigstore/cosign/blob/main/specs/BUNDLE_SPEC.md
"""

def _find_sigstore_bundle_manifest(manifests):
for manifest in manifests:
if manifest["artifactType"] == SIGSTORE_BUNDLE:
return manifest["mediaType"], manifest["digest"]

def _get_bundle_blob_digest(layers):
for layer in layers:
if layer.get("mediaType") == SIGSTORE_BUNDLE:
return layer["digest"]

tag_manifest_content = self.get_manifest(tag).content

# The attestation is available on the same container registry, with a
# specific tag named "sha256-{sha256(manifest)}"
tag_manifest_hash = hashlib.sha256(tag_manifest_content).hexdigest()

# This will get us a "list" of manifests...
manifests = self.list_manifests(f"sha256-{tag_manifest_hash}")

# ... from which we want the sigstore bundle
bundle_manifest_mediatype, bundle_manifest_digest = (
_find_sigstore_bundle_manifest(manifests)
)
if not bundle_manifest_digest:
raise Error("Not able to find sigstore bundle manifest info")

bundle_manifest = self.get_manifest(
bundle_manifest_digest, extra_headers={"Accept": bundle_manifest_mediatype}
).json()

# From there, we will get the attestation in a blob.
# It will be the first layer listed at this manifest hash location
layers = bundle_manifest.get("layers", [])

blob_digest = _get_bundle_blob_digest(layers)
bundle = self.get_blob(blob_digest)
return tag_manifest_content, bundle.content

def verify_attestation(self, image_tag: str, expected_repo: str):
"""
Look up the image attestation to see if the image has been built
on Github runners, and from a given repository.
"""
manifest, bundle = self.get_attestation(image_tag)

def _write(file, content):
file.write(content)
file.flush()

# Put the value in files and verify with cosign
with (
NamedTemporaryFile(mode="wb") as manifest_json,
NamedTemporaryFile(mode="wb") as bundle_json,
):
_write(manifest_json, manifest)
_write(bundle_json, bundle)

# Call cosign with the temporary file paths
cmd = [
"cosign",
"verify-blob-attestation",
"--bundle",
bundle_json.name,
"--new-bundle-format",
"--certificate-oidc-issuer",
"https://token.actions.githubusercontent.com",
"--certificate-identity-regexp",
f"^https://github.com/{expected_repo}/.github/workflows/release-container-image.yml@refs/heads/test/image-publication-cosign",
manifest_json.name,
]

result = subprocess.run(cmd, capture_output=True)
if result.returncode != 0:
raise Exception(f"Attestation cannot be verified. {result.stderr}")
return True


def parse_image_location(input_string):
"""Parses container image location into (registry, namespace, repository, tag)"""
pattern = (
r"^"
r"(?P<registry>[a-zA-Z0-9.-]+)/"
r"(?P<namespace>[a-zA-Z0-9-]+)/"
r"(?P<repository>[^:]+)"
r"(?::(?P<tag>[a-zA-Z0-9.-]+))?"
r"$"
)
match = re.match(pattern, input_string)
if not match:
raise ValueError("Malformed image location")
return match.group("registry", "namespace", "repository", "tag")


@click.group()
def main():
pass


@main.command()
@click.argument("image")
def list_tags(image):
registry, org, package, _ = parse_image_location(image)
client = RegistryClient(registry, org, package)
tags = client.list_tags()
click.echo(f"Existing tags for {client.image}")
for tag in tags:
click.echo(tag)


@main.command()
@click.argument("image")
@click.option(
"--repo",
default=DEFAULT_REPO,
help="The github repository to check the attestation for",
)
def attest(image: str, repo: str):
"""
Look up the image attestation to see if the image has been built
on Github runners, and from a given repository.
"""
if shutil.which("cosign") is None:
click.echo("The cosign binary is needed but not installed.")
raise click.Abort()

registry, org, package, tag = parse_image_location(image)
tag = tag or "latest"

client = RegistryClient(registry, org, package)
verified = client.verify_attestation(tag, repo)
if verified:
click.echo(
f"🎉 The image available at `{client.image}:{tag}` has been built by Github Runners from the `{repo}` repository"
)


if __name__ == "__main__":
main()
23 changes: 23 additions & 0 deletions docs/developer/independent-container-updates.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Independent Container Updates

Since version 0.9.0, Dangerzone is able to ship container images independently
from issuing a new release of the software.

This is useful as images need to be kept updated with the latest security fixes.

## Nightly images and attestations

Each night, new images are built and pushed to our container registry, alongside
with a provenance attestation, enabling anybody to ensure that the image has
been originally built by Github CI runners, from a defined source repository (in our case `freedomofpress/dangerzone`).

To verify the attestations against our expectations, use the following command:
```bash
poetry run ./dev_scripts/registry.py attest ghcr.io/freedomofpress/dangerzone/dangerzone:latest --repo freedomofpress/dangerzone
```

In case of sucess, it will report back:

```
🎉 The image available at `ghcr.io/freedomofpress/dangerzone/dangerzone:latest` has been built by Github runners from the `freedomofpress/dangerzone` repository.
```

0 comments on commit e0e1458

Please sign in to comment.