FEATURE: Add AWS Multi-Object Delete #16

Open · wants to merge 13 commits into master
1 change: 1 addition & 0 deletions .gitignore
@@ -3,3 +3,4 @@ build/
dist/
*.pyc
*.egg-info
.idea
67 changes: 36 additions & 31 deletions README.md
@@ -8,52 +8,61 @@ This project is a clone of <https://github.com/shazow>'s s3funnel from google code
- Unix-friendly input and output. Pipe things in, out, and all around.

## Usage
$ s3funnel --help
Usage: s3funnel BUCKET OPERATION [OPTIONS] [FILE]...

s3funnel is a multithreaded tool for performing operations on Amazon's S3.

Key Operations:
DELETE Delete key from the bucket
GET Get key from the bucket
PUT Put file into the bucket (key is the basename of the path)

COPY Copy keys to the bucket from another bucket.

Bucket Operations:
CREATE Create a new bucket
DROP Delete an existing bucket (must be empty)
LIST List keys in the bucket. If no bucket is given, buckets will be listed.


Options:
  -h, --help            show this help message and exit
  -a AWS_KEY, --aws_key=AWS_KEY
                        Overrides AWS_ACCESS_KEY_ID environment variable
  -s AWS_SECRET_KEY, --aws_secret_key=AWS_SECRET_KEY
                        Overrides AWS_SECRET_ACCESS_KEY environment variable
  -t N, --threads=N     Number of threads to use [default: 1]
  -T SECONDS, --timeout=SECONDS
                        Socket timeout time, 0 is never [default: 0]
  --insecure            Don't use secure (https) connection
  --list-marker=KEY     (`list` only) Start key for list operation
  --list-prefix=STRING  (`list` only) Limit results to a specific prefix
  --list-delimiter=CHAR
                        (`list` only) Treat value as a delimiter for
                        hierarchical listing
  --put-acl=ACL         (`put` only) Set the ACL permission for each file
                        [default: public-read]
  --put-full-path       (`put` only) Use the full given path as the key name,
                        instead of just the basename
  --put-only-new        (`put` only) Only PUT keys which don't already exist
                        in the bucket with the same md5 digest
  --put-header=HEADERS  (`put` only) Add the specified header to the request
  --add-prefix=ADD_PREFIX
                        (`put` and `copy` only) Add specified prefix to keys
                        in destination bucket
  --del-prefix=DEL_PREFIX
                        (`put` and `copy` only) Delete specified prefix from
                        keys in destination bucket
  --source-bucket=SOURCE_BUCKET
                        (`copy` only) Source bucket for files
  --no-ignore-s3fs-dirs
                        (`get` only) Don't ignore s3fs directory objects
  -i FILE, --input=FILE
                        Read one file per line from a FILE manifest
  -v, --verbose         Enable verbose output. Use twice to enable debug
                        output
  --version             Output version information and exit
  --batch-delete=N      (`delete` only) Delete in batches of N

## Examples
Note: Appending the -v flag will print useful progress information to stderr. Great for learning the tool.

@@ -67,22 +76,18 @@ Note: Appending the -v flag will print useful progress information to stderr. Great for learning the tool.
$ s3funnel mybukkit put 1 2 3
### List files in a bucket
$ s3funnel mybukkit list
1
2
3
### Copy files from a bucket
$ rm 1 2 3
$ s3funnel mybukkit get 1 2 3 --threads=2
$ ls -1
1
2
3
### Copy files from another bucket
$ s3funnel mybukkit_copy create
$ s3funnel mybukkit list | s3funnel mybukkit_copy copy --source-bucket mybukkit --threads=2
### Empty a bucket
$ s3funnel mybukkit list | s3funnel mybukkit delete
$ s3funnel mybukkit_copy list | s3funnel mybukkit_copy delete --threads=2
### Empty a bucket using batch delete
$ s3funnel mybukkit list | s3funnel mybukkit delete --threads=10 --batch-delete=1000
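What the new flag does under the hood: instead of issuing one DELETE request per key, each batch of N keys is sent as a single S3 Multi-Object Delete request, and S3 accepts up to 1,000 keys per such request. Below is a minimal sketch of that call using boto 2 (the library s3funnel already depends on); the helper name `batch_delete_keys` and the bucket/key names are placeholders, and the PR's own request is issued by `DeleteMulitpleJob` in jobs.py, which is not part of this diff.

```python
# Hedged sketch: assumes boto 2.x and AWS credentials in the environment.
import boto


def batch_delete_keys(bucket_name, key_names, batch_size=1000):
    """Delete key_names from bucket_name in batches; return the names that failed."""
    conn = boto.connect_s3()
    bucket = conn.get_bucket(bucket_name, validate=False)
    failed = []
    for i in range(0, len(key_names), batch_size):
        batch = key_names[i:i + batch_size]
        # One Multi-Object Delete request covers the whole batch.
        result = bucket.delete_keys(batch)
        failed.extend(error.key for error in result.errors)
    return failed


if __name__ == '__main__':
    print(batch_delete_keys('mybukkit', ['1', '2', '3']))
```

With `--batch-delete=1000` and ten threads, emptying a large bucket costs roughly one request per thousand keys rather than one request per key.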
### Delete a bucket
$ s3funnel mybukkit drop
$ s3funnel mybukkit_copy drop
112 changes: 108 additions & 4 deletions s3funnel/__init__.py
@@ -10,9 +10,9 @@
from Queue import Queue, Empty
from exceptions import FunnelError

from jobs import GetJob, PutJob, DeleteJob, CopyJob
from jobs import GetJob, PutJob, DeleteJob, DeleteMulitpleJob, CopyJob, LookupJob , SetAclJob, CheckAclJob

__all__ = ['GetJob','PutJob','DeleteJob','S3ToolBox','BucketFunnel']
__all__ = ['GetJob', 'PutJob', 'DeleteJob', 'DeleteMulitpleJob', 'S3ToolBox', 'BucketFunnel', 'LookupJob', 'SetAclJob', 'CheckAclJob']

# Helpers

@@ -63,7 +63,7 @@ def get_bucket(self, name):
conn = self.get_conn()
log.debug("Getting bucket instance: %s" % name)
try:
bucket = conn.get_bucket(name)
bucket = conn.get_bucket(name, validate=False)
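# validate=False skips boto's up-front bucket-existence request; a missing bucket now fails on the first key operation instead of here.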
except BotoServerError, e:
raise FunnelError("Bucket not found: %s" % name, key=name)

@@ -152,6 +152,9 @@ def list_bucket(self, name, marker=None, prefix=None, delimiter=None, **config):
marker = marker or config.get('list_marker') or ''
prefix = prefix or config.get('list_prefix') or ''
delimiter = delimiter or config.get('list_delimiter') or ''
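# Optional ownership audit: remember the bucket owner so keys owned by a different AWS account can be flagged while listing.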
check_owner = config.get('check_owner') or False
if check_owner:
bucket_owner = bucket.get_acl().owner

more_results = True
k = None
@@ -161,10 +164,12 @@ def list_bucket(self, name, marker=None, prefix=None, delimiter=None, **config):
try:
r = bucket.get_all_keys(marker=marker, prefix=prefix, delimiter=delimiter)
for k in r:
if check_owner and k.owner.id != bucket_owner.id:
log.warning('{} owned by {}'.format(k.name, k.owner.display_name))
yield k.name
if k:
marker = k.name
more_results= r.is_truncated
more_results = r.is_truncated
except BotoServerError, e:
raise FunnelError("Failed to list bucket: %s" % name, key=name)
except (IncompleteRead, SocketError, BotoClientError), e:
@@ -192,6 +197,46 @@ def delete(self, bucket, ikeys, retry=5, **config):

return collapse_queue(failed)

def batch_delete(self, bucket, ikeys, retry=5, **config):
"""
Given an iterator of key names, delete them from the current bucket in batches, one batch per Multi-Object Delete request.
Return a list of failed keys (if any).
"""

def grouper(iterable, n):
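# Yield lists of at most n truthy items at a time from the iterable; empty key names are skipped.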
iterable = iter(iterable)
i = n
group = []
while True:
try:
item = iterable.next()
if item:
group.append(item)
i -= 1
if i == 0:
yield group
group = []
i = n
except StopIteration:
if group:
yield group
break

# Setup local config for this request
c = {}
c.update(config)
c['retry'] = retry

failed = Queue()
pool = self._get_pool()
batch_size = c['batch_delete']
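# S3's Multi-Object Delete API accepts at most 1000 keys per request, so --batch-delete should not exceed 1000.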
for k in grouper(ikeys, batch_size):
j = DeleteMulitpleJob(bucket, k, failed, c)
pool.put(j)
pool.join()

return collapse_queue(failed)
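# Aside (editor's sketch, not part of this PR): the nested grouper() above is
# roughly equivalent to chunking the key iterator with itertools.islice,
# except that grouper() also drops empty key names:
#
#   from itertools import islice
#
#   def grouper(iterable, n):
#       iterable = iter(iterable)
#       while True:
#           group = list(islice(iterable, n))
#           if not group:
#               break
#           yield group
#
# Each yielded group is handed to one DeleteMulitpleJob, which presumably
# issues a single Multi-Object Delete request for up to `batch_delete` keys
# (the job implementation lives in jobs.py and is not shown in this diff).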

def get(self, bucket, ikeys, retry=5, **config):
"""
Given an iterator of key names, download these files from the current bucket.
@@ -250,3 +295,62 @@ def copy(self, bucket, ikeys, retry=5, acl='public-read', **config):
pool.join()

return collapse_queue(failed)

def lookup(self, bucket, ikeys, retry=5, **config):
"""
Given an iterator of key names, look up each key in the current bucket.
Return a list of failed keys (if any).
"""
# Setup local config for this request
c = {}
c.update(config)
c['retry'] = retry

failed = Queue()
pool = self._get_pool()
for k in ikeys:
j = LookupJob(bucket, k, failed, c)
pool.put(j)
pool.join()

return collapse_queue(failed)


def setacl(self, bucket, ikeys, retry=5, **config):
"""
Given an iterator of key names, set the ACL on each key in the current bucket.
Return a list of failed keys (if any).
"""
# Setup local config for this request
c = {}
c.update(config)
c['retry'] = retry

failed = Queue()
pool = self._get_pool()
for k in ikeys:
j = SetAclJob(bucket, k, failed, c)
pool.put(j)
pool.join()

return collapse_queue(failed)


def checkacl(self, bucket, ikeys, retry=5, **config):
"""
Given an iterator of key names, check the ACL of each key in the current bucket.
Return a list of failed keys (if any).
"""
# Setup local config for this request
c = {}
c.update(config)
c['retry'] = retry

failed = Queue()
pool = self._get_pool()
for k in ikeys:
j = CheckAclJob(bucket, k, failed, c)
pool.put(j)
pool.join()

return collapse_queue(failed)