Skip to content

Commit

Permalink
Merge branch 'main' into address_nested_set_warning
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Pollock <[email protected]>
  • Loading branch information
andrewpollock committed Sep 12, 2024
2 parents 800f207 + b9c4a8d commit 1151df8
Show file tree
Hide file tree
Showing 16 changed files with 9,450 additions and 3 deletions.
2 changes: 1 addition & 1 deletion docs/schema.md
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ The defined database prefixes and their "home" databases are:
</td>
</tr>
<tr>
<td><code>Red Hat</code></td>
<td><code>RHSA</code>/<code>RHBA</code>/<code>RHEA</code></td>
<td><a href="https://security.access.redhat.com/data">Red Hat Security Data</a></td>
<td>
<ul>
Expand Down
8 changes: 8 additions & 0 deletions tools/redhat/.pylintrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[MESSAGES CONTROL]
disable=
broad-except,
fixme,
too-few-public-methods,
too-many-branches,
too-many-locals,
unspecified-encoding,
5 changes: 5 additions & 0 deletions tools/redhat/.style.yapf
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[style]
based_on_style = pep8
column_limit = 80
indent_width = 4
split_before_named_assigns = true
12 changes: 12 additions & 0 deletions tools/redhat/Pipfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[[source]]
url = "https://pypi.python.org/simple"
verify_ssl = true
name = "pypi"

[packages]
jsonschema = "*"
requests = "*"

[dev-packages]
pylint = "*"
yapf = "*"
377 changes: 377 additions & 0 deletions tools/redhat/Pipfile.lock

Large diffs are not rendered by default.

23 changes: 23 additions & 0 deletions tools/redhat/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Red Hat CSAF to OSV Converter

## Setup

~~~
$ pipenv sync
$ pipenv shell
~~~

## Usage

