Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix parsing package repo url/info/data #97

Merged
merged 1 commit into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 103 additions & 56 deletions packj/audit/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,40 +369,58 @@ def analyze_homepage(pm_proxy, pkg_name, ver_str, pkg_info, risks, report):
finally:
return risks, report

def analyze_repo_descr(risks, report):
def analyze_repo_descr(repo_info, reason, risks, report):
try:
descr = None
msg_info('Checking repo description...', end='', flush=True, indent=1)
descr = report['repo'].get('description', None)
if not descr:
msg_warn('WARN', 'No descr')
if not repo_info:
logging.debug(f'Failed to fetch repo description: {reason}')
alert_type = 'invalid or no source repo'
risks = alert_user(alert_type, THREAT_MODEL, reason, risks)
if reason == 'parse error':
msg_fail(reason)
else:
msg_alert(reason)
else:
msg_ok(descr)
descr = repo_info.get('description', None)
if not descr:
msg_warn('WARN', 'No descr')
else:
msg_ok(descr)
except Exception as e:
msg_fail(str(e))
logging.debug(f'Failed to parse repo description: {str(e)}')
msg_fail('parse error')
finally:
return risks, report
return risks, report, descr, reason

def analyze_repo_data(config, risks, report):
def analyze_repo_info(repo_url, reason, config, risks, report):
try:
repo_url = report['repo']['url']
msg_info('Checking repo data...', end='', flush=True, indent=1)
err, repo_data = fetch_repo_data(config, repo_url)
assert repo_data, err

try:
num_forks = repo_data['num_forks']
except KeyError:
num_forks = None
msg_info('Fetching repo data...', end='', flush=True, indent=1)
repo_info = None
if repo_url:
reason, repo_info = fetch_repo_data(config, repo_url)
if not repo_info:
logging.debug(f'Failed to fetch repo info: {reason}')
alert_type = 'invalid or no source repo'
risks = alert_user(alert_type, THREAT_MODEL, reason, risks)
msg_alert(reason)
return risks, report, None, reason
except Exception as e:
logging.debug(f'Failed to fetch repo info: {str(e)}')
reason = 'fetch error'
msg_fail(reason)
return risks, report, None, reason

try:
num_stars = repo_data['num_stars']
except KeyError:
num_stars = None
# update report
try:
report['repo'].update(repo_info)
except Exception as e:
logging.debug(f'Failed to update report with repo info: {str(e)}')

try:
forked_from = repo_data['forked_from']
except KeyError:
forked_from = None
# parse info: forks, stars, etc.
try:
num_forks = repo_info.get('num_forks', None)
num_stars = repo_info.get('num_stars', None)

msg = ''
alert = False
Expand Down Expand Up @@ -430,15 +448,13 @@ def analyze_repo_data(config, risks, report):
else:
msg_ok(msg)

report['repo'].update(repo_data)
except Exception as e:
msg_fail(str(e))

if not repo_data:
return risks, report
logging.debug(f'Failed to parse repo info (forks/stars/etc.): {str(e)}')
msg_fail('parse error')

try:
msg_info('Checking if repo is a forked copy...', end='', flush=True, indent=1)
forked_from = repo_info.get('forked_from', None)
if forked_from:
alert_type = 'source repo is a forked copy'
reason = f'forked from {forked_from}'
Expand All @@ -447,27 +463,39 @@ def analyze_repo_data(config, risks, report):
else:
msg_ok('original, not forked')
except Exception as e:
msg_fail(str(e))
logging.debug(f'Failed to parse repo forked info: {str(e)}')
msg_fail('parse error')
finally:
return risks, report
return risks, report, repo_info, None

def analyze_repo_activity(risks, report):
try:
repo_url = report['repo']['url']
msg_info('Checking repo activity...', end='', flush=True, indent=1)
repo_url = report['repo']['url']
reason, repo_data = git_clone(repo_url)
except KeyError:
reason = 'no repo url info'
logging.debug(f'Failed to clone repo: {reason}')
except Exception as e:
logging.debug(f'Failed to clone repo {repo_url}: {str(e)}')
reason = 'git clone error'

# parse repo metadata
try:
if reason:
alert_type = 'invalid or no source repo'
risks = alert_user(alert_type, THREAT_MODEL, reason, risks)
msg_alert(reason)
elif repo_data:
commits, contributors, tags = tuple(len(repo_data[k]) if repo_data[k] else None for k in ('commits', 'contributors', 'tags'))
msg_ok(f'commits: {commits}, contributors: {contributors}, tags: {tags}')
report['repo'].update(repo_data)
report['repo'].update(repo_data)
except Exception as e:
msg_fail(str(e))
logging.debug(f'Failed to parse commits/contributors for repo {repo_url}: {str(e)}')
reason = 'parsing error'
msg_fail(reason)
finally:
return risks, report, repo_data
return risks, report, repo_data, reason

def get_pkg_ver_release_dates_before_after(release_history, pkg_ver):
pkg_release_date = release_history[pkg_ver]['release_date']
Expand Down Expand Up @@ -509,13 +537,18 @@ def get_repo_ver_release_dates(tag_list, cutoff_datetime):
except Exception as e:
raise Exception(f'Failed to find repo release dates: {str(e)}')

