Merge pull request #580 from HubSpot/logfetch_headers
allow addition of custom headers on logfetch requests
tpetr committed Jun 22, 2015
2 parents d48688a + 92d19e3 commit b663660
Showing 6 changed files with 43 additions and 18 deletions.
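
For context: the change teaches all three entry points (fetch, cat, and tail) to read an optional [Request Headers] section from the logfetch config file and attach those headers to every HTTP request made against the Singularity API. Below is a minimal sketch of that flow, written for the same Python 2 environment the scripts target (use the configparser module on Python 3); the config path and header values are hypothetical, and only the section name "Request Headers" comes from the diff.

    import os
    import ConfigParser  # configparser on Python 3

    CONF_PATH = os.path.expanduser('~/.logfetch/logfetch')  # hypothetical path

    # Example contents of that file (made-up values):
    #
    #   [Request Headers]
    #   X-Api-Key = abc123
    #   X-Forwarded-User = jsmith

    config = ConfigParser.SafeConfigParser()
    config.optionxform = str  # keep header names exactly as written
    config.read(CONF_PATH)

    try:
        headers = dict(config.items('Request Headers'))
    except ConfigParser.NoSectionError:
        headers = {}

    # Every requests/grequests call then carries the headers along, e.g.:
    #   requests.get(uri, params=params, headers=headers)
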
20 changes: 20 additions & 0 deletions scripts/logfetch/entrypoint.py
@@ -91,6 +91,7 @@ def fetch():
conf_dir = args.conf_folder if args.conf_folder else DEFAULT_CONF_DIR
conf_file = os.path.expanduser(conf_dir + '/' + args.conf_file) if args.conf_file else os.path.expanduser(conf_dir + '/' + DEFAULT_CONF_FILE)
config = ConfigParser.SafeConfigParser()
config.optionxform = str

defaults = {
"num_parallel_fetches" : DEFAULT_PARALLEL_FETCHES,
@@ -136,6 +137,11 @@ def fetch():
args.end_days = convert_to_date(args, args.end_days)

args.dest = os.path.expanduser(args.dest)
try:
setattr(args, 'headers', dict(config.items("Request Headers")))
except:
sys.stderr.write('No additional request headers found\n')
setattr(args, 'headers', {})

fetch_logs(args)

@@ -147,6 +153,7 @@ def cat():
conf_dir = args.conf_folder if args.conf_folder else DEFAULT_CONF_DIR
conf_file = os.path.expanduser(conf_dir + '/' + args.conf_file) if args.conf_file else os.path.expanduser(conf_dir + '/' + DEFAULT_CONF_FILE)
config = ConfigParser.SafeConfigParser()
config.optionxform = str

defaults = {
"num_parallel_fetches" : DEFAULT_PARALLEL_FETCHES,
@@ -191,6 +198,12 @@ def cat():
args.end_days = convert_to_date(args, args.end_days)

args.dest = os.path.expanduser(args.dest)
try:
setattr(args, 'headers', dict(config.items("Request Headers")))
except:
sys.stderr.write('No additional request headers found\n')
setattr(args, 'headers', {})


cat_logs(args)

@@ -202,6 +215,7 @@ def tail():
conf_dir = args.conf_folder if args.conf_folder else DEFAULT_CONF_DIR
conf_file = os.path.expanduser(conf_dir + '/' + args.conf_file) if args.conf_file else os.path.expanduser(conf_dir + '/' + DEFAULT_CONF_FILE)
config = ConfigParser.SafeConfigParser()
config.optionxform = str

defaults = {'verbose': False}

@@ -230,5 +244,11 @@ def tail():
check_args(args)

args.dest = os.path.expanduser(args.dest)
try:
setattr(args, 'headers', dict(config.items("Request Headers")))
except:
sys.stderr.write('No additional request headers found\n')
setattr(args, 'headers', {})


tail_logs(args)
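
Worth calling out from the hunks above: config.optionxform = str is what keeps header names from being lowercased, because ConfigParser normalizes option names through optionxform, which lowercases by default. HTTP treats header names case-insensitively, but preserving the case written in the config avoids surprises with strict proxies and keeps the values readable. A small illustration with a hypothetical header:

    import ConfigParser            # configparser on Python 3
    from StringIO import StringIO  # io.StringIO on Python 3

    raw = "[Request Headers]\nX-Api-Key = secret\n"

    lowered = ConfigParser.SafeConfigParser()
    lowered.readfp(StringIO(raw))
    dict(lowered.items('Request Headers'))
    # => {'x-api-key': 'secret'}   (default optionxform lowercases the name)

    preserved = ConfigParser.SafeConfigParser()
    preserved.optionxform = str    # identity transform keeps the case as written
    preserved.readfp(StringIO(raw))
    dict(preserved.items('Request Headers'))
    # => {'X-Api-Key': 'secret'}
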
10 changes: 6 additions & 4 deletions scripts/logfetch/live_logs.py
@@ -27,7 +27,8 @@ def download_live_logs(args):
async_requests.append(
grequests.AsyncRequest('GET',uri ,
callback=generate_callback(uri, args.dest, logfile_name, args.chunk_size, args.verbose),
params={'path' : '{0}/{1}/{2}'.format(metadata['fullPathToRoot'], metadata['currentDirectory'], log_file)}
params={'path' : '{0}/{1}/{2}'.format(metadata['fullPathToRoot'], metadata['currentDirectory'], log_file)},
headers=args.headers
)
)
if logfile_name.endswith('.gz'):
@@ -45,7 +46,8 @@ def download_live_logs(args):
async_requests.append(
grequests.AsyncRequest('GET',uri ,
callback=generate_callback(uri, args.dest, logfile_name, args.chunk_size, args.verbose),
params={'path' : '{0}/{1}/logs/{2}'.format(metadata['fullPathToRoot'], metadata['currentDirectory'], log_file)}
params={'path' : '{0}/{1}/logs/{2}'.format(metadata['fullPathToRoot'], metadata['currentDirectory'], log_file)},
headers=args.headers
)
)
if logfile_name.endswith('.gz'):
@@ -71,11 +73,11 @@ def tasks_to_check(args):

def files_json(args, task):
uri = BROWSE_FOLDER_FORMAT.format(logfetch_base.base_uri(args), task)
return get_json_response(uri)
return get_json_response(uri, args)

def logs_folder_files(args, task):
uri = BROWSE_FOLDER_FORMAT.format(logfetch_base.base_uri(args), task)
files_json = get_json_response(uri, {'path' : '{0}/logs'.format(task)})
files_json = get_json_response(uri, args, {'path' : '{0}/logs'.format(task)})
if 'files' in files_json:
files = files_json['files']
return [f['name'] for f in files if logfetch_base.is_in_date_range(args, f['mtime'])]
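
grequests.AsyncRequest forwards its keyword arguments to requests, so the new headers=args.headers rides along with no further plumbing. A stripped-down sketch of the batched-download pattern used in this file; the URL, path, and header values are placeholders and the callback is simplified:

    import sys
    import grequests

    headers = {'X-Api-Key': 'abc123'}  # hypothetical, normally args.headers

    def on_complete(response, **kwargs):
        # The real callback streams the body to disk chunk by chunk;
        # this sketch only notes what came back.
        sys.stdout.write('{0} -> {1} bytes\n'.format(response.url, len(response.content)))

    async_requests = [
        grequests.AsyncRequest(
            'GET',
            'http://singularity.example.com/api/sandbox/taskid/download',  # placeholder
            params={'path': 'full/path/to/service.log'},
            headers=headers,
            callback=on_complete,
        )
    ]
    grequests.map(async_requests, stream=True, size=2)
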
11 changes: 7 additions & 4 deletions scripts/logfetch/logfetch_base.py
@@ -2,7 +2,7 @@
import sys
import gzip
import fnmatch
from datetime import datetime
from datetime import datetime, timedelta
from termcolor import colored
from singularity_request import get_json_response

@@ -54,17 +54,20 @@ def tasks_for_requests(args):
tasks = [task["taskId"]["id"] for task in all_tasks_for_request(args, request)]
tasks = tasks[0:args.task_count] if hasattr(args, 'task_count') else tasks
all_tasks = all_tasks + tasks
if not all_tasks:
sys.stderr.write(colored('No tasks found, check that the request/task you are searching for exists...', 'red'))
exit(1)
return all_tasks

def log_matches(inputString, pattern):
return fnmatch.fnmatch(inputString, pattern) or fnmatch.fnmatch(inputString, pattern + '*.gz')

def all_tasks_for_request(args, request):
uri = '{0}{1}'.format(base_uri(args), ACTIVE_TASKS_FORMAT.format(request))
active_tasks = get_json_response(uri)
active_tasks = get_json_response(uri, args)
if hasattr(args, 'start_days'):
uri = '{0}{1}'.format(base_uri(args), REQUEST_TASKS_FORMAT.format(request))
historical_tasks = get_json_response(uri)
historical_tasks = get_json_response(uri, args)
if len(historical_tasks) == 0:
return active_tasks
elif len(active_tasks) == 0:
@@ -76,7 +79,7 @@ def all_tasks_for_request(args, request):

def all_requests(args):
uri = '{0}{1}'.format(base_uri(args), ALL_REQUESTS)
requests = get_json_response(uri)
requests = get_json_response(uri, args)
included_requests = []
for request in requests:
if fnmatch.fnmatch(request['request']['id'], args.requestId):
8 changes: 4 additions & 4 deletions scripts/logfetch/s3_logs.py
@@ -24,7 +24,7 @@ def download_s3_logs(args):
if not args.logtype or log_matches(args, filename):
if not already_downloaded(args.dest, filename):
async_requests.append(
grequests.AsyncRequest('GET', log_file['getUrl'], callback=generate_callback(log_file['getUrl'], args.dest, filename, args.chunk_size, args.verbose))
grequests.AsyncRequest('GET', log_file['getUrl'], callback=generate_callback(log_file['getUrl'], args.dest, filename, args.chunk_size, args.verbose), headers=args.headers)
)
else:
if args.verbose:
@@ -52,16 +52,16 @@ def already_downloaded(dest, filename):

def logs_for_all_requests(args):
if args.taskId:
return get_json_response(s3_task_logs_uri(args, args.taskId))
return get_json_response(s3_task_logs_uri(args, args.taskId), args)
else:
tasks = logfetch_base.tasks_for_requests(args)
logs = []
for task in tasks:
s3_logs = get_json_response(s3_task_logs_uri(args, task))
s3_logs = get_json_response(s3_task_logs_uri(args, task), args)
logs = logs + s3_logs if s3_logs else logs
sys.stderr.write(colored('Also searching s3 history...\n', 'cyan'))
for request in logfetch_base.all_requests(args):
s3_logs = get_json_response(s3_request_logs_uri(args, request))
s3_logs = get_json_response(s3_request_logs_uri(args, request), args)
logs = logs + s3_logs if s3_logs else logs
return [dict(t) for t in set([tuple(l.items()) for l in logs])] # remove any duplicates

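
Unrelated to headers but easy to misread: the last line of logs_for_all_requests deduplicates a list of dicts by round-tripping each dict through a tuple of its items, since dicts are not hashable and cannot go into a set directly. In isolation:

    logs = [
        {'key': 'task-1/service.log', 'getUrl': 'http://example.com/a'},
        {'key': 'task-1/service.log', 'getUrl': 'http://example.com/a'},  # duplicate
        {'key': 'task-2/service.log', 'getUrl': 'http://example.com/b'},
    ]

    deduped = [dict(t) for t in set([tuple(l.items()) for l in logs])]
    # => the two distinct dicts; ordering is not preserved because sets are unordered
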
4 changes: 2 additions & 2 deletions scripts/logfetch/singularity_request.py
@@ -4,8 +4,8 @@

ERROR_STATUS_FORMAT = 'Singularity responded with an invalid status code ({0})'

def get_json_response(uri, params={}):
singularity_response = requests.get(uri, params=params)
def get_json_response(uri, args, params={}):
singularity_response = requests.get(uri, params=params, headers=args.headers)
if singularity_response.status_code < 199 or singularity_response.status_code > 299:
sys.stderr.write('{0} params:{1}\n'.format(uri, str(params)))
sys.stderr.write(colored(ERROR_STATUS_FORMAT.format(singularity_response.status_code), 'red') + '\n')
8 changes: 4 additions & 4 deletions scripts/logfetch/tail.py
@@ -51,7 +51,7 @@ def stream_log_for_task(self, args, task):
path = '{0}/{1}'.format(task, args.logfile)
keep_trying = True
try:
offset = self.get_initial_offset(uri, path)
offset = self.get_initial_offset(uri, path, args)
except ValueError:
sys.stderr.write(colored('Could not tail logs for task {0}, check that the task is still active and that the slave it runs on has not been decommissioned\n'.format(task), 'red'))
keep_trying = False
@@ -63,16 +63,16 @@ def stream_log_for_task(self, args, task):
sys.stderr.write(colored('Could not tail logs for task {0}, check that the task is still active and that the slave it runs on has not been decommissioned\n'.format(task), 'red'))
keep_trying = False

def get_initial_offset(self, uri, path):
def get_initial_offset(self, uri, path, args):
params = {"path" : path}
return long(requests.get(uri, params=params).json()['offset'])
return long(requests.get(uri, params=params, headers=args.headers).json()['offset'])

def fetch_new_log_data(self, uri, path, offset, args, task):
params = {
"path" : path,
"offset" : offset
}
response = requests.get(uri, params=params).json()
response = requests.get(uri, params=params, headers=args.headers).json()
prefix = '({0}) =>\n'.format(task) if args.verbose else ''
if len(response['data'].encode('utf-8')) > 0:
if args.grep:
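
The tailer works against Singularity's sandbox read endpoint: fetch the current offset once, then poll for data past that offset, now sending the configured headers on every call. The loop below is a compressed guess at the surrounding code, which the diff truncates; the endpoint URL, log path, and poll interval are placeholders, and it assumes the same Python 2 environment as the scripts (long() does not exist on Python 3).

    import sys
    import time
    import requests

    headers = {'X-Api-Key': 'abc123'}  # hypothetical, normally args.headers
    uri = 'http://singularity.example.com/api/sandbox/taskid/read'  # placeholder
    path = 'taskid/service.log'

    # Start tailing from the current end of the file.
    offset = long(requests.get(uri, params={'path': path}, headers=headers).json()['offset'])

    while True:
        response = requests.get(uri, params={'path': path, 'offset': offset}, headers=headers).json()
        data = response['data'].encode('utf-8')
        if len(data) > 0:
            sys.stdout.write(data)
            offset += len(data)
        time.sleep(5)
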
