Skip to content

Commit

Permalink
Address nits.
Browse files Browse the repository at this point in the history
Fix error when inserting no rows.
  • Loading branch information
Ryan Hitchman committed Jul 14, 2017
1 parent bec7b85 commit c6a157a
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 63 deletions.
7 changes: 7 additions & 0 deletions kettle/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,10 @@ KETTLE -- Kubernetes Extract Tests/Transform/Load Engine
This collects test results scattered across a variety of GCS buckets,
stores them in a local SQLite database, and outputs newline-delimited JSON files
for import into BigQuery.

Results are stored in the [k8s-gubernator:build BigQuery dataset](https://bigquery.cloud.google.com/dataset/k8s-gubernator:build),
which is publicly accessible.

Running
=======
Use `pip install -r requirements.txt` to install dependencies.
25 changes: 13 additions & 12 deletions kettle/make_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

"""Generates a SQLite DB containing test data downloaded from GCS."""

# pylint: disable=invalid-name,global-statement

from __future__ import print_function

import argparse
Expand All @@ -37,9 +35,9 @@
import model


def pad_numbers(s):
def pad_numbers(string):
"""Modify a string to make its numbers suitable for natural sorting."""
return re.sub(r'\d+', lambda m: m.group(0).rjust(16, '0'), s)
return re.sub(r'\d+', lambda m: m.group(0).rjust(16, '0'), string)

WORKER_CLIENT = None # used for multiprocessing

Expand Down Expand Up @@ -85,6 +83,8 @@ def get(self, path, as_json=False):

def ls(self, path, dirs=True, files=True, delim=True, item_field='name'):
"""Lists objects under a path on gcs."""
# pylint: disable=invalid-name

bucket, path = self._parse_uri(path)
params = {'prefix': path, 'fields': 'nextPageToken'}
if delim:
Expand Down Expand Up @@ -143,8 +143,9 @@ def _get_builds(self, job):
return False, (str(n) for n in xrange(latest_build, 0, -1))
# Invalid latest-build or bucket is using timestamps
build_paths = self.ls_dirs('%s%s/' % (self.jobs_dir, job))
return True, sorted((os.path.basename(os.path.dirname(b))
for b in build_paths), key=pad_numbers, reverse=True)
return True, sorted(
(os.path.basename(os.path.dirname(b)) for b in build_paths),
key=pad_numbers, reverse=True)

def get_started_finished(self, job, build):
if self.metadata.get('pr'):
Expand Down Expand Up @@ -187,9 +188,9 @@ def mp_init_worker(jobs_dir, metadata, client_class, use_signal=True):

if use_signal:
signal.signal(signal.SIGINT, signal.SIG_IGN)
# Multiprocessing doesn't allow local variables for each worker, so 77 need
# Multiprocessing doesn't allow local variables for each worker, so we need
# to make a GCSClient global variable.
global WORKER_CLIENT
global WORKER_CLIENT # pylint: disable=global-statement
WORKER_CLIENT = client_class(jobs_dir, metadata)

def get_started_finished((job, build)):
Expand Down Expand Up @@ -234,7 +235,7 @@ def get_builds(db, jobs_dir, metadata, threads, client_class):
builds_iterator = pool.imap_unordered(
get_started_finished, jobs_and_builds)
else:
global WORKER_CLIENT
global WORKER_CLIENT # pylint: disable=global-statement
WORKER_CLIENT = gcs
builds_iterator = (
get_started_finished(job_build) for job_build in jobs_and_builds)
Expand Down Expand Up @@ -276,12 +277,12 @@ def download_junit(db, threads, client_class):
builds_to_grab = db.get_builds_missing_junit()
pool = None
if threads > 1:
pool = multiprocessing.pool.ThreadPool(threads, mp_init_worker,
('', {}, client_class, False))
pool = multiprocessing.pool.ThreadPool(
threads, mp_init_worker, ('', {}, client_class, False))
test_iterator = pool.imap_unordered(
get_junits, builds_to_grab)
else:
global WORKER_CLIENT
global WORKER_CLIENT # pylint: disable=global-statement
WORKER_CLIENT = client_class('', {})
test_iterator = (
get_junits(build_path) for build_path in builds_to_grab)
Expand Down
8 changes: 3 additions & 5 deletions kettle/make_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

"""Generate JSON for BigQuery importing."""

# pylint: disable=invalid-name

import argparse
import logging
import json
Expand Down Expand Up @@ -151,10 +149,10 @@ def get_metadata():
metadata.pop('job-version')
if metadata.get('version') == build_version:
metadata.pop('version')
for k, v in metadata.items():
if not isinstance(v, basestring):
for key, value in metadata.items():
if not isinstance(value, basestring):
# the schema specifies a string value. force it!
metadata[k] = json.dumps(v)
metadata[key] = json.dumps(value)
if not metadata:
return None
return [{'key': k, 'value': v} for k, v in sorted(metadata.items())]
Expand Down
5 changes: 3 additions & 2 deletions kettle/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ def insert_emitted(self, rows_emitted, incremental_table=DEFAULT_INCREMENTAL_TAB
gen, = self.db.execute('select max(gen)+1 from %s' % incremental_table).fetchone()
if not gen:
gen = 0
self.db.executemany('insert into %s values(?,?)' % incremental_table,
((row, gen) for row in rows_emitted))
self.db.executemany(
'insert into %s values(?,?)' % incremental_table,
((row, gen) for row in rows_emitted))
self.db.commit()
return gen
3 changes: 3 additions & 0 deletions kettle/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
google-cloud-bigquery
google-cloud-pubsub
pyyaml
78 changes: 39 additions & 39 deletions kettle/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

"""Receive push events for new builds and upload rows to BigQuery."""

# pylint: disable=invalid-name

from __future__ import print_function

import argparse
Expand Down Expand Up @@ -71,7 +69,6 @@ def get_started_finished(gcs_client, db, todo):
for ack_id, (build_dir, started, finished) in pool.imap_unordered(
lambda (ack_id, job, build): (ack_id, gcs_client.get_started_finished(job, build)),
todo):
# build_dir, started, finished = gcs_client.get_started_finished(job, build)
if finished:
if not db.insert_build(build_dir, started, finished):
print('already present??')
Expand All @@ -82,7 +79,7 @@ def get_started_finished(gcs_client, db, todo):
build_dirs.append(build_dir)
acks.append(ack_id)
else:
print('???', build_dir, started, finished)
print('finished.json missing?', build_dir, started, finished)
finally:
pool.close()
db.commit()
Expand Down Expand Up @@ -119,6 +116,9 @@ def insert_data(table, rows_iter):
rows.append(row)
row_ids.append(row_id)

if not rows: # nothing to do
return []

def insert(table, rows, row_ids):
"""Insert rows with row_ids into table, retrying as necessary."""
while True:
Expand Down Expand Up @@ -175,11 +175,11 @@ def main(db, sub, tables, client_class=make_db.GCSClient, stop=None):

if acks:
print('ACK irrelevant', len(acks))
for x in xrange(0, len(acks), 1000):
sub.acknowledge(acks[x: x + 1000])
for n in xrange(0, len(acks), 1000):
sub.acknowledge(acks[n: n + 1000])

if todo:
print('EXTEND-ACK ', (len(todo)))
print('EXTEND-ACK ', len(todo))
# give 3 minutes to grab build details
sub.modify_ack_deadline([i for i, _j, _b in todo], 60*3)

Expand All @@ -201,32 +201,6 @@ def main(db, sub, tables, client_class=make_db.GCSClient, stop=None):
db.insert_emitted(emitted, incremental_table)


def get_options(argv):
"""Process command line arguments."""
parser = argparse.ArgumentParser()
parser.add_argument(
'--poll',
required=True,
help='Follow GCS changes from project/topic/subscription',
)
parser.add_argument(
'--dataset',
help='BigQuery dataset (e.g. k8s-gubernator:build)'
)
parser.add_argument(
'--tables',
nargs='+',
default=[],
help='Upload rows to table:days [e.g. --tables day:1 week:7 all:0]',
)
parser.add_argument(
'--stop_at',
type=int,
help='Terminate when this hour (0-23) rolls around (in local time).'
)
return parser.parse_args(argv)


def load_sub(poll):
"""Return the PubSub subscription specified by the /-separated input."""
project, topic, subscription = poll.split('/')
Expand All @@ -251,7 +225,11 @@ def make_field(spec):
def load_tables(dataset, tablespecs):
"""Construct a dictionary of BigQuery tables given the input tablespec.
Returns {name: (bigquery.Table, incremental table name)}
Args:
dataset: bigquery.Dataset
tablespecs: list of strings of "NAME:DAYS", e.g. ["day:1"]
Returns:
{name: (bigquery.Table, incremental table name)}
"""
project, dataset_name = dataset.split(':')
dataset = bigquery.Client(project).dataset(dataset_name)
Expand Down Expand Up @@ -285,14 +263,36 @@ def __call__(self):
return now != last and now == self.target


def get_options(argv):
"""Process command line arguments."""
parser = argparse.ArgumentParser()
parser.add_argument(
'--poll',
required=True,
help='Follow GCS changes from project/topic/subscription',
)
parser.add_argument(
'--dataset',
help='BigQuery dataset (e.g. k8s-gubernator:build)'
)
parser.add_argument(
'--tables',
nargs='+',
default=[],
help='Upload rows to table:days [e.g. --tables day:1 week:7 all:0]',
)
parser.add_argument(
'--stop_at',
type=int,
help='Terminate when this hour (0-23) rolls around (in local time).'
)
return parser.parse_args(argv)


if __name__ == '__main__':
OPTIONS = get_options(sys.argv[1:])

stop_func = None
if OPTIONS.stop_at:
stop_func = StopWhen(OPTIONS.stop_at)

main(model.Database('build.db'),
load_sub(OPTIONS.poll),
load_tables(OPTIONS.dataset, OPTIONS.tables),
stop=stop_func)
stop=StopWhen(OPTIONS.stop_at))
6 changes: 2 additions & 4 deletions kettle/stream_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

# pylint: disable=missing-docstring,invalid-name
# pylint: disable=missing-docstring

import unittest

Expand Down Expand Up @@ -100,11 +100,10 @@ def test_main(self):

# uncomment if the trace changes
# import pprint; pprint.pprint(fakesub.trace)
# self.maxDiff = 3000

now = make_db_test.MockedClient.NOW

self.maxDiff = 3000

self.assertEqual(
fakesub.trace,
[['pull', False], ['pull', True], ['pull', True],
Expand Down Expand Up @@ -134,7 +133,6 @@ def test_main(self):
['pull', False], ['pull', True],
['modify-ack', ['c'], 180],
['ack', ['c']],
['insert-data', ([], []), {'skip_invalid_rows': True}],
['pull', False], ['pull', True],
['ack', ['d']]])

Expand Down
2 changes: 1 addition & 1 deletion pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ disable=fixme,locally-disabled,locally-enabled,relative-import,too-few-public-me
reports=no

[BASIC]
good-names=ls,kw,pr,fp,_,db
good-names=ls,kw,pr,fp,_,db,n

[DESIGN]
max-args=12
Expand Down

0 comments on commit c6a157a

Please sign in to comment.