Needs to be run in a folder where the Red Hat CSAF documents to convert already exist. Files can be downloaded the [Red Hat Customer Portal Security Data section](https://access.redhat.com/security/data/csaf/v2/advisories/)
~~~
$ ./convert_redhat.py csaf/rhsa-2024_4546.json
~~~

OSV documents will be output in the `osv` directory by default. Override the default with the `--output_directory` option.

## Tests

~~~
$ python3 -m unittest *_test.py
~~~
78 changes: 78 additions & 0 deletions tools/redhat/convert_redhat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#!/usr/bin/env python3
""" Convert a CSAF document to OSV format
i.e. https://access.redhat.com/security/data/csaf/v2/advisories/2024/rhsa-2024_4546.json
"""
import argparse
import json
import sys
from datetime import datetime

import requests
from jsonschema import validate
from csaf import CSAF
from osv import DATE_FORMAT, OSV, OSVEncoder, SCHEMA_VERSION


class RedHatConverter:
"""
Class which converts and validates a CSAF string to an OSV string
"""
SCHEMA = (
f"https://raw.githubusercontent.com/ossf/osv-schema/v{SCHEMA_VERSION}"
"/validation/schema.json")
REQUEST_TIMEOUT = 60

def __init__(self):
schema_content = requests.get(self.SCHEMA, timeout=self.REQUEST_TIMEOUT)
self.osv_schema = schema_content.json()

def convert(self,
csaf_content: str,
modified: str,
published: str = "") -> tuple[str, str]:
"""
Converts csaf_content json string into an OSV json string
returns an OSV ID and the json string content of the OSV file
the json string content will be empty if no content is applicable
throws a validation error in the schema doesn't validate correctly.
The modified value for osv is passed in so it matches what's in all.json
Raises ValueError is CSAF file can't be parsed
"""
csaf = CSAF(csaf_content)
osv = OSV(csaf, modified, published)

# We convert from an OSV object to a JSON string here in order to use the OSVEncoder
# Once we OSV json string data we validate it using the OSV schema
osv_content = json.dumps(osv, cls=OSVEncoder, indent=2)
osv_data = json.loads(osv_content)
validate(osv_data, schema=self.osv_schema)

return osv.id, osv_content


def main():
"""
Given a Red Hat CSAF document, covert it to OSV. Writes the OSV file to disk at 'osv' by default
"""
parser = argparse.ArgumentParser(description='CSAF to OSV Converter')
parser.add_argument("csaf", metavar="FILE", help='CSAF file to process')
parser.add_argument('--output_directory', dest='out_dir', default="osv")

args = parser.parse_args()

with open(args.csaf, "r", encoding="utf-8") as in_f:
csaf_data = in_f.read()

converter = RedHatConverter()
osv_id, osv_data = converter.convert(csaf_data,
datetime.now().strftime(DATE_FORMAT))

if not osv_data:
sys.exit(1)

with open(f"{args.out_dir}/{osv_id}.json", "w", encoding="utf-8") as out_f:
out_f.write(osv_data)


if __name__ == '__main__':
main()
34 changes: 34 additions & 0 deletions tools/redhat/convert_redhat_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""Tests for converting a CSAF document to OSV format"""
import unittest
from datetime import datetime
import json
from convert_redhat import RedHatConverter
from osv import DATE_FORMAT


class TestRedHatConverter(unittest.TestCase):
"""Test end-to-end convertion from RedHAt CSAF to OSV format"""

def test_convert_redhat(self):
""" Test a single demo CSAF file """
modified_time = datetime.strptime("2024-09-02T14:30:00",
"%Y-%m-%dT%H:%M:%S")
csaf_file = "testdata/rhsa-2024_4546.json"
expected_file = "testdata/RHSA-2024_4546.json"

with open(csaf_file, "r", encoding="utf-8") as fp:
csaf_data = fp.read()
converter = RedHatConverter()
osv_data = converter.convert(csaf_data,
modified_time.strftime(DATE_FORMAT))

assert osv_data[0] == "RHSA-2024:4546"
result_data = json.loads(osv_data[1])

with open(expected_file, "r", encoding="utf-8") as fp:
expected_data = json.load(fp)
assert expected_data == result_data


if __name__ == '__main__':
unittest.main()
179 changes: 179 additions & 0 deletions tools/redhat/csaf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
"""Module for parsing CSAF v2 advisories"""
import json
from dataclasses import dataclass, InitVar, field
from typing import Any, Iterable


@dataclass
class Remediation:
"""
class to handle remediation advice in CSAF data
"""

csaf_product_id: InitVar[str]
cpes: InitVar[dict[str, str]]
purls: InitVar[dict[str, str]]
product: str = field(init=False)
product_version: str = field(init=False)
component: str = field(init=False)
fixed_version: str = field(init=False)
purl: str = field(init=False)
cpe: str = field(init=False)

def __post_init__(self, csaf_product_id: str, cpes: dict[str, str],
purls: dict[str, str]):
if ":" not in csaf_product_id:
raise ValueError(
f"Did not find ':' in product_id: {csaf_product_id}")
(self.product, self.product_version) = csaf_product_id.split(":",
maxsplit=1)

# NEVRA stands for Name Epoch Version Release and Architecture
# We split the name from the rest of the 'version' data (EVRA). We store name as component.
split_component_version = self.product_version.rsplit("-", maxsplit=2)
if len(split_component_version) < 3:
raise ValueError(
f"Could not convert component into NEVRA: {self.product_version}"
)
# RHEL Modules have 4 colons in the name part of the NEVRA. If we detect a modular RPM
# product ID, discard the module part of the name and look for that in the purl dict.
# Ideally we would keep the module information and use it when scanning a RHEL system,
# however this is not done today by Clair: https://github.com/quay/claircore/pull/901/files
if split_component_version[0].count(":") == 4:
self.component = split_component_version[0].rsplit(":")[-1]
else:
self.component = split_component_version[0]
self.fixed_version = "-".join(
(split_component_version[1], split_component_version[2]))

try:
nevra = f"{self.component}-{self.fixed_version}"
self.purl = purls[nevra]
self.cpe = cpes[self.product]
except KeyError:
# pylint: disable=raise-missing-from
# Raising this as a ValueError instead of as a KeyError allows us to wrap
# the entire call to init() in try/catch block with a single exception type
raise ValueError(
f"Did not find {csaf_product_id} in product branches")

# There are many pkg:oci/ remediations in Red Hat data. However there are no strict
# rules enforced on versioning Red Hat containers, therefore we cant compare container
# versions to each other with 100% accuracy at this time.
if not self.purl.startswith("pkg:rpm/"):
raise ValueError(
"Non RPM remediations are not supported in OSV at this time")


@dataclass
class Vulnerability:
"""
class to handle vulnerability information
"""

csaf_vuln: InitVar[dict[str, Any]]
cpes: InitVar[dict[str, str]]
purls: InitVar[dict[str, str]]
cve_id: str = field(init=False)
cvss_v3_vector: str = field(init=False)
cvss_v3_base_score: str = field(init=False, default=None)
references: list[dict[str, str]] = field(init=False)
remediations: list[Remediation] = field(init=False)

def __post_init__(self, csaf_vuln: dict[str, Any], cpes: dict[str, str],
purls: dict[str, str]):
self.cve_id = csaf_vuln["cve"]
for score in csaf_vuln.get("scores", []):
if "cvss_v3" in score:
self.cvss_v3_vector = score["cvss_v3"]["vectorString"]
self.cvss_v3_base_score = score["cvss_v3"]["baseScore"]
else:
self.cvss_v3_base_score = ""
self.cvss_v3_vector = ""
self.references = csaf_vuln["references"]
self.remediations = []
for product_id in csaf_vuln["product_status"]["fixed"]:
self.remediations.append(Remediation(product_id, cpes, purls))


def gen_dict_extract(key, var: Iterable):
"""
Given a key value and dictionary or list, traverses that dictionary or list returning the value
of the given key.
From https://stackoverflow.com/questions/9807634/
find-all-occurrences-of-a-key-in-nested-dictionaries-and-lists
"""
if hasattr(var, "items"):
for k, v in var.items():
if k == key:
yield v
if isinstance(v, dict):
yield from gen_dict_extract(key, v)
elif isinstance(v, list):
for d in v:
yield from gen_dict_extract(key, d)


def build_product_maps(
product_tree_branches: dict) -> tuple[dict[str, str], dict[str, str]]:
"""
Given a CSAF product tree branch dictionary returns a tuple of CPEs by product ID and PURLs by
product ID.
"""
cpe_map = {}
purl_map = {}
products = gen_dict_extract("product", product_tree_branches)
for product in products:
product_id = product["product_id"]
if "product_identification_helper" in product:
helper = product["product_identification_helper"]
if "cpe" in helper:
cpe_map[product_id] = helper["cpe"]
elif "purl" in helper:
purl_map[product_id] = helper["purl"]
return cpe_map, purl_map


class CSAF:
"""
class to handle CSAF data read from a local file path
"""

def __init__(self, csaf_content: str):
csaf_data = json.loads(csaf_content)

if not csaf_data:
raise ValueError("Unable to load CSAF JSON data.")

self.doc = csaf_data["document"]

self.csaf = {
"type": self.doc["category"],
"csaf_version": self.doc["csaf_version"]
}

# Only support csaf_vex 2.0
if self.csaf != {"type": "csaf_vex", "csaf_version": "2.0"}:
raise ValueError(
f"Can only handle csaf_vex 2.0 documents. Got: {self.csaf}")

self.cpes, self.purls = build_product_maps(csaf_data["product_tree"])

self.vulnerabilities = [
Vulnerability(v, self.cpes, self.purls)
for v in (csaf_data["vulnerabilities"])
]

@property
def title(self):
"""
Document Title
"""
return self.doc["title"]

@property
def references(self):
"""
Document References
"""
return self.doc["references"]
25 changes: 25 additions & 0 deletions tools/redhat/csaf_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
"""Test parsing CSAF v2 advisories"""
import unittest

from csaf import Remediation


class CSAFTest(unittest.TestCase):
"""class to handle remediation advice in CSAF data"""

def test_parse_remediation(self):
"""Test parsing a CSAF Remediation and unpacking cpe and purl data"""
cpe = "cpe:/a:redhat:rhel_tus:8.4::appstream"
purl = "pkg:rpm/redhat/[email protected]%2Bel8.4.0%2B21078%2Ba96cfbf6?arch=src"
cpes = {"AppStream-8.4.0.Z.TUS": cpe}
purls = {"buildah-0:1.19.9-1.module+el8.4.0+21078+a96cfbf6.src": purl}
result = Remediation(
"AppStream-8.4.0.Z.TUS:container-tools:3.0:8040020240104111259:c0c392d5"
":buildah-0:1.19.9-1.module+el8.4.0+21078+a96cfbf6.src", cpes,
purls)
self.assertEqual(result.cpe, cpe)
self.assertEqual(result.purl, purl)


if __name__ == '__main__':
unittest.main()
Loading

0 comments on commit 1151df8

Please sign in to comment.