diff --git a/.gitignore b/.gitignore index 96246b3c..c944f1bc 100644 --- a/.gitignore +++ b/.gitignore @@ -22,3 +22,6 @@ var/ *.egg-info/ .installed.cfg *.egg +*~ +.#* +#* \ No newline at end of file diff --git a/docs/API.md b/docs/API.md index eb654491..46a2a53c 100644 --- a/docs/API.md +++ b/docs/API.md @@ -690,7 +690,6 @@ __Return Value__ |``obj``| _SelectObjectReader_ |Select_object_reader object. | - __Example__ @@ -736,7 +735,7 @@ try: # Get the stats print(data.stats()) -except CRCValidationError as err: +except SelectCRCValidationError as err: print(err) except ResponseError as err: print(err) diff --git a/examples/select_object_content.py b/examples/select_object_content.py index 69728b4a..1ca4151d 100644 --- a/examples/select_object_content.py +++ b/examples/select_object_content.py @@ -17,12 +17,12 @@ from minio import Minio from minio.error import ResponseError -from minio.select_object_reader import CRCValidationError -from minio.select_object_options import (SelectObjectOptions, CSVInput, - JSONInput, RequestProgress, - ParquetInput, InputSerialization, - OutputSerialization, CSVOutput, - JsonOutput) +from minio.select.errors import SelectCRCValidationError, SelectMessageError +from minio.select.options import (SelectObjectOptions, CSVInput, + JSONInput, RequestProgress, + ParquetInput, InputSerialization, + OutputSerialization, CSVOutput, + JsonOutput) client = Minio('s3.amazonaws.com', access_key='YOUR-ACCESSKEY', @@ -71,7 +71,11 @@ # Get the stats print(data.stats()) -except CRCValidationError as err: +except SelectMessageError as err: print(err) + +except SelectCRCValidationError as err: + print(err) + except ResponseError as err: print(err) diff --git a/minio/__init__.py b/minio/__init__.py index c45d52b1..fc6c4d03 100644 --- a/minio/__init__.py +++ b/minio/__init__.py @@ -29,7 +29,7 @@ __title__ = 'minio-py' __author__ = 'MinIO, Inc.' -__version__ = '4.0.22' +__version__ = '5.0.0' __license__ = 'Apache 2.0' __copyright__ = 'Copyright 2015, 2016, 2017, 2018, 2019 MinIO, Inc.' @@ -38,6 +38,3 @@ from .post_policy import PostPolicy from .copy_conditions import CopyConditions from .definitions import Bucket, Object -from .select_object_reader import SelectObjectReader - - diff --git a/minio/api.py b/minio/api.py index a3d7a890..7ea2e7ea 100644 --- a/minio/api.py +++ b/minio/api.py @@ -77,8 +77,7 @@ is_valid_bucket_notification_config, is_valid_policy_type, mkdir_p, dump_http, amzprefix_user_metadata, is_supported_header,is_amz_header) -from .helpers import (MAX_MULTIPART_OBJECT_SIZE, - MAX_PART_SIZE, +from .helpers import (MAX_PART_SIZE, MAX_POOL_SIZE, MIN_PART_SIZE, DEFAULT_PART_SIZE, @@ -94,7 +93,7 @@ xml_marshal_select) from .fold_case_dict import FoldCaseDict from .thread_pool import ThreadPool -from .select_object_reader import SelectObjectReader +from .select import SelectObjectReader # Comment format. _COMMENTS = '({0}; {1})' diff --git a/minio/helpers.py b/minio/helpers.py index 8112d5f2..bd8a4071 100644 --- a/minio/helpers.py +++ b/minio/helpers.py @@ -55,17 +55,6 @@ MIN_PART_SIZE = 5 * 1024 * 1024 # 5MiB DEFAULT_PART_SIZE = MIN_PART_SIZE # Currently its 5MiB - -# Select Object Content -READ_SIZE_SELECT = 32 * 1024 # Buffer size -SQL = 'SQL' # Value for ExpressionType -EVENT_RECORDS = 'Records' # Event Type is Records -EVENT_PROGRESS = 'Progress' # Event Type Progress -EVENT_STATS = 'Stats' # Event Type Stats -EVENT = 'event' # Message Type is event -EVENT_END = 'End' # Event Type is End -ERROR = 'error' # Message Type is error - _VALID_BUCKETNAME_REGEX = re.compile('^[a-z0-9][a-z0-9\\.\\-]+[a-z0-9]$') _ALLOWED_HOSTNAME_REGEX = re.compile( '^((?!-)(?!_)[A-Z_\\d-]{1,63}(? read_buffer: - len_read = len(chunked_message[total_byte_parsed:]) - message = chunked_message[total_byte_parsed:] + \ - self.response.read(byte_int(total_byte_length)-len_read) - end_status = self.__decode_message(message, rec) - total_byte_parsed += byte_int(total_byte_length) - # Case 3- the complete message is present in chunked - # messsage. - else: - message = chunked_message[total_byte_parsed: - total_byte_parsed + - byte_int(total_byte_length)] - total_byte_parsed += byte_int(total_byte_length) - end_status = self.__decode_message(message, rec) - if end_status: - break - return rec - - def __extract_header(self, header, header_length): - """ - populates the header map after reading the header - """ - header_map = {} - header_byte_parsed = 0 - # While loop ends when all the headers present are read - # header contains multipe headers - while header_byte_parsed < header_length: - header_name_byte_length = \ - byte_int(header[header_byte_parsed: header_byte_parsed+1]) - header_byte_parsed += 1 - header_name = \ - header[header_byte_parsed: - header_byte_parsed+header_name_byte_length] - header_byte_parsed += header_name_byte_length - # Header Value Type is of 1 bytes and is skipped - header_byte_parsed += 1 - value_string_byte_length = \ - byte_int(header[header_byte_parsed: - header_byte_parsed+2]) - header_byte_parsed += 2 - header_value = \ - header[header_byte_parsed: - header_byte_parsed+value_string_byte_length] - header_byte_parsed += value_string_byte_length - header_map[header_name.decode("utf-8").lstrip(":")] = \ - header_value.decode("utf-8").lstrip(":") - return header_map - - def __read_stats(self, stats): - """ - pupulates the stat dict. - """ - root = cElementTree.fromstring(stats) - for attribute in root: - if attribute.tag == 'BytesScanned': - self.stat['BytesScanned'] = attribute.text - elif attribute.tag == 'BytesProcessed': - self.stat['BytesProcessed'] = attribute.text - elif attribute.tag == 'BytesReturned': - self.stat['BytesReturned'] = attribute.text - - def __parse_message(self, header_map, payload, payload_length, record): - ''' - Parses the message - ''' - if header_map["message-type"] == ERROR: - error = header_map["error-code"] + ":\"" +\ - header_map["error-message"] + "\"" - if header_map["message-type"] == EVENT: - # Fetch the content-type - content_type = header_map["content-type"] - # Fetch the event-type - event_type = header_map["event-type"] - if event_type == EVENT_RECORDS: - record += payload[0:payload_length] - elif event_type == EVENT_PROGRESS: - if content_type == "text/xml": - progress = payload[0:payload_length] - elif event_type == EVENT_STATS: - if content_type == "text/xml": - self.__read_stats(payload[0:payload_length]) - - def __decode_message(self, message, rec): - end_status = False - total_byte_length = message[0:4] # total_byte_length is of 4 bytes - headers_byte_length = message[4: 8] # headers_byte_length is 4 bytes - prelude_crc = message[8:12] # prelude_crc is of 4 bytes - header = message[12:12+byte_int(headers_byte_length)] - payload_length = byte_int(total_byte_length) - \ - byte_int(headers_byte_length) - int(16) - payload = message[12 + byte_int(headers_byte_length): - 12 + byte_int(headers_byte_length) + payload_length] - message_crc = message[12 + byte_int(headers_byte_length) + - payload_length: 12 + - byte_int(headers_byte_length) + - payload_length + 4] - - if not validate_crc(total_byte_length + headers_byte_length, - prelude_crc): - raise CRCValidationError( - {"Checksum Mismatch, MessageCRC of " + - str(calculate_crc(total_byte_length + - headers_byte_length)) + - " does not equal expected CRC of " + - str(byte_int(prelude_crc))}) - - if not validate_crc(message[0:len(message)-4], message_crc): - raise CRCValidationError( - {"Checksum Mismatch, MessageCRC of " + - str(calculate_crc(message)) + - " does not equal expected CRC of " + - str(byte_int(message_crc))}) - - header_map = self.__extract_header(header, byte_int(headers_byte_length)) - - if header_map["message-type"] == EVENT: - # Parse message only when event-type is Records, - # Progress, Stats. Break the loop if event type is End - # Do nothing if event type is Cont - if header_map["event-type"] == EVENT_RECORDS or \ - header_map["event-type"] == EVENT_PROGRESS or \ - header_map["event-type"] == EVENT_STATS: - self.__parse_message(header_map, payload, - payload_length, rec) - - if header_map["event-type"] == EVENT_END: - end_status = True - if header_map["message-type"] == ERROR: - self.__parse_message(header_map, payload, payload_length, rec) - end_status = True - return end_status - - def __read(self, num_bytes): - """ - extract each record from the response body ... and buffer it. - send only up to requested bytes such as message[:num_bytes] - rest is buffered and added to the next iteration. - """ - if len(self.remaining_bytes) == 0: - res = self.__extract_message() - if len(res) == 0: - return b'' - else: - self.remaining_bytes = res - - if num_bytes < len(self.remaining_bytes): - result = self.remaining_bytes[:num_bytes] - del self.remaining_bytes[:num_bytes] - return result - else: - left_in_buffer = self.remaining_bytes[:len(self.remaining_bytes)] - del self.remaining_bytes[:len(left_in_buffer)] - return left_in_buffer - - def stream(self, num_bytes): - """ - streams the response - """ - while True: - x = self.__read(num_bytes) - if x == b'': - break - elif len(x) < num_bytes: - x += self.__read(num_bytes-len(x)) - yield x.decode('utf-8') if isinstance(x, bytearray) else x diff --git a/setup.py b/setup.py index 739c4eab..dbf89422 100644 --- a/setup.py +++ b/setup.py @@ -37,6 +37,7 @@ packages = [ 'minio', + 'minio.select', ] requires = [ diff --git a/tests/functional/tests.py b/tests/functional/tests.py index befc1a08..d1879cf5 100644 --- a/tests/functional/tests.py +++ b/tests/functional/tests.py @@ -16,13 +16,13 @@ # limitations under the License. from __future__ import division +from __future__ import absolute_import import os import io import csv import sys -from io import BytesIO from sys import exit import uuid import shutil @@ -44,10 +44,10 @@ from minio.error import (APINotImplemented, NoSuchBucketPolicy, ResponseError, PreconditionFailed, BucketAlreadyOwnedByYou, BucketAlreadyExists, InvalidBucketError) -from minio.select_object_options import (SelectObjectOptions, CSVInput, - RequestProgress, InputSerialization, - OutputSerialization, CSVOutput) -from minio.select_object_reader import (calculate_crc) +from minio.select.options import (SelectObjectOptions, CSVInput, + RequestProgress, InputSerialization, + OutputSerialization, CSVOutput) +from minio.select.helpers import (calculate_crc) from minio.sse import SSE_C from minio.sse import copy_SSE_C @@ -291,8 +291,8 @@ def test_select_object_content(client, log_output): try: client.make_bucket(bucket_name) content = io.BytesIO(b"col1,col2,col3\none,two,three\nX,Y,Z\n") - expected_crc = calculate_crc(content.getbuffer()) - client.put_object(bucket_name, csvfile, content, content.getbuffer().nbytes) + expected_crc = calculate_crc(content.getvalue()) + client.put_object(bucket_name, csvfile, content, len(content.getvalue())) options = SelectObjectOptions( expression="select * from s3object", @@ -319,11 +319,11 @@ def test_select_object_content(client, log_output): ) data = client.select_object_content(bucket_name, csvfile, options) # Get the records - records = "" + records = io.BytesIO() for d in data.stream(10*1024): - records += d - generated_crc = calculate_crc(str.encode(records)) + records.write(d.encode('utf-8')) + generated_crc = calculate_crc(records.getvalue()) if expected_crc != generated_crc: raise ValueError('Data mismatch Expected : "col1,col2,col3\none,two,three\nX,Y,Z\n"', 'Received {}', records) @@ -2062,10 +2062,8 @@ def main(): log_output = LogOutput(client.get_bucket_notification, 'test_get_bucket_notification') test_get_bucket_notification(client, log_output) - # getBuffer() of io.BytesIO is supported in Python3. - if sys.version_info.major == 3: - log_output = LogOutput(client.select_object_content, 'test_select_object_content') - test_select_object_content(client, log_output) + log_output = LogOutput(client.select_object_content, 'test_select_object_content') + test_select_object_content(client, log_output) else: # Quick mode tests @@ -2114,10 +2112,8 @@ def main(): log_output = LogOutput(client.copy_object, 'test_copy_object_no_copy_condition') test_copy_object_no_copy_condition(client, log_output) - # getBuffer() of io.BytesIO is supported in Python3. - if sys.version_info.major == 3: - log_output = LogOutput(client.select_object_content, 'test_select_object_content') - test_select_object_content(client, log_output) + log_output = LogOutput(client.select_object_content, 'test_select_object_content') + test_select_object_content(client, log_output) if secure: log_output = LogOutput(client.copy_object, 'test_copy_object_with_sse') diff --git a/tests/unit/generate_xml_test.py b/tests/unit/generate_xml_test.py index e2791bcf..87e6802c 100644 --- a/tests/unit/generate_xml_test.py +++ b/tests/unit/generate_xml_test.py @@ -21,13 +21,12 @@ from minio.xml_marshal import (xml_marshal_bucket_constraint, xml_marshal_complete_multipart_upload, xml_marshal_select) -from minio.select_object_options import (SelectObjectOptions, - CSVInput, - RequestProgress, - InputSerialization, - OutputSerialization, - CSVOutput) - +from minio.select.options import (SelectObjectOptions, + CSVInput, + RequestProgress, + InputSerialization, + OutputSerialization, + CSVOutput) class GenerateRequestTest(TestCase): def test_generate_bucket_constraint(self):