Skip to content

Commit

Permalink
feat: added script to populate deduplication database (#4223)
Browse files Browse the repository at this point in the history
* feat: added script to populate deduplication database

Signed-off-by: Meet Soni <[email protected]>

* refactor: improved naming in the populator script

Signed-off-by: Meet Soni <[email protected]>

* fix: bypassing diff

---------

Signed-off-by: Meet Soni <[email protected]>
  • Loading branch information
inosmeet authored Jul 2, 2024
1 parent 1aaa457 commit 20781f3
Showing 1 changed file with 82 additions and 0 deletions.
82 changes: 82 additions & 0 deletions cve_bin_tool/mismatch_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import itertools
import os
import sqlite3

import yaml

from cve_bin_tool.cvedb import DBNAME, DISK_LOCATION_DEFAULT
from cve_bin_tool.log import LOGGER

SQL_CREATE_TABLE = """
CREATE TABLE IF NOT EXISTS mismatch(
purl TEXT,
vendor TEXT,
PRIMARY KEY(purl,vendor)
);
"""

SQL_INSERT_MISMATCH = """
INSERT INTO mismatch (purl, vendor)
VALUES (?, ?)
ON CONFLICT DO NOTHING;
"""


def setup_sqlite(data_dir: str, db_file: str) -> bool:
"""
Walk the given data directory, load the mismatch relationships and write to an SQLite file
:param data_dir: data directory to walk
:param db_file: SQLite file to write results into
:return: Success (True) or Failure (False)
"""
if os.path.exists(db_file) and not os.access(db_file, os.W_OK):
LOGGER.error(f"DB file already exists and is not writable: {db_file}")
return False

relation_file_name = "mismatch_relations.yml"
relation_file_paths = []

for root, directories, files in os.walk(data_dir):
if relation_file_name in files:
relation_file_paths.append(os.path.join(root, relation_file_name))

if not relation_file_paths or len(relation_file_paths) == 0:
LOGGER.error(f"No relationship files found in: {data_dir}")
return False

conn = sqlite3.connect(db_file)
cursor = conn.cursor()
cursor.execute(SQL_CREATE_TABLE)

for relation_file_path in relation_file_paths:
if not os.path.exists(relation_file_path):
LOGGER.error(f"Vendor file does not exist, skipping: {relation_file_path}")
continue

with open(relation_file_path) as relation_file:
content = yaml.safe_load(relation_file)
vendor_list = content.get("invalid_vendors", [])
purls_list = content.get("purls", [])
purl_vendor_pairs = list(itertools.product(purls_list, vendor_list))
for purl_vendor_pair in purl_vendor_pairs:
cursor.execute(SQL_INSERT_MISMATCH, purl_vendor_pair)

conn.commit()
conn.close()
return True


def main():
"""
Run the SQLite loader utility
"""
db_path = DISK_LOCATION_DEFAULT / DBNAME
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
data_dir = os.path.join(parent_dir, "data")
if not setup_sqlite(data_dir, db_path):
exit(1)
exit(0)


if __name__ == "__main__":
main()

0 comments on commit 20781f3

Please sign in to comment.