def analyze_repo_releases(repo_data, risks, report, release_history):
def analyze_repo_releases(repo_data, reason, risks, report, release_history):
try:
msg_info('Analyzing repo-pkg release match...', end='', flush=True, indent=1)
release_tags = repo_data['tags']
release_tags = None
if repo_data:
logging.debug(f'Failed to analyze repo-pkg release match: {reason}')
release_tags = repo_data.get('tags', None)

if not release_tags or not len(release_tags):
alert_type = 'inconsistent with repo source'
reason = 'no repo releases'
if not reason:
reason = 'no repo releases'
risks = alert_user(alert_type, THREAT_MODEL, reason, risks)
msg_alert(reason)
else:
Expand All @@ -537,19 +570,27 @@ def analyze_repo_releases(repo_data, risks, report, release_history):
else:
msg_ok(f'matching tag(s) {",".join(repo_ver_tag)} on {repo_ver_release_date}')
except Exception as e:
msg_fail(str(e))
logging.debug(f'Failed to parse release tags for repo {repo_data}: {str(e)}')
reason = 'parse error'
msg_fail(reason)
finally:
return risks, report
return risks, report, reason

def analyze_repo_code(repo_data, risks, report):
def analyze_repo_code(repo_data, reason, risks, report):
try:
msg_info('Analyzing repo-pkg src code match...', end='', flush=True, indent=1)
# TODO
msg_warn(' N/A','Coming soon!')
if not repo_data:
logging.debug(f'Failed to analyze repo-pkg src code match: {reason}')
alert_type = 'inconsistent with repo source'
risks = alert_user(alert_type, THREAT_MODEL, reason, risks)
msg_alert(reason)
else:
# TODO
msg_warn(' N/A','Coming soon!')
except Exception as e:
msg_fail(str(e))
finally:
return risks, report
return risks, report, reason

def analyze_repo_url(pm_proxy, pkg_name, ver_str, pkg_info, ver_info, risks, report):
try:
Expand Down Expand Up @@ -592,14 +633,22 @@ def analyze_repo_url(pm_proxy, pkg_name, ver_str, pkg_info, ver_info, risks, rep
risks = alert_user(alert_type, THREAT_MODEL, reason, risks)
msg_alert(reason)
else:
reason = None
msg_ok(repo_url)
except Exception as e:
logging.debug(f'Failed to get repo_url: {str(e)}')
reason = 'parse error'
msg_fail(reason)

# update report
try:
report['repo'] = {
'url' : repo_url,
}
except Exception as e:
msg_fail(str(e))
finally:
return risks, report
logging.debug(f'Failed to update report with repo_url: {str(e)}')

return risks, report, repo_url, reason

def analyze_readme(pm_proxy, pkg_name, ver_str, pkg_info, risks, report):
try:
Expand Down Expand Up @@ -1023,14 +1072,12 @@ def audit(pm_args, pkg_name, ver_str, report_dir, extra_args, config):
risks, report = analyze_install_hooks(pm_proxy, pkg_name, pkg_info, risks, report)
risks, report = analyze_typosquatting(pm_proxy, pkg_name, pkg_info, risks, report)
risks, report = analyze_dep_confusion(pm_proxy, pkg_name, pkg_info, risks, report)
risks, report = analyze_repo_url(pm_proxy, pkg_name, ver_str, pkg_info, ver_info, risks, report)
if 'repo' in report and 'url' in report['repo'] and report['repo']['url']:
risks, report = analyze_repo_data(config, risks, report)
if 'description' in report['repo']:
risks, report = analyze_repo_descr(risks, report)
risks, report, repo_data = analyze_repo_activity(risks, report)
risks, report = analyze_repo_releases(repo_data, risks, report, release_history)
risks, report = analyze_repo_code(repo_data, risks, report)
risks, report, repo_url, reason = analyze_repo_url(pm_proxy, pkg_name, ver_str, pkg_info, ver_info, risks, report)
risks, report, repo_info, reason = analyze_repo_info(repo_url, reason, config, risks, report)
risks, report, _, _ = analyze_repo_descr(repo_info, reason, risks, report)
risks, report, repo_data, reason = analyze_repo_activity(risks, report)
risks, report, _ = analyze_repo_releases(repo_data, reason, risks, report, release_history)
risks, report, _ = analyze_repo_code(repo_data, reason, risks, report)
risks, report = analyze_cves(pm_name, pkg_name, ver_str, risks, report)
risks, report = analyze_deps(pm_proxy, pkg_name, ver_str, pkg_info, ver_info, risks, report)

Expand Down
2 changes: 1 addition & 1 deletion packj/util/repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def git_clone(repo_url):
git.Git(clone_dir).clone(repo_url)
except Exception as e:
logging.debug("Failed to clone %s: %s" % (repo_url, str(e)))
return "repo does not exit", None
return "repo does not exist", None

try:
clone_dir = os.path.join(clone_dir, os.path.basename(repo_url))
Expand Down
Loading