diff --git a/libchickadee/backends/__init__.py b/libchickadee/backends/__init__.py index 442b079..0978f28 100644 --- a/libchickadee/backends/__init__.py +++ b/libchickadee/backends/__init__.py @@ -1,6 +1,5 @@ """Base class for all backends.""" import json -import time import csv __author__ = 'Chapin Bryce' @@ -8,13 +7,14 @@ __license__ = 'GPLv3 Copyright 2019 Chapin Bryce' __desc__ = '''Yet another GeoIP resolution tool.''' + class ResolverBase(object): """Generic base class for use by other backends.""" def __init__(self, fields=list(), lang='en'): self.uri = None self.lang = lang self.supported_langs = [] - self.fields = fields # Ordered list of fields to gather + self.fields = fields # Ordered list of fields to gather self.pbar = False # Enable progress bars self.data = None @@ -100,7 +100,6 @@ def write_json(outfile, data, headers=None, lines=False): selected_data.append(d) data = selected_data - if lines: for entry in data: open_file.write(json.dumps(entry)+"\n") diff --git a/libchickadee/backends/ipapi.py b/libchickadee/backends/ipapi.py index 255f482..22bc2d5 100644 --- a/libchickadee/backends/ipapi.py +++ b/libchickadee/backends/ipapi.py @@ -15,7 +15,7 @@ __license__ = 'GPLv3 Copyright 2019 Chapin Bryce' __desc__ = '''Yet another GeoIP resolution tool.''' -FIELDS = [ # Ordered list of fields to gather +FIELDS = [ # Ordered list of fields to gather 'query', 'as', 'org', 'isp' 'continent', 'country', 'regionName', 'city', @@ -25,6 +25,7 @@ 'status', 'message' ] + class Resolver(ResolverBase): """Class to handle ip-api.com API queries for IP addresses.""" def __init__(self, fields=FIELDS, lang='en'): @@ -54,7 +55,7 @@ def sleeper(self): if wait_time.total_seconds() < 0: self.wait_time = datetime.now() return - wt_sec = wait_time.total_seconds()+1 # add a buffer + wt_sec = wait_time.total_seconds()+1 # add a buffer logger.info( 'Sleeping for {} seconds due to rate limiting.'.format(wt_sec)) time.sleep(wt_sec) @@ -72,7 +73,6 @@ def batch(self): else: orig_recs = range(0, len(records), 100) - for x in orig_recs: params = { 'fields': ','.join(self.fields), @@ -137,6 +137,7 @@ def single(self): logger.error(msg) return [{'query': self.data, 'status': 'failed', 'message': msg}] + class ProResolver(Resolver): """GeoIP resolver using the ip-api.com paid subscription.""" def __init__(self, api_key, fields=FIELDS, lang='en'): diff --git a/libchickadee/chickadee.py b/libchickadee/chickadee.py index be09917..e0c9db4 100644 --- a/libchickadee/chickadee.py +++ b/libchickadee/chickadee.py @@ -35,7 +35,7 @@ ''' logger = logging.getLogger(__name__) -_FIELDS = ','.join([ # Ordered list of fields to gather +_FIELDS = ','.join([ # Ordered list of fields to gather 'query', 'count', 'as', 'org', 'isp', 'continent', 'country', 'regionName', 'city', 'district', 'zip', @@ -44,6 +44,7 @@ 'status', 'message' ]) + class Chickadee(object): """Class to handle chickadee script operations.""" def __init__(self, outformat='json', outfile=sys.stdout, fields=_FIELDS): @@ -70,22 +71,25 @@ def run(self, input_data): results = [] result_dict = {} # Extract and resolve IP addresses - if not isinstance(self.input_data, _io.TextIOWrapper) and os.path.isdir(self.input_data): + if not isinstance(self.input_data, _io.TextIOWrapper) and \ + os.path.isdir(self.input_data): logger.debug("Detected the data source as a directory") - result_dict = self.dir_handler(self.input_data) # Directory handler - elif isinstance(self.input_data, _io.TextIOWrapper) or os.path.isfile(self.input_data): + result_dict = self.dir_handler(self.input_data) # Dir handler + elif isinstance(self.input_data, _io.TextIOWrapper) or \ + os.path.isfile(self.input_data): logger.debug("Detected the data source as a file") - result_dict = self.file_handler(self.input_data) # File handler + result_dict = self.file_handler(self.input_data) # File handler elif isinstance(self.input_data, str): logger.debug("Detected the data source as raw value(s)") - result_dict = self.str_handler(self.input_data) # String handler + result_dict = self.str_handler(self.input_data) # String handler # Resolve if requested if self.resolve_ips: results = self.resolve(result_dict) return results - return [{'query': k, 'count': v, 'message': 'No resolve'} for k, v in result_dict.items()] + return [{'query': k, 'count': v, 'message': 'No resolve'} + for k, v in result_dict.items()] def write_output(self, results): """Write results to output format and/or files @@ -135,7 +139,7 @@ def str_handler(data): # Generate a distinct list with count data_dict = {} for x in raw_data: - if not x in data_dict: + if x not in data_dict: data_dict[x] = 0 data_dict[x] += 1 return data_dict @@ -153,7 +157,8 @@ def resolve(self, data_dict): """ distinct_ips = list(data_dict.keys()) - logger.info("Identified {} distinct IPs for resolution".format(len(distinct_ips))) + logger.info("Identified {} distinct IPs for resolution".format( + len(distinct_ips))) api_key = self.get_api_key() @@ -170,7 +175,8 @@ def resolve(self, data_dict): results = [] data = distinct_ips if self.pbar: - data = tqdm(distinct_ips, desc="Resolving IPs", unit_scale=True) + data = tqdm(distinct_ips, desc="Resolving IPs", + unit_scale=True) for element in data: resolver.data = element @@ -184,10 +190,7 @@ def resolve(self, data_dict): if 'count' in self.fields: updated_results = [] for result in results: - try: - query = str(result.get('query', '')) - except AttributeError: - import pdb; pdb.set_trace() + query = str(result.get('query', '')) result['count'] = int(data_dict.get(query, '0')) updated_results.append(result) @@ -224,7 +227,6 @@ def file_handler(file_path): logger.warning("Failed to parse {}".format(file_path)) return file_parser.ips - def dir_handler(self, file_path): """Handle parsing IP addresses from files recursively @@ -244,9 +246,11 @@ def dir_handler(self, file_path): logger.debug("Parsed file {}, {} results".format( file_entry, len(file_results))) result_dict = dict(Counter(result_dict)+Counter(file_results)) - logger.debug("{} total distinct IPs discovered".format(len(result_dict))) + logger.debug("{} total distinct IPs discovered".format( + len(result_dict))) return result_dict + def setup_logging(path, verbose=False): """Function to setup logging configuration and test it.""" # Allow us to modify the `logger` variable within a function @@ -282,9 +286,11 @@ def setup_logging(path, verbose=False): logger.addHandler(file_handle) -class CustomArgFormatter(argparse.RawTextHelpFormatter, argparse.ArgumentDefaultsHelpFormatter): +class CustomArgFormatter(argparse.RawTextHelpFormatter, + argparse.ArgumentDefaultsHelpFormatter): """Custom argparse formatter class""" + def arg_handling(): """Argument handling.""" parser = argparse.ArgumentParser( @@ -365,7 +371,6 @@ def entry(args=None): else: data = chickadee.run(args.data) - logger.info("Writing output") chickadee.outfile = args.output_file chickadee.outformat = args.output_format @@ -373,5 +378,6 @@ def entry(args=None): logger.info("Chickadee complete") + if __name__ == "__main__": entry() diff --git a/libchickadee/parsers/__init__.py b/libchickadee/parsers/__init__.py index 0a9e424..7fb5d0b 100644 --- a/libchickadee/parsers/__init__.py +++ b/libchickadee/parsers/__init__.py @@ -48,6 +48,7 @@ IPv4Pattern = re.compile(IPV4ADDR) IPv6Pattern = re.compile(IPV6ADDR) + def strip_ipv6(ipv6_addr): """Isolate IPv6 Value""" if '%' in ipv6_addr: diff --git a/libchickadee/parsers/plain_text.py b/libchickadee/parsers/plain_text.py index 00289bd..dcefd42 100644 --- a/libchickadee/parsers/plain_text.py +++ b/libchickadee/parsers/plain_text.py @@ -1,8 +1,8 @@ """Parse IP addresses from plain text files and feed to the Chickadee GeoIP API -Plain text files include logs, csvs, json, and other formats where ascii strings -contain IPv4 or IPv6 addresses. +Plain text files include logs, csvs, json, and other formats where ascii +strings contain IPv4 or IPv6 addresses. """ import binascii @@ -16,6 +16,7 @@ __license__ = 'GPLv3 Copyright 2019 Chapin Bryce' __desc__ = '''Yet another GeoIP resolution tool.''' + class PlainTextParser(object): """Class to extract IP addresses from plain text and gzipped plain text files.""" @@ -55,6 +56,7 @@ def parse_file(self, file_entry, is_stream=False): if 'closed' in dir(file_data) and not file_data.closed: file_data.close() + if __name__ == "__main__": # pragma: no cover import argparse parser = argparse.ArgumentParser() diff --git a/libchickadee/parsers/xlsx.py b/libchickadee/parsers/xlsx.py index 2d8a0dd..e17c916 100644 --- a/libchickadee/parsers/xlsx.py +++ b/libchickadee/parsers/xlsx.py @@ -14,6 +14,7 @@ __license__ = 'GPLv3 Copyright 2019 Chapin Bryce' __desc__ = '''Yet another GeoIP resolution tool.''' + class XLSXParser(object): """Class to extract IP addresses from xlsx workbooks.""" def __init__(self): @@ -41,6 +42,7 @@ def check_ips(self, data): self.ips[strip_ipv6(ipv6)] = 0 self.ips[strip_ipv6(ipv6)] += 1 + if __name__ == '__main__': # pragma: no cover import argparse parser = argparse.ArgumentParser() diff --git a/libchickadee/test/test_backend_ipapi.py b/libchickadee/test/test_backend_ipapi.py index cff8b01..5e0bc0c 100644 --- a/libchickadee/test/test_backend_ipapi.py +++ b/libchickadee/test/test_backend_ipapi.py @@ -8,6 +8,7 @@ __license__ = 'GPLv3 Copyright 2019 Chapin Bryce' __desc__ = '''Yet another GeoIP resolution tool.''' + class IPAPITestCase(unittest.TestCase): """IP-API Backend Tests.""" def setUp(self): @@ -25,7 +26,8 @@ def setUp(self): 'org': 'Google LLC', 'proxy': False, 'query': '2001:4860:4860::8888'} ] - self.resolver = Resolver(fields=['query', 'count', 'as', 'country', 'org', 'proxy']) + self.resolver = Resolver(fields=['query', 'count', 'as', + 'country', 'org', 'proxy']) def test_ipapi_resolve_query_single(self): """Query Method Test""" @@ -37,7 +39,7 @@ def test_ipapi_resolve_query_batch(self): """Batch Query Method Test""" data = self.resolver.query(self.test_data_ips) res = [x for x in data] - batch_result = [] # No reverse field + batch_result = [] # No reverse field for item in self.expected_result: if 'reverse' in item: item.pop('reverse') @@ -72,6 +74,7 @@ def test_ipapi_resolve_single_field(self): expected[field] = self.expected_result[count].get(field, None) self.assertEqual(data, expected) + ''' import os from libchickadee.backends.ipapi import ProResolver @@ -152,5 +155,6 @@ def test_ipapi_resolve_batch(self): self.assertCountEqual(res, batch_result) # ''' + if __name__ == '__main__': unittest.main() diff --git a/libchickadee/test/test_chickadee.py b/libchickadee/test/test_chickadee.py index 33a272b..1dd3724 100644 --- a/libchickadee/test/test_chickadee.py +++ b/libchickadee/test/test_chickadee.py @@ -9,6 +9,7 @@ __license__ = 'GPLv3 Copyright 2019 Chapin Bryce' __desc__ = '''Yet another GeoIP resolution tool.''' + class ChickadeeStringTestCase(unittest.TestCase): """Chickadee script tests.""" def setUp(self): @@ -33,7 +34,8 @@ def test_no_resolve(self): results = [ {'query': '10.0.1.2', 'count': 1, 'message': 'No resolve'}, {'query': '8.8.8.8', 'count': 1, 'message': 'No resolve'}, - {'query': '2001:4860:4860::8888', 'count': 1, 'message': 'No resolve'} + {'query': '2001:4860:4860::8888', 'count': 1, + 'message': 'No resolve'} ] for count, ip in enumerate(self.test_data_ips): chickadee = Chickadee() @@ -144,7 +146,7 @@ def test_ipapi_resolve_query_txt_file(self): chickadee.fields = self.fields data = chickadee.run(os.path.join(self.test_data_dir, 'txt_ips.txt')) res = [x for x in data] - batch_result = [] # No reverse field + batch_result = [] # No reverse field for item in self.txt_data_results: if 'reverse' in item: item.pop('reverse') @@ -155,9 +157,10 @@ def test_ipapi_resolve_query_gz_file(self): """Batch Query Method Test""" chickadee = Chickadee() chickadee.fields = self.fields - data = chickadee.run(os.path.join(self.test_data_dir, 'txt_ips.txt.gz')) + data = chickadee.run(os.path.join(self.test_data_dir, + 'txt_ips.txt.gz')) res = [x for x in data] - batch_result = [] # No reverse field + batch_result = [] # No reverse field for item in self.txt_data_results: if 'reverse' in item: item.pop('reverse') @@ -170,7 +173,7 @@ def test_ipapi_resolve_query_xlsx_file(self): chickadee.fields = self.fields data = chickadee.run(os.path.join(self.test_data_dir, 'test_ips.xlsx')) res = [x for x in data] - batch_result = [] # No reverse field + batch_result = [] # No reverse field for item in self.xlsx_data_results: if 'reverse' in item: item.pop('reverse') @@ -180,31 +183,38 @@ def test_ipapi_resolve_query_xlsx_file(self): def test_ipapi_resolve_query_folder(self): """Batch Query Method Test""" expected = [ - {"country": "Australia", "org": "", "as": "AS13335 Cloudflare, Inc.", + {"country": "Australia", "org": "", + "as": "AS13335 Cloudflare, Inc.", "proxy": False, "query": "1.1.1.1", "count": 6}, {"query": "10.0.1.2", "count": 3}, - {"country": "United States", "org": "Level 3", "as": "AS15169 Google LLC", + {"country": "United States", "org": "Level 3", + "as": "AS15169 Google LLC", "proxy": False, "query": "8.8.8.8", "count": 3}, - {"country": "United States", "org": "Google LLC", "as": "AS15169 Google LLC", + {"country": "United States", "org": "Google LLC", + "as": "AS15169 Google LLC", "proxy": False, "query": "2001:4860:4860::8888", "count": 3}, {"country": "United States", "org": "Informs", - "as": "AS3356 Level 3 Communications, Inc.", "proxy": True, "query": "4.4.4.4", + "as": "AS3356 Level 3 Communications, Inc.", "proxy": True, + "query": "4.4.4.4", "count": 3}, - {"country": "United States", "org": "Google LLC", "as": "AS15169 Google LLC", + {"country": "United States", "org": "Google LLC", + "as": "AS15169 Google LLC", "proxy": False, "query": "2001:4860:4860::8844", "count": 4}, - {"country": "United States", "org": "Google LLC", "as": "AS15169 Google LLC", + {"country": "United States", "org": "Google LLC", + "as": "AS15169 Google LLC", "proxy": False, "query": "2001:4860:4860::8888", "count": 3}, {"country": "France", "org": "", "as": "AS3215 Orange S.A.", "proxy": True, "query": "2.2.2.2", "count": 3}, - {"country": "United States", "org": "Google LLC", "as": "AS15169 Google LLC", + {"country": "United States", "org": "Google LLC", + "as": "AS15169 Google LLC", "proxy": False, "query": "2001:4860:4860::8844", "count": 4} ] @@ -214,5 +224,6 @@ def test_ipapi_resolve_query_folder(self): res = [x for x in data] self.assertCountEqual(res, expected) + if __name__ == '__main__': unittest.main() diff --git a/libchickadee/test/test_parser_plaintext.py b/libchickadee/test/test_parser_plaintext.py index 1f132c4..d9b826d 100644 --- a/libchickadee/test/test_parser_plaintext.py +++ b/libchickadee/test/test_parser_plaintext.py @@ -9,6 +9,7 @@ __license__ = 'GPLv3 Copyright 2019 Chapin Bryce' __desc__ = '''Yet another GeoIP resolution tool.''' + class PlainTextParserTestCase(unittest.TestCase): """Plain-text parsing tests""" def setUp(self): @@ -50,5 +51,6 @@ def test_gz_txt_detection(self): self.parser.is_gz_file(self.test_data_dir+'/txt_ips.txt') ) + if __name__ == '__main__': unittest.main() diff --git a/libchickadee/test/test_parser_xlsx.py b/libchickadee/test/test_parser_xlsx.py index 691a908..6f43f00 100644 --- a/libchickadee/test/test_parser_xlsx.py +++ b/libchickadee/test/test_parser_xlsx.py @@ -9,6 +9,7 @@ __license__ = 'GPLv3 Copyright 2019 Chapin Bryce' __desc__ = '''Yet another GeoIP resolution tool.''' + class XLSXParserTestCase(unittest.TestCase): """XLSX parsing tests.""" def setUp(self): @@ -31,5 +32,6 @@ def test_ip_extraction_xlsx(self): self.parser.parse_file(self.test_data_dir+'/test_ips.xlsx') self.assertEqual(self.test_data_ips, self.parser.ips) + if __name__ == '__main__': unittest.main() diff --git a/push_checks.sh b/push_checks.sh index 7351219..74a91ac 100755 --- a/push_checks.sh +++ b/push_checks.sh @@ -1,5 +1,5 @@ #!/usr/bin/env bash fgrep -Ri pdb libchickadee/**/*.py -pylint libchickadee +flake8 libchickadee --count --show-source --statistics coverage run tests.py && coverage html open htmlcov/index.html \ No newline at end of file