From e366c725c463b276155857320d2805da331bdf2a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Petr=20Tich=C3=BD?=
Date: Fri, 25 Jul 2014 14:49:05 +0200
Subject: [PATCH 01/12] FEATURE: Add AWS Multi-Object Delete

---
 s3funnel/__init__.py | 44 +++++++++++++++++++++-
 s3funnel/jobs.py     | 89 ++++++++++++++++++++++++++++++++++----------
 scripts/s3funnel     | 26 +++++++++----
 3 files changed, 130 insertions(+), 29 deletions(-)

diff --git a/s3funnel/__init__.py b/s3funnel/__init__.py
index c34d601..71bfb51 100644
--- a/s3funnel/__init__.py
+++ b/s3funnel/__init__.py
@@ -10,9 +10,9 @@ from Queue import Queue, Empty
 
 from exceptions import FunnelError
 
-from jobs import GetJob, PutJob, DeleteJob, CopyJob
+from jobs import GetJob, PutJob, DeleteJob, DeleteMultipleJob, CopyJob
 
-__all__ = ['GetJob','PutJob','DeleteJob','S3ToolBox','BucketFunnel']
+__all__ = ['GetJob', 'PutJob', 'DeleteJob', 'DeleteMultipleJob', 'S3ToolBox', 'BucketFunnel']
 
 # Helpers
 
@@ -192,6 +192,46 @@ def delete(self, bucket, ikeys, retry=5, **config):
 
         return collapse_queue(failed)
 
+    def batch_delete(self, bucket, ikeys, retry=5, **config):
+        """
+        Given an iterator of key names, delete these keys from the current bucket.
+        Return a list of failed keys (if any).
+        """
+
+        def grouper(iterable, n):
+            iterable = iter(iterable)
+            i = n
+            group = []
+            while True:
+                try:
+                    item = iterable.next()
+                    if item:
+                        group.append(item)
+                        i -= 1
+                    if i == 0:
+                        yield group
+                        group = []
+                        i = n
+                except StopIteration:
+                    if group:
+                        yield group
+                    break
+
+        # Setup local config for this request
+        c = {}
+        c.update(config)
+        c['retry'] = retry
+
+        failed = Queue()
+        pool = self._get_pool()
+        batch_size = c['batch_delete']
+        for k in grouper(ikeys, batch_size):
+            j = DeleteMultipleJob(bucket, k, failed, c)
+            pool.put(j)
+        pool.join()
+
+        return collapse_queue(failed)
+
     def get(self, bucket, ikeys, retry=5, **config):
         """
         Given an iterator of key names, download these files from the current bucket.

diff --git a/s3funnel/jobs.py b/s3funnel/jobs.py
index 6c42673..52867c2 100644
--- a/s3funnel/jobs.py
+++ b/s3funnel/jobs.py
@@ -4,9 +4,11 @@
 # This module is part of s3funnel and is released under
 # the MIT license: http://www.opensource.org/licenses/mit-license.php
 
-from workerpool import Job
-import boto
 import time
+
+from workerpool import Job
+
+
 try:
     import hashlib
 except ImportError:
@@ -14,6 +16,7 @@
 
 import os
 import logging
+
 log = logging.getLogger(__name__)
 
 READ_CHUNK = 8192
@@ -22,34 +25,36 @@
 from boto.exception import BotoServerError, BotoClientError, S3ResponseError
 from httplib import IncompleteRead
 from socket import error as SocketError
-from s3funnel import FunnelError
+
 
 class JobError(Exception):
     pass
 
+
 # Jobs
 class GetJob(Job):
     "Download the given key from S3."
+
     def __init__(self, bucket, key, failed, config={}):
         self.bucket = bucket
         self.key = key
         self.failed = failed
         self.retries = config.get('retry', 5)
-        self.ignore_s3fs_dirs = config.get('ignore_s3fs_dirs',True)
+        self.ignore_s3fs_dirs = config.get('ignore_s3fs_dirs', True)
 
     def _do(self, toolbox):
         for i in xrange(self.retries):
             try:
                 b = toolbox.get_bucket(self.bucket)
-                
+
                 b.connection.provider.metadata_prefix = ''
                 k = b.get_key(self.key)
                 m = k.get_metadata('content-type')
-                
+
                 if m == 'application/x-directory' and self.ignore_s3fs_dirs:
-                        log.warn("Skipping s3fs directory: %s" % self.key)
-                        return
+                    log.warn("Skipping s3fs directory: %s" % self.key)
+                    return
                 try:
                     # Create directories in case key has "/"
                     if os.path.dirname(self.key) and not os.path.exists(os.path.dirname(self.key)):
@@ -71,7 +76,7 @@ def _do(self, toolbox):
                 break
             except (IncompleteRead, SocketError, BotoClientError), e:
                 log.warning("Caught exception: %r.\nRetrying..." % e)
-                time.sleep((2 ** i) / 4.0) # Exponential backoff
+                time.sleep((2 ** i) / 4.0)  # Exponential backoff
             except IOError, e:
                 log.error("%s: '%s'" % (e.strerror, e.filename))
                 return
@@ -83,13 +88,15 @@ def run(self, toolbox):
         try:
             self._do(toolbox)
         except JobError, e:
-            os.unlink(self.key) # Remove file since download failed
+            os.unlink(self.key)  # Remove file since download failed
             self.failed.put(self.key)
         except Exception, e:
             self.failed.put(e)
 
+
 class PutJob(Job):
     "Upload the given file to S3, where the key corresponds to basename(path)"
+
     def __init__(self, bucket, path, failed, config={}):
         self.bucket = bucket
         self.path = path
@@ -99,8 +106,8 @@ def __init__(self, bucket, path, failed, config={}):
         self.key = "%s%s" % (self.add_prefix, self.path)
         # --del-prefix logic
         self.del_prefix = config.get('del_prefix')
-        if self.del_prefix and self.key.startswith(self.del_prefix): 
-            self.key = self.key.replace(self.del_prefix, '', 1) 
+        if self.del_prefix and self.key.startswith(self.del_prefix):
+            self.key = self.key.replace(self.del_prefix, '', 1)
         if not config.get('put_full_path'):
             self.key = os.path.basename(self.key)
         self.retries = config.get('retry', 5)
@@ -147,7 +154,7 @@ def _do(self, toolbox):
                 toolbox.reset()
             except (IncompleteRead, SocketError, BotoClientError), e:
                 log.warning("Caught exception: %r.\nRetrying..." % e)
-                time.sleep((2 ** i) / 4.0) # Exponential backoff
+                time.sleep((2 ** i) / 4.0)  # Exponential backoff
             except IOError, e:
                 log.warning("Path does not exist, skipping: %s" % self.path)
                 break
@@ -165,8 +172,10 @@ def run(self, toolbox):
         except Exception, e:
             self.failed.put(e)
 
+
 class DeleteJob(Job):
     "Delete the given key from S3."
+
     def __init__(self, bucket, key, failed, config={}):
         self.bucket = bucket
         self.key = key
@@ -186,7 +195,7 @@ def _do(self, toolbox):
                 break
             except (IncompleteRead, SocketError, BotoClientError), e:
                 log.warning("Caught exception: %r.\nRetrying..." % e)
-                time.sleep((2 ** i) / 4.0) # Exponential backoff
+                time.sleep((2 ** i) / 4.0)  # Exponential backoff
 
         log.error("Failed to delete: %s" % self.key)
 
@@ -197,9 +206,51 @@ def run(self, toolbox):
             self.failed.put(self.key)
         except Exception, e:
             self.failed.put(e)
-    
+
+
+class DeleteMultipleJob(Job):
+    "Delete the given keys from S3."
+
+    def __init__(self, bucket, keys, failed, config={}):
+        self.bucket = bucket
+        self.keys = keys
+        self.failed = failed
+        self.retries = config.get('retry', 5)
+
+    def _do(self, toolbox):
+        for i in xrange(self.retries):
+            try:
+                k = toolbox.get_bucket(self.bucket).delete_keys(self.keys)
+                if hasattr(k, 'errors') and k.errors:
+                    log.error("Failed to delete: %s" % self.keys)
+                else:
+                    log.info("Deleted: %s" % self.keys)
+                return
+            except S3ResponseError, e:
+                log.warning("Connection lost, reconnecting and retrying...")
+                toolbox.reset()
+            except BotoServerError, e:
+                break
+            except (IncompleteRead, SocketError, BotoClientError), e:
+                log.warning("Caught exception: %r.\nRetrying..." % e)
+                time.sleep((2 ** i) / 4.0)  # Exponential backoff
+            finally:
+                pass
+
+        log.error("Failed to delete: %s" % self.keys)
+
+    def run(self, toolbox):
+        try:
+            self._do(toolbox)
+        except JobError, e:
+            self.failed.put(self.keys)
+        except Exception, e:
+            self.failed.put(e)
+
+
 class CopyJob(Job):
     "Copy the given key from another bucket."
+
     def __init__(self, bucket, key, failed, config={}):
         self.bucket = bucket
         self.key = key
@@ -208,12 +259,12 @@ def __init__(self, bucket, key, failed, config={}):
         self.dest_key = "%s%s" % (self.add_prefix, key)
         # --del-prefix logic
         self.del_prefix = config.get('del_prefix')
-        if self.del_prefix and self.dest_key.startswith(self.del_prefix): 
+        if self.del_prefix and self.dest_key.startswith(self.del_prefix):
             self.dest_key = self.dest_key.replace(self.del_prefix, '', 1)
         self.source_bucket = config.get('source_bucket')
         self.failed = failed
         self.retries = config.get('retry', 5)
-    
+
     def _do(self, toolbox):
         for i in xrange(self.retries):
             try:
@@ -227,10 +278,10 @@ def _do(self, toolbox):
                 break
             except (IncompleteRead, SocketError, BotoClientError), e:
                 log.warning("Caught exception: %r.\nRetrying..." % e)
-                time.sleep((2 ** i) / 4.0) # Exponential backoff
+                time.sleep((2 ** i) / 4.0)  # Exponential backoff
 
         log.error("Failed to copy: %s" % self.key)
-    
+
     def run(self, toolbox):
         try:
             self._do(toolbox)

diff --git a/scripts/s3funnel b/scripts/s3funnel
index 9762591..ddf9615 100644
--- a/scripts/s3funnel
+++ b/scripts/s3funnel
@@ -79,6 +79,7 @@ def main():
     parser.add_option("-i", "--input", dest="input", type="string", metavar="FILE", help="Read one file per line from a FILE manifest")
     parser.add_option("-v", "--verbose", dest="verbose", action="count", default=None, help="Enable verbose output. Use twice to enable debug output")
     parser.add_option("--version", dest="version", action="store_true", help="Output version information and exit")
+    parser.add_option("--batch-delete", dest="batch_delete", default=1, type="int", metavar="N", help="(`delete` only) Delete in batches of N")
 
     # Deprecated options (for backwards compatibility)
     parser.add_option("--start_key", dest="list_marker", type="string", default=None, help=SUPPRESS_HELP)
@@ -157,6 +158,13 @@ def main():
     elif options.verbose > 0:
         set_log_level(logging.INFO)
 
+    if operation == 'delete' and options.batch_delete > 1:
+        operation = 'batch_delete'
+        if options.batch_delete > 1000:
+            options.batch_delete = 1000
+            log.info("Limiting delete batch to 1000")
+        log.info("Batch delete using batch size of %s" % options.batch_delete)
+
     # Setup operation configuration
     config = {'acl': options.acl,
               'list_marker': options.list_marker or '',
@@ -173,23 +181,25 @@ def main():
               'del_prefix': options.del_prefix,
               'headers': headers,
               'numthreads': options.numthreads,
-              'ignore_s3fs_dirs': options.ignore_s3fs_dirs
+              'ignore_s3fs_dirs': options.ignore_s3fs_dirs,
+              'batch_delete': options.batch_delete
               }
 
     funnel = S3Funnel(**config)
 
     # Setup operation mapping
     methods_keys = {
-        'get':    funnel.get,
-        'put':    funnel.put,
-        'delete': funnel.delete,
-        'copy':   funnel.copy,
+        'get': funnel.get,
+        'put': funnel.put,
+        'delete': funnel.delete,
+        'batch_delete': funnel.batch_delete,
+        'copy': funnel.copy,
     }
 
     methods_bucket = {
-        'list':   funnel.list_bucket,
-        'drop':   funnel.drop_bucket,
-        'create': funnel.create_bucket,
+        'list': funnel.list_bucket,
+        'drop': funnel.drop_bucket,
+        'create': funnel.create_bucket,
    }
 
     methods_global = {
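Note on the mechanism: DeleteMultipleJob wraps boto's S3 multi-object delete, which accepts at most 1000 keys per request — that is why the script above caps --batch-delete at 1000. A minimal standalone sketch of the underlying call (boto 2.x, Python 2; the bucket and key names are placeholders):

    import boto

    bucket = boto.connect_s3().get_bucket('mybukkit', validate=False)
    result = bucket.delete_keys(['key1', 'key2', 'key3'])  # one DELETE request
    print '%d deleted, %d failed' % (len(result.deleted), len(result.errors))

The MultiDeleteResult's .errors list is what the job's hasattr(k, 'errors') check inspects.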
From d8d3b843e05a68e8d985f2abf21118c92d8dd998 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Petr=20Tich=C3=BD?=
Date: Fri, 25 Jul 2014 15:59:37 +0200
Subject: [PATCH 02/12] Update README.md

---
 README.md | 67 ++++++++++++++++++++++++++++++-------------------------
 1 file changed, 36 insertions(+), 31 deletions(-)

diff --git a/README.md b/README.md
index 5eafcb9..6c89588 100644
--- a/README.md
+++ b/README.md
@@ -8,52 +8,61 @@ This project is a clone of 's s3funnel from google co
 - Unix-friendly input and output. Pipe things in, out, and all around.
 
 ## Usage
-    $ s3funnel --help
     Usage: s3funnel BUCKET OPERATION [OPTIONS] [FILE]...
-    
+
     s3funnel is a multithreaded tool for performing operations on Amazon's S3.
-    
+
     Key Operations:
       DELETE Delete key from the bucket
       GET    Get key from the bucket
       PUT    Put file into the bucket (key is the basename of the path)
-    
+      COPY   Copy keys to the bucket from another bucket.
+
     Bucket Operations:
       CREATE Create a new bucket
       DROP   Delete an existing bucket (must be empty)
       LIST   List keys in the bucket. If no bucket is given, buckets will be listed.
-    
-    
+
+
     Options:
-      -h, --help show this help message and exit
+      -h, --help            show this help message and exit
       -a AWS_KEY, --aws_key=AWS_KEY
-                        Overrides AWS_ACCESS_KEY_ID environment variable
+                        Overrides AWS_ACCESS_KEY_ID environment variable
       -s AWS_SECRET_KEY, --aws_secret_key=AWS_SECRET_KEY
-                        Overrides AWS_SECRET_ACCESS_KEY environment variable
+                        Overrides AWS_SECRET_ACCESS_KEY environment variable
       -t N, --threads=N     Number of threads to use [default: 1]
       -T SECONDS, --timeout=SECONDS
-                        Socket timeout time, 0 is never [default: 0]
-      --insecure Don't use secure (https) connection
+                        Socket timeout time, 0 is never [default: 0]
+      --insecure            Don't use secure (https) connection
       --list-marker=KEY     (`list` only) Start key for list operation
       --list-prefix=STRING  (`list` only) Limit results to a specific prefix
       --list-delimiter=CHAR
-                        (`list` only) Treat value as a delimiter for
-                        hierarchical listing
-      --put-acl=ACL (`put` only) Set the ACL permission for each file
-                        [default: public-read]
+                        (`list` only) Treat value as a delimiter for
+                        hierarchical listing
+      --put-acl=ACL         (`put` only) Set the ACL permission for each file
+                        [default: public-read]
       --put-full-path       (`put` only) Use the full given path as the key name,
-                        instead of just the basename
-      --put-only-new (`put` only) Only PUT keys which don't already exist
-                        in the bucket with the same md5 digest
+                        instead of just the basename
+      --put-only-new        (`put` only) Only PUT keys which don't already exist
+                        in the bucket with the same md5 digest
       --put-header=HEADERS  (`put` only) Add the specified header to the request
+      --add-prefix=ADD_PREFIX
+                        (`put` and `copy` only) Add specified prefix to keys
+                        in destination bucket
+      --del-prefix=DEL_PREFIX
+                        (`put` and `copy` only) Delete specified prefix from
+                        keys in destination bucket
       --source-bucket=SOURCE_BUCKET
-                        (`copy` only) Source bucket for files
+                        (`copy` only) Source bucket for files
+      --no-ignore-s3fs-dirs
+                        (`get` only) Don't ignore s3fs directory objects
       -i FILE, --input=FILE
-                        Read one file per line from a FILE manifest
-      -v, --verbose Enable verbose output. Use twice to enable debug
-                        output
-      --version Output version information and exit
-    
+                        Read one file per line from a FILE manifest
+      -v, --verbose         Enable verbose output. Use twice to enable debug
+                        output
+      --version             Output version information and exit
+      --batch-delete=N      (`delete` only) Delete in batches of N
+
 ## Examples
@@ -67,22 +76,18 @@ Note: Appending the -v flag will print useful progress information to stderr. Gr
     $ s3funnel mybukkit put 1 2 3
 ### List files in a bucket
     $ s3funnel mybukkit list
-    1
-    2
-    3
 ### Copy files from a bucket
     $ rm 1 2 3
     $ s3funnel mybukkit get 1 2 3 --threads=2
     $ ls -1
-    1
-    2
-    3
 ### Copy files from another bucket
     $ s3funnel mybukkit_copy create
     $ s3funnel mybukkit list | s3funnel mybukkit_copy copy --source-bucket mybukkit --threads=2
 ### Empty a bucket
     $ s3funnel mybukkit list | s3funnel mybukkit delete
     $ s3funnel mybukkit_copy list | s3funnel mybukkit_copy delete --threads=2
+### Empty a bucket using batch delete
+    $ s3funnel mybukkit list | s3funnel mybukkit_copy delete --threads=10 --batch-delete=1000
 ### Delete a bucket
     $ s3funnel mybukkit drop
-    $ s3funnel mybukkit_copy drop
\ No newline at end of file
+    $ s3funnel mybukkit_copy drop

From a3947ffed36b38ac090da274d4a68992b3b24929 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Petr=20Tich=C3=BD?=
Date: Wed, 20 Aug 2014 15:36:49 +0200
Subject: [PATCH 03/12] FIX: do not validate bucket existence

---
 s3funnel/__init__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/s3funnel/__init__.py b/s3funnel/__init__.py
index 71bfb51..3507b68 100644
--- a/s3funnel/__init__.py
+++ b/s3funnel/__init__.py
@@ -63,7 +63,7 @@ def get_bucket(self, name):
         conn = self.get_conn()
         log.debug("Getting bucket instance: %s" % name)
         try:
-            bucket = conn.get_bucket(name)
+            bucket = conn.get_bucket(name, validate=False)
         except BotoServerError, e:
             raise FunnelError("Bucket not found: %s" % name, key=name)
 

From f0f5201c75ec003b914cf3a3a1e29b5bf98c1586 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Petr=20Tich=C3=BD?=
Date: Thu, 21 Aug 2014 10:34:34 +0200
Subject: [PATCH 04/12] BUGFIX: handle SIGPIPE and shutdown

---
 scripts/s3funnel | 1 +
 1 file changed, 1 insertion(+)

diff --git a/scripts/s3funnel b/scripts/s3funnel
index ddf9615..55eb130 100644
--- a/scripts/s3funnel
+++ b/scripts/s3funnel
@@ -248,6 +248,7 @@ def main():
         funnel.shutdown()
         sys.exit()
     signal.signal(signal.SIGINT, shutdown)
+    signal.signal(signal.SIGPIPE, shutdown)
 
     # Setup output logger
     output = logging.getLogger('output')

From 3210fa4c0925728490f0b7ed924f48c9388fe036 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Petr=20Tich=C3=BD?=
Date: Mon, 25 Aug 2014 17:38:01 +0200
Subject: [PATCH 05/12] TRIVIAL: make scripts/s3funnel executable

---
 scripts/s3funnel | 0
 1 file changed, 0 insertions(+), 0 deletions(-)
 mode change 100644 => 100755 scripts/s3funnel

diff --git a/scripts/s3funnel b/scripts/s3funnel
old mode 100644
new mode 100755
From 9dac88297da4c15ecf8b7d68c19aeec844317ea8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Petr=20Tich=C3=BD?=
Date: Thu, 21 Aug 2014 10:36:05 +0200
Subject: [PATCH 06/12] FEATURE: SetACL job

---
 s3funnel/__init__.py | 25 ++++++++++++++++++++++---
 s3funnel/jobs.py     | 44 +++++++++++++++++++++++++++++++++++++++++++-
 scripts/s3funnel     |  1 +
 3 files changed, 66 insertions(+), 4 deletions(-)

diff --git a/s3funnel/__init__.py b/s3funnel/__init__.py
index 3507b68..7d4b388 100644
--- a/s3funnel/__init__.py
+++ b/s3funnel/__init__.py
@@ -10,9 +10,9 @@ from Queue import Queue, Empty
 
 from exceptions import FunnelError
 
-from jobs import GetJob, PutJob, DeleteJob, DeleteMultipleJob, CopyJob
+from jobs import GetJob, PutJob, DeleteJob, DeleteMultipleJob, CopyJob, SetAclJob
 
-__all__ = ['GetJob', 'PutJob', 'DeleteJob', 'DeleteMultipleJob', 'S3ToolBox', 'BucketFunnel']
+__all__ = ['GetJob', 'PutJob', 'DeleteJob', 'DeleteMultipleJob', 'S3ToolBox', 'BucketFunnel', 'SetAclJob']
 
 # Helpers
 
@@ -164,7 +164,7 @@ def list_bucket(self, name, marker=None, prefix=None, delimiter=None, **config):
                 yield k.name
             if k:
                 marker = k.name
-            more_results= r.is_truncated
+            more_results = r.is_truncated
         except BotoServerError, e:
             raise FunnelError("Failed to list bucket: %s" % name, key=name)
         except (IncompleteRead, SocketError, BotoClientError), e:
@@ -290,3 +290,22 @@ def copy(self, bucket, ikeys, retry=5, acl='public-read', **config):
         pool.join()
 
         return collapse_queue(failed)
+
+    def setacl(self, bucket, ikeys, retry=5, **config):
+        """
+        Given an iterator of key names, set the ACL on these keys in the current bucket.
+        Return a list of failed keys (if any).
+        """
+        # Setup local config for this request
+        c = {}
+        c.update(config)
+        c['retry'] = retry
+
+        failed = Queue()
+        pool = self._get_pool()
+        for k in ikeys:
+            j = SetAclJob(bucket, k, failed, c)
+            pool.put(j)
+        pool.join()
+
+        return collapse_queue(failed)

diff --git a/s3funnel/jobs.py b/s3funnel/jobs.py
index 52867c2..a748122 100644
--- a/s3funnel/jobs.py
+++ b/s3funnel/jobs.py
@@ -288,4 +288,46 @@ def run(self, toolbox):
         except JobError, e:
             self.failed.put(self.key)
         except Exception, e:
-            self.failed.put(e)
+            self.failed.put(e)
+
+
+class SetAclJob(Job):
+    "Set the ACL on the given key."
+
+    def __init__(self, bucket, key, failed, config={}):
+        self.bucket = bucket
+        self.key = key
+        self.failed = failed
+        self.retries = config.get('retry', 5)
+
+    def _do(self, toolbox):
+        for i in xrange(self.retries):
+            try:
+                # k = toolbox.get_conn().make_request(method='PUT', bucket=self.bucket, key=self.key,
+                #                                     query_args='acl',
+                #                                     headers={'x-amz-grant-full-control': 'ID="db6a261a5c90f39366dde55bd72b8db7c7ae729538e8694142b8b68fe2348bdf"'})
+                k = toolbox.get_conn().make_request(method='HEAD', bucket=self.bucket, key=self.key)
+                if k.status == 200:
+                    log.info("Done: %s" % self.key)
+                else:
+                    log.error("%s on %s" % (k.status, self.key))
+                    raise BotoServerError
+                return
+            except S3ResponseError, e:
+                log.warning("Connection lost, reconnecting and retrying...")
+                toolbox.reset()
+            except BotoServerError, e:
+                break
+            except (IncompleteRead, SocketError, BotoClientError), e:
+                log.warning("Caught exception: %r.\nRetrying..." % e)
+                time.sleep((2 ** i) / 4.0)  # Exponential backoff
+
+        log.error("Failed to set ACL: %s" % self.key)
+
+    def run(self, toolbox):
+        try:
+            self._do(toolbox)
+        except JobError, e:
+            self.failed.put(self.key)
+        except Exception, e:
+            self.failed.put(e)

diff --git a/scripts/s3funnel b/scripts/s3funnel
index 55eb130..4e097e1 100755
--- a/scripts/s3funnel
+++ b/scripts/s3funnel
@@ -194,6 +194,7 @@ def main():
         'delete': funnel.delete,
         'batch_delete': funnel.batch_delete,
         'copy': funnel.copy,
+        'setacl': funnel.setacl,
     }
 
     methods_bucket = {
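As committed here, SetAclJob only probes each key with an authenticated HEAD request — the actual grant PUT (via the x-amz-grant-full-control header) is still commented out. A sketch of that probe outside the job, assuming boto 2.x and placeholder bucket/key names:

    import boto

    conn = boto.connect_s3()
    resp = conn.make_request(method='HEAD', bucket='mybukkit', key='some/key')
    print resp.status  # 200 when reachable, 403/404 otherwise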
parser.add_option("--list-delimiter", dest="list_delimiter", type="string", default=None, metavar="CHAR", help="('list' only) Treat value as a delimiter for hierarchical listing") + parser.add_option("--put-acl", dest="acl", type="string", default="public-read", help="('put' only) Set the ACL permission for each file [default: %default]") + parser.add_option("--put-full-path", dest="put_full_path", action="store_true", help="('put' only) Use the full given path as the key name, instead of just the basename") + parser.add_option("--put-only-new", dest="put_only_new", action="store_true", help="('put' only) Only PUT keys which don't already exist in the bucket with the same md5 digest") + parser.add_option("--put-header", dest="headers", type="string", action="append", help="('put' only) Add the specified header to the request") + parser.add_option("--add-prefix", dest="add_prefix", type="string", default="", help="('put' and 'copy' only) Add specified prefix to keys in destination bucket") + parser.add_option("--del-prefix", dest="del_prefix", type="string", default="", help="('put' and 'copy' only) Delete specified prefix from keys in destination bucket") + parser.add_option("--source-bucket", dest="source_bucket", type="string", help="('copy' only) Source bucket for files") + parser.add_option("--no-ignore-s3fs-dirs", dest="ignore_s3fs_dirs", action="store_false", default=True, help="('get' only) Don't ignore s3fs directory objects ") parser.add_option("-i", "--input", dest="input", type="string", metavar="FILE", help="Read one file per line from a FILE manifest") parser.add_option("-v", "--verbose", dest="verbose", action="count", default=None, help="Enable verbose output. Use twice to enable debug output") parser.add_option("--version", dest="version", action="store_true", help="Output version information and exit") - parser.add_option("--batch-delete", dest="batch_delete", default=1, type="int", metavar="N", help="(`delete` only) Delete in batches of N") + parser.add_option("--batch-delete", dest="batch_delete", default=1, type="int", metavar="N", help="('delete' only) Delete in batches of N") # Deprecated options (for backwards compatibility) parser.add_option("--start_key", dest="list_marker", type="string", default=None, help=SUPPRESS_HELP) @@ -105,18 +105,18 @@ def main(): if options.aws_secret_key: aws_secret_key = options.aws_secret_key if None in [aws_key, aws_secret_key]: - parser.error("Missing required arguments `aws_key` or `aws_secret_key`") + parser.error("Missing required arguments 'aws_key' or 'aws_secret_key'") ## Threads if options.numthreads < 1: - parser.error("`theads` must be at least 1") + parser.error("'theads' must be at least 1") ## Misc. options if options.timeout: try: socket.setdefaulttimeout(options.timeout) except TypeError, e: - parser.error("`timeout` error: %s" % e.message) + parser.error("'timeout' error: %s" % e.message) ## Parse put headers headers = {} @@ -224,7 +224,7 @@ def main(): except (IOError, IndexError), e: log.error("%s: File not found" % options.input) return -1 - input_src = "`%s'" % options.input + input_src = "'%s'" % options.input elif len(args) < 3: # Get source from stdin input_src = "stdin" From f6ed22b1bd4043ed9c37f75c4c48c565f33adf20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=20Tich=C3=BD?= Date: Mon, 25 Aug 2014 17:29:59 +0200 Subject: [PATCH 08/12] fixup! 
SetAcl --- s3funnel/jobs.py | 56 +++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 46 insertions(+), 10 deletions(-) diff --git a/s3funnel/jobs.py b/s3funnel/jobs.py index a748122..4b87d61 100644 --- a/s3funnel/jobs.py +++ b/s3funnel/jobs.py @@ -294,6 +294,15 @@ def run(self, toolbox): class SetAclJob(Job): "Copy the given key from another bucket." + import boto.s3.acl + import boto.s3.user + + policy = boto.s3.acl.Policy() + policy.owner = boto.s3.user.User(id='db6a261a5c90f39366dde55bd72b8db7c7ae729538e8694142b8b68fe2348bdf') + policy.acl = boto.s3.acl.ACL() + policy.acl.add_user_grant(permission='FULL_CONTROL', + user_id='db6a261a5c90f39366dde55bd72b8db7c7ae729538e8694142b8b68fe2348bdf') + def __init__(self, bucket, key, failed, config={}): self.bucket = bucket self.key = key @@ -303,19 +312,46 @@ def __init__(self, bucket, key, failed, config={}): def _do(self, toolbox): for i in xrange(self.retries): try: - # k = toolbox.get_conn().make_request(method='PUT', bucket=self.bucket, key=self.key, - # query_args='acl', - # headers={'x-amz-grant-full-control': 'ID="db6a261a5c90f39366dde55bd72b8db7c7ae729538e8694142b8b68fe2348bdf"'}) - k = toolbox.get_conn().make_request(method='HEAD', bucket=self.bucket, key=self.key) - if k.status == 200: - log.info("Done: %s" % self.key) + x = toolbox.get_bucket(self.bucket).set_acl(self.policy, key_name=self.key) + return + except S3ResponseError, e: + if e.status == 404: + log.warning("%s not found" % self.key) + return + elif e.status == 403: + log.warning("%s access denied" % self.key) + return else: - log.error("%s on %s" % (k.status, self.key)) - raise BotoServerError + log.warning("Connection lost, reconnecting and retrying...") + toolbox.reset() + except BotoServerError, e: + break + except (IncompleteRead, SocketError, BotoClientError), e: + log.warning("Caught exception: %r.\nRetrying..." 
% e) + time.sleep((2 ** i) / 4.0) # Exponential backoff + + log.error("Failed to copy: %s" % self.key) + + def run(self, toolbox): + try: + self._do(toolbox) + except JobError, e: + self.failed.put(self.key) + except Exception, e: + self.failed.put(e) + + return except S3ResponseError, e: - log.warning("Connection lost, reconnecting and retrying...") - toolbox.reset() + if e.status == 404: + log.warning("%s not found" % self.key) + return + elif e.status == 403: + log.warning("%s access denied" % self.key) + return + else: + log.warning("Connection lost, reconnecting and retrying...") + toolbox.reset() except BotoServerError, e: break except (IncompleteRead, SocketError, BotoClientError), e: From e5d26aaeb76c8e017a7ba8be0501173acd9342d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=20Tich=C3=BD?= Date: Mon, 25 Aug 2014 17:33:46 +0200 Subject: [PATCH 09/12] FEATURE: CheckACL --- s3funnel/__init__.py | 24 ++++++++++++++++++++++-- s3funnel/jobs.py | 26 ++++++++++++++++++++++++++ scripts/s3funnel | 1 + 3 files changed, 49 insertions(+), 2 deletions(-) diff --git a/s3funnel/__init__.py b/s3funnel/__init__.py index 7d4b388..5af18a5 100644 --- a/s3funnel/__init__.py +++ b/s3funnel/__init__.py @@ -10,9 +10,9 @@ from Queue import Queue, Empty from exceptions import FunnelError -from jobs import GetJob, PutJob, DeleteJob, DeleteMulitpleJob, CopyJob, SetAclJob +from jobs import GetJob, PutJob, DeleteJob, DeleteMulitpleJob, CopyJob, SetAclJob, CheckAclJob -__all__ = ['GetJob', 'PutJob', 'DeleteJob', 'DeleteMulitpleJob', 'S3ToolBox', 'BucketFunnel', 'SetAclJob'] +__all__ = ['GetJob', 'PutJob', 'DeleteJob', 'DeleteMulitpleJob', 'S3ToolBox', 'BucketFunnel', 'SetAclJob', 'CheckAclJob'] # Helpers @@ -309,3 +309,23 @@ def setacl(self, bucket, ikeys, retry=5, **config): pool.join() return collapse_queue(failed) + + + def checkacl(self, bucket, ikeys, retry=5, **config): + """ + Given an iterator of file paths, copy these files into the current bucket from source bucket + Return a list of failed keys (if any). + """ + # Setup local config for this request + c = {} + c.update(config) + c['retry'] = retry + + failed = Queue() + pool = self._get_pool() + for k in ikeys: + j = CheckAclJob(bucket, k, failed, c) + pool.put(j) + pool.join() + + return collapse_queue(failed) diff --git a/s3funnel/jobs.py b/s3funnel/jobs.py index 4b87d61..59ee61c 100644 --- a/s3funnel/jobs.py +++ b/s3funnel/jobs.py @@ -341,6 +341,32 @@ def run(self, toolbox): self.failed.put(e) +class CheckAclJob(Job): + "Copy the given key from another bucket." 
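The fixup replaces the HEAD probe with a real ACL write. The grant object it builds at class level can be exercised on a single key like this (boto 2.x sketch; the bucket and key names are placeholders, the canonical user ID is the one from the patch):

    import boto
    import boto.s3.acl
    import boto.s3.user

    OWNER = 'db6a261a5c90f39366dde55bd72b8db7c7ae729538e8694142b8b68fe2348bdf'
    policy = boto.s3.acl.Policy()
    policy.owner = boto.s3.user.User(id=OWNER)
    policy.acl = boto.s3.acl.ACL()
    policy.acl.add_user_grant(permission='FULL_CONTROL', user_id=OWNER)

    bucket = boto.connect_s3().get_bucket('mybukkit', validate=False)
    bucket.set_acl(policy, key_name='some/key')  # PUT ?acl with the full policy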
From e5d26aaeb76c8e017a7ba8be0501173acd9342d2 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Petr=20Tich=C3=BD?=
Date: Mon, 25 Aug 2014 17:33:46 +0200
Subject: [PATCH 09/12] FEATURE: CheckACL

---
 s3funnel/__init__.py | 24 ++++++++++++++++++++++--
 s3funnel/jobs.py     | 26 ++++++++++++++++++++++++++
 scripts/s3funnel     |  1 +
 3 files changed, 49 insertions(+), 2 deletions(-)

diff --git a/s3funnel/__init__.py b/s3funnel/__init__.py
index 7d4b388..5af18a5 100644
--- a/s3funnel/__init__.py
+++ b/s3funnel/__init__.py
@@ -10,9 +10,9 @@ from Queue import Queue, Empty
 
 from exceptions import FunnelError
 
-from jobs import GetJob, PutJob, DeleteJob, DeleteMultipleJob, CopyJob, SetAclJob
+from jobs import GetJob, PutJob, DeleteJob, DeleteMultipleJob, CopyJob, SetAclJob, CheckAclJob
 
-__all__ = ['GetJob', 'PutJob', 'DeleteJob', 'DeleteMultipleJob', 'S3ToolBox', 'BucketFunnel', 'SetAclJob']
+__all__ = ['GetJob', 'PutJob', 'DeleteJob', 'DeleteMultipleJob', 'S3ToolBox', 'BucketFunnel', 'SetAclJob', 'CheckAclJob']
 
 # Helpers
 
@@ -309,3 +309,23 @@ def setacl(self, bucket, ikeys, retry=5, **config):
         pool.join()
 
         return collapse_queue(failed)
+
+
+    def checkacl(self, bucket, ikeys, retry=5, **config):
+        """
+        Given an iterator of key names, check the ACL on these keys in the current bucket.
+        Return a list of failed keys (if any).
+        """
+        # Setup local config for this request
+        c = {}
+        c.update(config)
+        c['retry'] = retry
+
+        failed = Queue()
+        pool = self._get_pool()
+        for k in ikeys:
+            j = CheckAclJob(bucket, k, failed, c)
+            pool.put(j)
+        pool.join()
+
+        return collapse_queue(failed)

diff --git a/s3funnel/jobs.py b/s3funnel/jobs.py
index 4b87d61..59ee61c 100644
--- a/s3funnel/jobs.py
+++ b/s3funnel/jobs.py
@@ -341,6 +341,32 @@ def run(self, toolbox):
             self.failed.put(e)
 
 
+class CheckAclJob(Job):
+    "Check the ACL of the given key."
+
+    aws_services_owner = 'db6a261a5c90f39366dde55bd72b8db7c7ae729538e8694142b8b68fe2348bdf'
+
+    def __init__(self, bucket, key, failed, config={}):
+        self.bucket = bucket
+        self.key = key
+        self.failed = failed
+        self.retries = config.get('retry', 5)
+
+    def _do(self, toolbox):
+        for i in xrange(self.retries):
+            try:
+                ok = True
+                x = toolbox.get_bucket(self.bucket).get_acl(key_name=self.key)
+                if not x.owner.id == self.aws_services_owner:
+                    print "%s Owner wrong: %s" % (self.key, x.owner.display_name)
+                    ok = False
+                elif len(x.acl.grants) > 0:
+                    for acl in x.acl.grants:
+                        if not (acl.id == self.aws_services_owner and acl.permission == 'FULL_CONTROL'):
+                            print "%s has grant for %s" % (self.key, acl.display_name)
+                            ok = False
+                if ok:
+                    print "OK: %s" % self.key
                 return
             except S3ResponseError, e:
                 if e.status == 404:

diff --git a/scripts/s3funnel b/scripts/s3funnel
index eb56bbc..1b06e78 100755
--- a/scripts/s3funnel
+++ b/scripts/s3funnel
@@ -194,6 +194,7 @@ def main():
         'delete': funnel.delete,
         'batch_delete': funnel.batch_delete,
         'copy': funnel.copy,
+        'checkacl': funnel.checkacl,
         'setacl': funnel.setacl,
     }
 
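CheckAclJob reads the object ACL rather than writing it: get_acl returns a Policy whose owner and grant list can be inspected directly. A standalone sketch of the same read (boto 2.x; bucket and key names are placeholders):

    import boto

    bucket = boto.connect_s3().get_bucket('mybukkit', validate=False)
    policy = bucket.get_acl(key_name='some/key')
    print policy.owner.id
    for grant in policy.acl.grants:
        print grant.id, grant.permission, grant.display_name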
+ """ + # Setup local config for this request + c = {} + c.update(config) + c['retry'] = retry + + failed = Queue() + pool = self._get_pool() + for k in ikeys: + j = LookupJob(bucket, k, failed, c) + pool.put(j) + pool.join() + + return collapse_queue(failed) diff --git a/s3funnel/jobs.py b/s3funnel/jobs.py index 52867c2..2b29900 100644 --- a/s3funnel/jobs.py +++ b/s3funnel/jobs.py @@ -36,7 +36,7 @@ class JobError(Exception): class GetJob(Job): "Download the given key from S3." - def __init__(self, bucket, key, failed, config={}): + def __init__(self, bucket, key, failed, config=dict()): self.bucket = bucket self.key = key self.failed = failed @@ -288,4 +288,43 @@ def run(self, toolbox): except JobError, e: self.failed.put(self.key) except Exception, e: - self.failed.put(e) + self.failed.put(e) + + +class LookupJob(Job): + """Lookup the given key. Checks accessibility of the key""" + + def __init__(self, bucket, key, failed, config): + self.bucket = bucket + self.key = key + self.failed = failed + self.retries = config.get('retry', 5) + + def _do(self, toolbox): + for i in xrange(self.retries): + try: + if toolbox.get_bucket(self.bucket).lookup(self.key): + log.info("OK: %s" % self.key) + return + except S3ResponseError, e: + if e.status == 403 or e.status == 404: + log.error("%s on %s" % (e.status, self.key)) + return + else: + log.warning("Connection lost, reconnecting and retrying...") + toolbox.reset() + except BotoServerError, e: + break + except (IncompleteRead, SocketError, BotoClientError), e: + log.warning("Caught exception: %r.\nRetrying..." % e) + time.sleep((2 ** i) / 4.0) # Exponential backoff + + log.error("Failed to copy: %s" % self.key) + + def run(self, toolbox): + try: + self._do(toolbox) + except JobError, e: + self.failed.put(self.key) + except Exception, e: + self.failed.put(e) diff --git a/scripts/s3funnel b/scripts/s3funnel index 55eb130..03c7da6 100755 --- a/scripts/s3funnel +++ b/scripts/s3funnel @@ -194,6 +194,7 @@ def main(): 'delete': funnel.delete, 'batch_delete': funnel.batch_delete, 'copy': funnel.copy, + 'lookup': funnel.lookup, } methods_bucket = { From a8e4006d1b3af47eceb0c122a2c41ee40f895be7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=20Tich=C3=BD?= Date: Mon, 25 Aug 2014 18:14:51 +0200 Subject: [PATCH 11/12] TRIVIAL: git ignore ".idea" --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 69a6efd..28c86ec 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ build/ dist/ *.pyc *.egg-info +.idea From c6b33fef44cf305c803e4ef17dedf396efdb1be3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Petr=20Tich=C3=BD?= Date: Tue, 26 Aug 2014 16:05:20 +0200 Subject: [PATCH 12/12] FEATURE: add --check-owner --- s3funnel/__init__.py | 5 +++++ scripts/s3funnel | 2 ++ 2 files changed, 7 insertions(+) diff --git a/s3funnel/__init__.py b/s3funnel/__init__.py index b990d13..57852fb 100644 --- a/s3funnel/__init__.py +++ b/s3funnel/__init__.py @@ -152,6 +152,9 @@ def list_bucket(self, name, marker=None, prefix=None, delimiter=None, **config): marker = marker or config.get('list_marker') or '' prefix = prefix or config.get('list_prefix') or '' delimiter = delimiter or config.get('list_delimiter') or '' + check_owner = config.get('check_owner') or False + if check_owner: + bucket_owner = bucket.get_acl().owner more_results = True k = None @@ -161,6 +164,8 @@ def list_bucket(self, name, marker=None, prefix=None, delimiter=None, **config): try: r = bucket.get_all_keys(marker=marker, prefix=prefix, delimiter=delimiter) for k in r: + if 
From c6b33fef44cf305c803e4ef17dedf396efdb1be3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Petr=20Tich=C3=BD?=
Date: Tue, 26 Aug 2014 16:05:20 +0200
Subject: [PATCH 12/12] FEATURE: add --check-owner

---
 s3funnel/__init__.py | 5 +++++
 scripts/s3funnel     | 2 ++
 2 files changed, 7 insertions(+)

diff --git a/s3funnel/__init__.py b/s3funnel/__init__.py
index b990d13..57852fb 100644
--- a/s3funnel/__init__.py
+++ b/s3funnel/__init__.py
@@ -152,6 +152,9 @@ def list_bucket(self, name, marker=None, prefix=None, delimiter=None, **config):
         marker = marker or config.get('list_marker') or ''
         prefix = prefix or config.get('list_prefix') or ''
         delimiter = delimiter or config.get('list_delimiter') or ''
+        check_owner = config.get('check_owner') or False
+        if check_owner:
+            bucket_owner = bucket.get_acl().owner
 
         more_results = True
         k = None
@@ -161,6 +164,8 @@ def list_bucket(self, name, marker=None, prefix=None, delimiter=None, **config):
             try:
                 r = bucket.get_all_keys(marker=marker, prefix=prefix, delimiter=delimiter)
                 for k in r:
+                    if check_owner and k.owner.id != bucket_owner.id:
+                        log.warning('{} owned by {}'.format(k.name, k.owner.display_name))
                     yield k.name
                 if k:
                     marker = k.name

diff --git a/scripts/s3funnel b/scripts/s3funnel
index 9b1fa14..79d0875 100755
--- a/scripts/s3funnel
+++ b/scripts/s3funnel
@@ -68,6 +68,7 @@ def main():
     parser.add_option("--list-marker", dest="list_marker", type="string", default=None, metavar="KEY", help="('list' only) Start key for list operation")
     parser.add_option("--list-prefix", dest="list_prefix", type="string", default=None, metavar="STRING", help="('list' only) Limit results to a specific prefix")
     parser.add_option("--list-delimiter", dest="list_delimiter", type="string", default=None, metavar="CHAR", help="('list' only) Treat value as a delimiter for hierarchical listing")
+    parser.add_option("--check-owner", dest="check_owner", action="store_true", help="('list' only) Check that the bucket owner owns every key listed")
    parser.add_option("--put-acl", dest="acl", type="string", default="public-read", help="('put' only) Set the ACL permission for each file [default: %default]")
     parser.add_option("--put-full-path", dest="put_full_path", action="store_true", help="('put' only) Use the full given path as the key name, instead of just the basename")
     parser.add_option("--put-only-new", dest="put_only_new", action="store_true", help="('put' only) Only PUT keys which don't already exist in the bucket with the same md5 digest")
@@ -170,6 +171,7 @@ def main():
               'list_marker': options.list_marker or '',
               'list_prefix': options.list_prefix or '',
               'list_delimiter': options.list_delimiter or '',
+              'check_owner': options.check_owner,
               'aws_key': aws_key,
               'aws_secret_key': aws_secret_key,
               'secure': options.secure,
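The --check-owner comparison in list_bucket boils down to matching each listed key's owner against the owner of the bucket ACL. The same check, standalone (boto 2.x sketch; the bucket name is a placeholder):

    import boto

    bucket = boto.connect_s3().get_bucket('mybukkit', validate=False)
    owner = bucket.get_acl().owner
    for key in bucket.get_all_keys():
        if key.owner.id != owner.id:
            print '{} owned by {}'.format(key.name, key.owner.display_name)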