Skip to content

Commit

Permalink
Implement select object (#772)
Browse files Browse the repository at this point in the history
Selects and filters out data from stored object.
Client for select feature
  • Loading branch information
sinhaashish authored and harshavardhana committed Aug 5, 2019
1 parent df7628c commit 6a6dc7a
Show file tree
Hide file tree
Showing 10 changed files with 1,010 additions and 138 deletions.
74 changes: 74 additions & 0 deletions docs/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ s3Client = Minio('s3.amazonaws.com',
| | [`fput_object`](#fput_object) | | |
| | [`fget_object`](#fget_object) | | |
| | [`get_partial_object`](#get_partial_object) | | |
| | [`select_object_content`](#select_object_content) | | |

## 1. Constructor

Expand Down Expand Up @@ -668,6 +669,79 @@ except ResponseError as err:
print(err)
```

<a name="select_object_content"></a>
### select_object_content(self, bucket_name, object_name, options)
Select object content filters the contents of object based on a simple structured query language (SQL).

__Parameters__

|Param |Type |Description |
|:---|:---|:---|
|``bucket_name`` |_string_ |Name of the bucket. |
|``object_name`` |_string_ |Name of the object. |
|``options`` | _SelectObjectReader_ | Query Options |


__Return Value__

|Param |Type |Description |
|:---|:---|:---|
|``obj``| _SelectObjectReader_ |Select_object_reader object. |



__Example__


```py
client = Minio('s3.amazonaws.com',
access_key='YOUR-ACCESSKEY',
secret_key='YOUR-SECRETKEY')

options = SelectObjectOptions(
expression=" select * from s3object",
input_serialization=InputSerialization(
compression_type="NONE",
csv=CSVInput(FileHeaderInfo="USE",
RecordDelimiter="\n",
FieldDelimiter=",",
QuoteCharacter='"',
QuoteEscapeCharacter='"',
Comments="#",
AllowQuotedRecordDelimiter="FALSE",
),
),

output_serialization=OutputSerialization(
csv=CSVOutput(QuoteFields="ASNEEDED",
RecordDelimiter="\n",
FieldDelimiter=",",
QuoteCharacter='"',
QuoteEscapeCharacter='"',)
),
request_progress=RequestProgress(
enabled="FLASE"
)
)

try:
data = client.select_object_content('my-bucket', 'my-object', options)

# Get the records
with open('my-record-file', 'w') as record_data:
for d in data.stream(10*1024):
record_data.write(d)

# Get the stats
print(data.stats())

except CRCValidationError as err:
print(err)
except ResponseError as err:
print(err)

```

<a name="fget_object"></a>
### fget_object(bucket_name, object_name, file_path, request_headers=None)
Downloads and saves the object as a file in the local filesystem.
Expand Down
77 changes: 77 additions & 0 deletions examples/select_object_content.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# -*- coding: utf-8 -*-
# MinIO Python Library for Amazon S3 Compatible Cloud Storage, (C)
# 2019 MinIO, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from minio import Minio
from minio.error import ResponseError
from minio.select_object_reader import CRCValidationError
from minio.select_object_options import (SelectObjectOptions, CSVInput,
JSONInput, RequestProgress,
ParquetInput, InputSerialization,
OutputSerialization, CSVOutput,
JsonOutput)

client = Minio('s3.amazonaws.com',
access_key='YOUR-ACCESSKEY',
secret_key='YOUR-SECRETKEY')

options = SelectObjectOptions(
expression="select * from s3object",
input_serialization=InputSerialization(
compression_type="NONE",
csv=CSVInput(FileHeaderInfo="USE",
RecordDelimiter="\n",
FieldDelimiter=",",
QuoteCharacter='"',
QuoteEscapeCharacter='"',
Comments="#",
AllowQuotedRecordDelimiter="FALSE",
),
# If input is JSON
# json=JSONInput(Type="DOCUMENT",)
),

output_serialization=OutputSerialization(
csv=CSVOutput(QuoteFields="ASNEEDED",
RecordDelimiter="\n",
FieldDelimiter=",",
QuoteCharacter='"',
QuoteEscapeCharacter='"',)

# json = JsonOutput(
# RecordDelimiter="\n",
# )
),
request_progress=RequestProgress(
enabled="False"
)
)

try:
data = client.select_object_content('your-bucket', 'your-object', options)

# Get the records
with open('my-record-file', 'w') as record_data:
for d in data.stream(10*1024):
record_data.write(d)

# Get the stats
print(data.stats())

except CRCValidationError as err:
print(err)
except ResponseError as err:
print(err)
5 changes: 4 additions & 1 deletion minio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@
__author__ = 'MinIO, Inc.'
__version__ = '4.0.19'
__license__ = 'Apache 2.0'
__copyright__ = 'Copyright 2015, 2016, 2017 MinIO, Inc.'
__copyright__ = 'Copyright 2015, 2016, 2017, 2018, 2019 MinIO, Inc.'

from .api import Minio
from .error import ResponseError
from .post_policy import PostPolicy
from .copy_conditions import CopyConditions
from .definitions import Bucket, Object
from .select_object_reader import SelectObjectReader


42 changes: 41 additions & 1 deletion minio/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,11 @@
from .xml_marshal import (xml_marshal_bucket_constraint,
xml_marshal_complete_multipart_upload,
xml_marshal_bucket_notifications,
xml_marshal_delete_objects)
xml_marshal_delete_objects,
xml_marshal_select)
from .fold_case_dict import FoldCaseDict
from .thread_pool import ThreadPool
from .select_object_reader import SelectObjectReader

# Comment format.
_COMMENTS = '({0}; {1})'
Expand Down Expand Up @@ -235,6 +237,44 @@ def trace_off(self):
"""
self._trace_output_stream = None

# Select Object Content
def select_object_content(self, bucket_name, object_name, opts):
"""
Executes SQL requests on objects having data in CSV, JSON
or Parquet formats.
Examples:
data = client.select_object_content('foo', 'test.csv', options)
:param bucket_name: Bucket to read object from
:param object_name: Name of object to read
:param options: Options for select object
"""
is_valid_bucket_name(bucket_name)
is_non_empty_string(object_name)

content = xml_marshal_select(opts)
url_values = dict()
url_values["select"] = ""
url_values["select-type"] = "2"

headers = {
'Content-Length': str(len(content)),
'Content-Md5': get_md5_base64digest(content)
}
content_sha256_hex = get_sha256_hexdigest(content)
response = self._url_open(
'POST',
bucket_name=bucket_name,
object_name=object_name,
query=url_values,
headers=headers,
body=content,
content_sha256=content_sha256_hex,
preload_content=False)

return SelectObjectReader(response)

# Bucket level
def make_bucket(self, bucket_name, location='us-east-1'):
"""
Expand Down
15 changes: 13 additions & 2 deletions minio/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,23 @@
InvalidArgumentError)

# Constants
MAX_MULTIPART_COUNT = 10000 # 10000 parts
MAX_MULTIPART_COUNT = 10000 # 10000 parts
MAX_MULTIPART_OBJECT_SIZE = 5 * 1024 * 1024 * 1024 * 1024 # 5TiB
MAX_PART_SIZE = 5 * 1024 * 1024 * 1024 # 5GiB
MAX_POOL_SIZE = 10
MIN_PART_SIZE = 5 * 1024 * 1024 # 5MiB
DEFAULT_PART_SIZE = MIN_PART_SIZE # Currently its 5MiB
DEFAULT_PART_SIZE = MIN_PART_SIZE # Currently its 5MiB


# Select Object Content
READ_SIZE_SELECT = 32 * 1024 # Buffer size
SQL = 'SQL' # Value for ExpressionType
EVENT_RECORDS = 'Records' # Event Type is Records
EVENT_PROGRESS = 'Progress' # Event Type Progress
EVENT_STATS = 'Stats' # Event Type Stats
EVENT = 'event' # Message Type is event
EVENT_END = 'End' # Event Type is End
ERROR = 'error' # Message Type is error

_VALID_BUCKETNAME_REGEX = re.compile('^[a-z0-9][a-z0-9\\.\\-]+[a-z0-9]$')
_ALLOWED_HOSTNAME_REGEX = re.compile(
Expand Down
121 changes: 121 additions & 0 deletions minio/select_object_options.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# -*- coding: utf-8 -*-
# MinIO Python Library for Amazon S3 Compatible Cloud Storage, (C)
# 2019 MinIO, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module creates the request for Select
:copyright: (c) 2019 by MinIO, Inc.
:license: Apache 2.0, see LICENSE for more details.
"""
from .helpers import (SQL)


class CSVInput:
"""
CSVInput: Input Format as CSV.
"""
def __init__(self, FileHeaderInfo=None, RecordDelimiter="\n",
FieldDelimiter=",", QuoteCharacter='"',
QuoteEscapeCharacter='"', Comments="#",
AllowQuotedRecordDelimiter=False):
self.FileHeaderInfo = FileHeaderInfo
self.RecordDelimiter = RecordDelimiter
self.FieldDelimiter = FieldDelimiter
self.QuoteCharacter = QuoteCharacter
self.QuoteEscapeCharacter = QuoteEscapeCharacter
self.Comments = Comments
self.AllowQuotedRecordDelimiter = AllowQuotedRecordDelimiter


class JSONInput:
"""
JSONInput: Input format as JSON.
"""
def __init__(self, Type=None):
self.Type = Type


class ParquetInput:
"""
ParquetInput: Input format as Parquet
"""


class InputSerialization:
"""
InputSerialization: nput Format.
"""
def __init__(self, compression_type="NONE", csv=None, json=None, par=None):
self.compression_type = compression_type
self.csv_input = csv
self.json_input = json
self.parquet_input = par


class CSVOutput:
"""
CSVOutput: Output as CSV.
"""
def __init__(self, QuoteFields="ASNEEDED", RecordDelimiter="\n",
FieldDelimiter=",", QuoteCharacter='"',
QuoteEscapeCharacter='"'):
self.QuoteFields = QuoteFields
self.RecordDelimiter = RecordDelimiter
self.FieldDelimiter = FieldDelimiter
self.QuoteCharacter = QuoteCharacter
self.QuoteEscapeCharacter = QuoteEscapeCharacter


class JsonOutput:
"""
JsonOutput- Output as JSON.
"""
def __init__(self, RecordDelimiter="\n"):
self.RecordDelimiter = RecordDelimiter


class OutputSerialization:
"""
OutputSerialization: Output Format.
"""
def __init__(self, csv=None, json=None):
self.csv_output = csv
self.json_output = json


class RequestProgress:
"""
RequestProgress: Sends progress message.
"""
def __init__(self, enabled=False):
self.enabled = enabled


class SelectObjectOptions:
"""
SelectObjectOptions: Options for select object
"""
expression_type = SQL

def __init__(self, expression, input_serialization,
output_serialization, request_progress):
self.expression = expression
self.in_ser = input_serialization
self.out_ser = output_serialization
self.req_progress = request_progress
Loading

0 comments on commit 6a6dc7a

Please sign in to comment.