From 3ed58a959146d5d22d422b967a4642fc0d463813 Mon Sep 17 00:00:00 2001 From: Wilfried BARADAT Date: Tue, 4 Apr 2023 19:32:19 +0200 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8(backends)=20add=20s3=20backend=20with?= =?UTF-8?q?=20unified=20interface?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add S3 backend under the new common `data` interface --- .circleci/config.yml | 2 +- Dockerfile | 2 +- setup.cfg | 1 + src/ralph/backends/data/s3.py | 388 +++++++++++++++++++ tests/backends/data/test_s3.py | 658 +++++++++++++++++++++++++++++++++ tests/conftest.py | 1 + tests/fixtures/backends.py | 22 ++ 7 files changed, 1072 insertions(+), 2 deletions(-) create mode 100644 src/ralph/backends/data/s3.py create mode 100644 tests/backends/data/test_s3.py diff --git a/.circleci/config.yml b/.circleci/config.yml index 26b9ea32a..763dd16c6 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -144,7 +144,7 @@ jobs: - v1-dependencies-<< parameters.python-image >>-{{ .Revision }} - run: name: Install development dependencies - command: pip install --user .[backend-clickhouse,backend-es,backend-ldp,backend-lrs,backend-mongo,backend-swift,backend-ws,cli,dev,lrs] + command: pip install --user .[backend-clickhouse,backend-es,backend-ldp,backend-lrs,backend-mongo,backend-s3,backend-swift,backend-ws,cli,dev,lrs] - save_cache: paths: - ~/.local diff --git a/Dockerfile b/Dockerfile index 5c84878c1..044edc092 100644 --- a/Dockerfile +++ b/Dockerfile @@ -25,7 +25,7 @@ RUN apt-get update && \ libffi-dev && \ rm -rf /var/lib/apt/lists/* -RUN pip install .[backend-clickhouse,backend-es,backend-ldp,backend-lrs,backend-mongo,backend-swift,backend-ws,cli,lrs] +RUN pip install .[backend-clickhouse,backend-es,backend-ldp,backend-lrs,backend-mongo,backend-s3,backend-swift,backend-ws,cli,lrs] # -- Core -- diff --git a/setup.cfg b/setup.cfg index 7753dbb3f..99d7df50e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -56,6 +56,7 @@ backend-mongo = backend-s3 = boto3>=1.24.70 botocore>=1.27.71 + requests-toolbelt>=1.0.0 backend-swift = python-keystoneclient>=5.0.0 python-swiftclient>=4.0.0 diff --git a/src/ralph/backends/data/s3.py b/src/ralph/backends/data/s3.py new file mode 100644 index 000000000..3222b41ad --- /dev/null +++ b/src/ralph/backends/data/s3.py @@ -0,0 +1,388 @@ +"""S3 data backend for Ralph.""" + +import json +import logging +from io import IOBase +from itertools import chain +from typing import Iterable, Iterator, Union +from uuid import uuid4 + +import boto3 +from boto3.s3.transfer import TransferConfig +from botocore.exceptions import ( + ClientError, + ParamValidationError, + ReadTimeoutError, + ResponseStreamingError, +) +from botocore.response import StreamingBody +from requests_toolbelt import StreamingIterator + +from ralph.backends.data.base import ( + BaseDataBackend, + BaseDataBackendSettings, + BaseOperationType, + BaseQuery, + DataBackendStatus, + enforce_query_checks, +) +from ralph.backends.mixins import HistoryMixin +from ralph.conf import BaseSettingsConfig +from ralph.exceptions import BackendException, BackendParameterException +from ralph.utils import now + +logger = logging.getLogger(__name__) + + +class S3DataBackendSettings(BaseDataBackendSettings): + """S3 data backend default configuration. + + Attributes: + ACCESS_KEY_ID (str): The access key id for the S3 account. + SECRET_ACCESS_KEY (str): The secret key for the S3 account. + SESSION_TOKEN (str): The session token for the S3 account. + ENDPOINT_URL (str): The endpoint URL of the S3. 
+ DEFAULT_REGION (str): The default region used in instantiating the client. + DEFAULT_BUCKET_NAME (str): The default bucket name targeted. + DEFAULT_CHUNK_SIZE (str): The default chunk size for reading and writing + objects. + LOCALE_ENCODING (str): The encoding used for writing dictionaries to objects. + """ + + class Config(BaseSettingsConfig): + """Pydantic Configuration.""" + + env_prefix = "RALPH_BACKENDS__DATA__S3__" + + ACCESS_KEY_ID: str = None + SECRET_ACCESS_KEY: str = None + SESSION_TOKEN: str = None + ENDPOINT_URL: str = None + DEFAULT_REGION: str = None + DEFAULT_BUCKET_NAME: str = None + DEFAULT_CHUNK_SIZE: int = 4096 + LOCALE_ENCODING: str = "utf8" + + +class S3DataBackend(HistoryMixin, BaseDataBackend): + """S3 data backend.""" + + name = "s3" + default_operation_type = BaseOperationType.CREATE + settings_class = S3DataBackendSettings + + def __init__(self, settings: settings_class = None): + """Instantiate the AWS S3 client.""" + self.settings = settings if settings else self.settings_class() + + self.default_bucket_name = self.settings.DEFAULT_BUCKET_NAME + self.default_chunk_size = self.settings.DEFAULT_CHUNK_SIZE + self.locale_encoding = self.settings.LOCALE_ENCODING + self._client = None + + @property + def client(self): + """Create a boto3 client if it doesn't exist.""" + if not self._client: + self._client = boto3.client( + "s3", + aws_access_key_id=self.settings.ACCESS_KEY_ID, + aws_secret_access_key=self.settings.SECRET_ACCESS_KEY, + aws_session_token=self.settings.SESSION_TOKEN, + region_name=self.settings.DEFAULT_REGION, + endpoint_url=self.settings.ENDPOINT_URL, + ) + return self._client + + def status(self) -> DataBackendStatus: + """Implement data backend checks (e.g. connection, cluster status). + + Return: + DataBackendStatus: The status of the data backend. + """ + try: + self.client.head_bucket(Bucket=self.default_bucket_name) + except ClientError: + return DataBackendStatus.ERROR + + return DataBackendStatus.OK + + def list( + self, target: str = None, details: bool = False, new: bool = False + ) -> Iterator[Union[str, dict]]: + """List objects for the target bucket. + + Args: + target (str or None): The target bucket to list from. + If target is `None`, the `default_bucket_name` is used instead. + details (bool): Get detailed object information instead of just object name. + new (bool): Given the history, list only unread files. + + Yields: + str: The next object name. (If details is False). + dict: The next object details. (If details is True). + + Raises: + BackendException: If a failure occurs. 
+ """ + if target is None: + target = self.default_bucket_name + + objects_to_skip = set() + if new: + objects_to_skip = set(self.get_command_history(self.name, "read")) + + try: + paginator = self.client.get_paginator("list_objects_v2") + page_iterator = paginator.paginate(Bucket=target) + for objects in page_iterator: + if "Contents" not in objects: + continue + for obj in objects["Contents"]: + if new and f"{target}/{obj['Key']}" in objects_to_skip: + continue + if details: + obj["LastModified"] = obj["LastModified"].isoformat() + yield obj + else: + yield obj["Key"] + except ClientError as err: + error_msg = err.response["Error"]["Message"] + msg = "Failed to list the bucket %s: %s" + logger.error(msg, target, error_msg) + raise BackendException(msg % (target, error_msg)) from err + + @enforce_query_checks + def read( + self, + *, + query: Union[str, BaseQuery] = None, + target: str = None, + chunk_size: Union[None, int] = None, + raw_output: bool = False, + ignore_errors: bool = False, + ) -> Iterator[Union[bytes, dict]]: + """Read an object matching the `query` in the `target` bucket and yields it. + + Args: + query: (str or BaseQuery): The ID of the object to read. + target (str or None): The target bucket containing the objects. + If target is `None`, the `default_bucket` is used instead. + chunk_size (int or None): The chunk size for reading objects. + raw_output (bool): Controls whether to yield bytes or dictionaries. + ignore_errors (bool): If `True`, errors during the read operation + will be ignored and logged. If `False` (default), a `BackendException` + will be raised if an error occurs. + + Yields: + dict: If `raw_output` is False. + bytes: If `raw_output` is True. + + Raises: + BackendException: If a failure during the read operation occurs and + `ignore_errors` is set to `False`. + BackendParameterException: If a backend argument value is not valid and + `ignore_errors` is set to `False`. + """ + if query.query_string is None: + msg = "Invalid query. The query should be a valid object name." + logger.error(msg) + raise BackendParameterException(msg) + + if not chunk_size: + chunk_size = self.default_chunk_size + + if target is None: + target = self.default_bucket_name + + try: + response = self.client.get_object(Bucket=target, Key=query.query_string) + except ClientError as err: + error_msg = err.response["Error"]["Message"] + msg = "Failed to download %s: %s" + logger.error(msg, query.query_string, error_msg) + if not ignore_errors: + raise BackendException(msg % (query.query_string, error_msg)) from err + + reader = self._read_raw if raw_output else self._read_dict + try: + for chunk in reader(response["Body"], chunk_size, ignore_errors): + yield chunk + except (ReadTimeoutError, ResponseStreamingError) as err: + msg = "Failed to read chunk from object %s" + logger.error(msg, query.query_string) + if not ignore_errors: + raise BackendException(msg % (query.query_string)) from err + + # Archive fetched, add a new entry to the history. + self.append_to_history( + { + "backend": self.name, + "action": "read", + "id": target + "/" + query.query_string, + "size": response["ContentLength"], + "timestamp": now(), + } + ) + + def write( # pylint: disable=too-many-arguments + self, + data: Union[IOBase, Iterable[bytes], Iterable[dict]], + target: Union[None, str] = None, + chunk_size: Union[None, int] = None, + ignore_errors: bool = False, + operation_type: Union[None, BaseOperationType] = None, + ) -> int: + """Write `data` records to the `target` bucket and return their count. 
+ + Args: + data: (Iterable or IOBase): The data to write. + target (str or None): The target bucket and the target object + separated by a `/`. + If target is `None`, the default bucket is used and a random + (uuid4) object is created. + If target does not contain a `/`, it is assumed to be the + target object and the default bucket is used. + chunk_size (int or None): Ignored. + ignore_errors (bool): If `True`, errors during the write operation + are ignored and logged. If `False` (default), a `BackendException` + is raised if an error occurs. + operation_type (BaseOperationType or None): The mode of the write + operation. + If operation_type is `CREATE` or `INDEX`, the target object is + expected to be absent. If the target object exists a + `BackendException` is raised. + + Return: + int: The number of written objects. + + Raise: + BackendException: If a failure during the write operation occurs. + BackendParameterException: If a backend argument value is not valid. + """ + data = iter(data) + try: + first_record = next(data) + except StopIteration: + logger.info("Data Iterator is empty; skipping write to target.") + return 0 + + if not operation_type: + operation_type = self.default_operation_type + + if not target: + target = f"{self.default_bucket_name}/{now()}-{uuid4()}" + logger.info( + "Target not specified; using default bucket with random file name: %s", + target, + ) + + elif "/" not in target: + target = f"{self.default_bucket_name}/{target}" + logger.info( + "Target not specified; using default bucket: %s", + target, + ) + + target_bucket, target_object = target.split("/", 1) + + if operation_type in [ + BaseOperationType.APPEND, + BaseOperationType.DELETE, + BaseOperationType.UPDATE, + ]: + msg = "%s operation_type is not allowed." + logger.error(msg, operation_type.name) + raise BackendParameterException(msg % operation_type.name) + + if target_object in list(self.list(target=target_bucket)): + msg = "%s already exists and overwrite is not allowed for operation %s" + logger.error(msg, target_object, operation_type) + raise BackendException(msg % (target_object, operation_type)) + + logger.info("Creating archive: %s", target_object) + + data = chain((first_record,), data) + if isinstance(first_record, dict): + data = self._parse_dict_to_bytes(data, ignore_errors) + + counter = {"count": 0} + data = self._count(data, counter) + + # Using StreamingIterator from requests-toolbelt but without specifying a size + # as we will not use it. It implements the `read` method for iterators. 
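+        # `upload_fileobj` expects a file-like object exposing `read`, so the
+        # iterator of encoded records is wrapped to let boto3 pull the data
+        # chunk by chunk instead of requiring the whole payload up front.
+        # The first argument (the declared size) is only a placeholder here.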
+ data = StreamingIterator(0, data) + + try: + self.client.upload_fileobj( + Bucket=target_bucket, + Key=target_object, + Fileobj=data, + Config=TransferConfig(multipart_chunksize=chunk_size), + ) + response = self.client.head_object(Bucket=target_bucket, Key=target_object) + except (ClientError, ParamValidationError) as exc: + msg = "Failed to upload %s" + logger.error(msg, target) + raise BackendException(msg % target) from exc + + # Archive written, add a new entry to the history + self.append_to_history( + { + "backend": self.name, + "action": "write", + "operation_type": operation_type.value, + "id": target, + "size": response["ContentLength"], + "timestamp": now(), + } + ) + + return counter["count"] + + @staticmethod + def _read_raw( + obj: StreamingBody, chunk_size: int, _ignore_errors: bool + ) -> Iterator[bytes]: + """Read the `object` in chunks of size `chunk_size` and yield them.""" + for chunk in obj.iter_chunks(chunk_size): + yield chunk + + @staticmethod + def _read_dict( + obj: StreamingBody, chunk_size: int, ignore_errors: bool + ) -> Iterator[dict]: + """Read the `object` by line and yield JSON parsed dictionaries.""" + for line in obj.iter_lines(chunk_size): + try: + yield json.loads(line) + except (TypeError, json.JSONDecodeError) as err: + msg = "Raised error: %s" + logger.error(msg, err) + if not ignore_errors: + raise BackendException(msg % err) from err + + @staticmethod + def _parse_dict_to_bytes( + statements: Iterable[dict], ignore_errors: bool + ) -> Iterator[bytes]: + """Read the `statements` Iterable and yield bytes.""" + for statement in statements: + try: + yield bytes(f"{json.dumps(statement)}\n", encoding="utf-8") + except TypeError as error: + msg = "Failed to encode JSON: %s, for document %s" + logger.error(msg, error, statement) + if ignore_errors: + continue + raise BackendException(msg % (error, statement)) from error + + @staticmethod + def _count( + statements: Union[Iterable[bytes], Iterable[dict]], + counter: dict, + ) -> Iterator: + """Count the elements in the `statements` Iterable and yield element.""" + for statement in statements: + counter["count"] += 1 + yield statement diff --git a/tests/backends/data/test_s3.py b/tests/backends/data/test_s3.py new file mode 100644 index 000000000..d8bfc4a3a --- /dev/null +++ b/tests/backends/data/test_s3.py @@ -0,0 +1,658 @@ +"""Tests for Ralph S3 data backend.""" + +import datetime +import json +import logging + +import boto3 +import pytest +from botocore.exceptions import ClientError, ResponseStreamingError +from moto import mock_s3 + +from ralph.backends.data.base import BaseOperationType, BaseQuery, DataBackendStatus +from ralph.backends.data.s3 import S3DataBackend, S3DataBackendSettings +from ralph.exceptions import BackendException, BackendParameterException + + +def test_backends_data_s3_backend_default_instantiation( + monkeypatch, fs +): # pylint: disable=invalid-name + """Test the `S3DataBackend` default instantiation.""" + fs.create_file(".env") + backend_settings_names = [ + "ACCESS_KEY_ID", + "SECRET_ACCESS_KEY", + "SESSION_TOKEN", + "ENDPOINT_URL", + "DEFAULT_REGION", + "DEFAULT_BUCKET_NAME", + "DEFAULT_CHUNK_SIZE", + "LOCALE_ENCODING", + ] + for name in backend_settings_names: + monkeypatch.delenv(f"RALPH_BACKENDS__DATA__S3__{name}", raising=False) + + assert S3DataBackend.name == "s3" + assert S3DataBackend.query_model == BaseQuery + assert S3DataBackend.default_operation_type == BaseOperationType.CREATE + assert S3DataBackend.settings_class == S3DataBackendSettings + backend = 
S3DataBackend() + assert backend.default_bucket_name is None + assert backend.default_chunk_size == 4096 + assert backend.locale_encoding == "utf8" + + +def test_backends_data_s3_data_backend_instantiation_with_settings(): + """Test the `S3DataBackend` instantiation with settings.""" + settings_ = S3DataBackend.settings_class( + ACCESS_KEY_ID="access_key", + SECRET_ACCESS_KEY="secret", + SESSION_TOKEN="session_token", + ENDPOINT_URL="http://endpoint/url", + DEFAULT_REGION="us-west-2", + DEFAULT_BUCKET_NAME="bucket", + DEFAULT_CHUNK_SIZE=1000, + LOCALE_ENCODING="utf-16", + ) + backend = S3DataBackend(settings_) + assert backend.default_bucket_name == "bucket" + assert backend.default_chunk_size == 1000 + assert backend.locale_encoding == "utf-16" + + try: + S3DataBackend(settings_) + except Exception as err: # pylint:disable=broad-except + pytest.fail(f"S3DataBackend should not raise exceptions: {err}") + + +@mock_s3 +def test_backends_data_s3_data_backend_status_method(s3_backend): + """Test the `S3DataBackend.status` method.""" + + # Regions outside of us-east-1 require the appropriate LocationConstraint + s3_client = boto3.client("s3", region_name="us-east-1") + + assert s3_backend().status() == DataBackendStatus.ERROR + + bucket_name = "bucket_name" + s3_client.create_bucket(Bucket=bucket_name) + + assert s3_backend().status() == DataBackendStatus.OK + + +@mock_s3 +def test_backends_data_s3_data_backend_list_should_yield_archive_names( + s3_backend, +): # pylint: disable=invalid-name + """Test that given `S3DataBackend.list` method successfully connects to the S3 + data, the S3 backend list method should yield the archives. + """ + # Regions outside of us-east-1 require the appropriate LocationConstraint + s3_client = boto3.client("s3", region_name="us-east-1") + # Create a valid bucket + bucket_name = "bucket_name" + s3_client.create_bucket(Bucket=bucket_name) + + s3_client.put_object( + Bucket=bucket_name, + Key="2022-04-29.gz", + Body=json.dumps({"id": "1", "foo": "bar"}), + ) + + s3_client.put_object( + Bucket=bucket_name, + Key="2022-04-30.gz", + Body=json.dumps({"id": "2", "some": "data"}), + ) + + s3_client.put_object( + Bucket=bucket_name, + Key="2022-10-01.gz", + Body=json.dumps({"id": "3", "other": "info"}), + ) + + listing = [ + {"name": "2022-04-29.gz"}, + {"name": "2022-04-30.gz"}, + {"name": "2022-10-01.gz"}, + ] + + s3 = s3_backend() + + s3.history.extend( + [ + {"id": "bucket_name/2022-04-29.gz", "backend": "s3", "command": "read"}, + {"id": "bucket_name/2022-04-30.gz", "backend": "s3", "command": "read"}, + ] + ) + + try: + response_list = s3.list() + response_list_new = s3.list(new=True) + response_list_details = s3.list(details=True) + except Exception: # pylint:disable=broad-except + pytest.fail("S3 backend should not raise exception on successful list") + + assert list(response_list) == [x["name"] for x in listing] + assert list(response_list_new) == ["2022-10-01.gz"] + assert [x["Key"] for x in response_list_details] == [x["name"] for x in listing] + + +@mock_s3 +def test_backends_data_s3_list_on_empty_bucket_should_do_nothing( + s3_backend, +): # pylint: disable=invalid-name + """Test that given `S3DataBackend.list` method successfully connects to the S3 + data, the S3 backend list method on an empty bucket should do nothing. 
+ """ + # Regions outside of us-east-1 require the appropriate LocationConstraint + s3_client = boto3.client("s3", region_name="us-east-1") + # Create a valid bucket + bucket_name = "bucket_name" + s3_client.create_bucket(Bucket=bucket_name) + + listing = [] + + s3 = s3_backend() + + s3.clean_history(lambda *_: True) + try: + response_list = s3.list() + except Exception: # pylint:disable=broad-except + pytest.fail("S3 backend should not raise exception on successful list") + + assert list(response_list) == [x["name"] for x in listing] + + +@mock_s3 +def test_backends_data_s3_list_with_failed_connection_should_log_the_error( + s3_backend, caplog +): # pylint: disable=invalid-name + """Test that given `S3DataBackend.list` method fails to retrieve the list of + archives, the S3 backend list method should log the error and raise a + BackendException. + """ + # Regions outside of us-east-1 require the appropriate LocationConstraint + s3_client = boto3.client("s3", region_name="us-east-1") + # Create a valid bucket in Moto's 'virtual' AWS account + bucket_name = "bucket_name" + s3_client.create_bucket(Bucket=bucket_name) + + s3_client.put_object( + Bucket=bucket_name, + Key="2022-04-29.gz", + Body=json.dumps({"id": "1", "foo": "bar"}), + ) + + s3 = s3_backend() + + s3.clean_history(lambda *_: True) + + msg = "Failed to list the bucket wrong_name: The specified bucket does not exist" + + with caplog.at_level(logging.ERROR): + with pytest.raises(BackendException, match=msg): + next(s3.list(target="wrong_name")) + with pytest.raises(BackendException, match=msg): + next(s3.list(target="wrong_name", new=True)) + with pytest.raises(BackendException, match=msg): + next(s3.list(target="wrong_name", details=True)) + + assert ( + list( + filter( + lambda record: record[1] == logging.ERROR, + caplog.record_tuples, + ) + ) + == [("ralph.backends.data.s3", logging.ERROR, msg)] * 3 + ) + + +@mock_s3 +def test_backends_data_s3_read_with_valid_name_should_write_to_history( + s3_backend, + monkeypatch, +): # pylint: disable=invalid-name + """Test that given `S3DataBackend.list` method successfully retrieves from the + S3 data the object with the provided name (the object exists), + the S3 backend read method should write the entry to the history. 
+ """ + # Regions outside of us-east-1 require the appropriate LocationConstraint + s3_client = boto3.client("s3", region_name="us-east-1") + # Create a valid bucket in Moto's 'virtual' AWS account + bucket_name = "bucket_name" + s3_client.create_bucket(Bucket=bucket_name) + + raw_body = b"some contents in the body" + json_body = '{"id":"foo"}' + + s3_client.put_object( + Bucket=bucket_name, + Key="2022-09-29.gz", + Body=raw_body, + ) + + s3_client.put_object( + Bucket=bucket_name, + Key="2022-09-30.gz", + Body=json_body, + ) + + freezed_now = datetime.datetime.now(tz=datetime.timezone.utc).isoformat() + monkeypatch.setattr("ralph.backends.data.s3.now", lambda: freezed_now) + + s3 = s3_backend() + s3.clean_history(lambda *_: True) + + list( + s3.read( + query="2022-09-29.gz", + target=bucket_name, + chunk_size=1000, + raw_output=True, + ) + ) + + assert { + "backend": "s3", + "action": "read", + "id": f"{bucket_name}/2022-09-29.gz", + "size": len(raw_body), + "timestamp": freezed_now, + } in s3.history + + list( + s3.read( + query="2022-09-30.gz", + raw_output=False, + ) + ) + + assert { + "backend": "s3", + "action": "read", + "id": f"{bucket_name}/2022-09-30.gz", + "size": len(json_body), + "timestamp": freezed_now, + } in s3.history + + +@mock_s3 +def test_backends_data_s3_read_with_invalid_output_should_log_the_error( + s3_backend, caplog +): # pylint: disable=invalid-name + """Test that given `S3DataBackend.read` method fails to serialize the object, the + S3 backend read method should log the error, not write to history and raise a + BackendException. + """ + # Regions outside of us-east-1 require the appropriate LocationConstraint + s3_client = boto3.client("s3", region_name="us-east-1") + # Create a valid bucket in Moto's 'virtual' AWS account + bucket_name = "bucket_name" + s3_client.create_bucket(Bucket=bucket_name) + + body = b"some contents in the body" + + s3_client.put_object( + Bucket=bucket_name, + Key="2022-09-29.gz", + Body=body, + ) + + with caplog.at_level(logging.ERROR): + with pytest.raises(BackendException): + s3 = s3_backend() + list(s3.read(query="2022-09-29.gz", raw_output=False)) + + assert ( + "ralph.backends.data.s3", + logging.ERROR, + "Raised error: Expecting value: line 1 column 1 (char 0)", + ) in caplog.record_tuples + + s3.clean_history(lambda *_: True) + + +@mock_s3 +def test_backends_data_s3_read_with_invalid_name_should_log_the_error( + s3_backend, caplog +): # pylint: disable=invalid-name + """Test that given `S3DataBackend.read` method fails to retrieve from the S3 + data the object with the provided name (the object does not exists on S3), + the S3 backend read method should log the error, not write to history and raise a + BackendException. + """ + # Regions outside of us-east-1 require the appropriate LocationConstraint + s3_client = boto3.client("s3", region_name="us-east-1") + # Create a valid bucket in Moto's 'virtual' AWS account + bucket_name = "bucket_name" + s3_client.create_bucket(Bucket=bucket_name) + + body = b"some contents in the body" + + s3_client.put_object( + Bucket=bucket_name, + Key="2022-09-29.gz", + Body=body, + ) + + with caplog.at_level(logging.ERROR): + with pytest.raises(BackendParameterException): + s3 = s3_backend() + list(s3.read(query=None, target=bucket_name)) + + assert ( + "ralph.backends.data.s3", + logging.ERROR, + "Invalid query. 
The query should be a valid object name.", + ) in caplog.record_tuples + + s3.clean_history(lambda *_: True) + + +@mock_s3 +def test_backends_data_s3_read_with_wrong_name_should_log_the_error( + s3_backend, caplog +): # pylint: disable=invalid-name + """Test that given `S3DataBackend.read` method fails to retrieve from the S3 + data the object with the provided name (the object does not exists on S3), + the S3 backend read method should log the error, not write to history and raise a + BackendException. + """ + # Regions outside of us-east-1 require the appropriate LocationConstraint + s3_client = boto3.client("s3", region_name="us-east-1") + # Create a valid bucket in Moto's 'virtual' AWS account + bucket_name = "bucket_name" + s3_client.create_bucket(Bucket=bucket_name) + + body = b"some contents in the body" + + s3_client.put_object( + Bucket=bucket_name, + Key="2022-09-29.gz", + Body=body, + ) + + with caplog.at_level(logging.ERROR): + with pytest.raises(BackendException): + s3 = s3_backend() + s3.clean_history(lambda *_: True) + list(s3.read(query="invalid_name.gz", target=bucket_name)) + + assert ( + "ralph.backends.data.s3", + logging.ERROR, + "Failed to download invalid_name.gz: The specified key does not exist.", + ) in caplog.record_tuples + + assert s3.history == [] + + +@mock_s3 +def test_backends_data_s3_read_with_iter_error_should_log_the_error( + s3_backend, caplog, monkeypatch +): # pylint: disable=invalid-name + """Test that given `S3DataBackend.read` method fails to iterate through the result + from the S3 data the object, the S3 backend read method should log the error, + not write to history and raise a BackendException. + """ + # Regions outside of us-east-1 require the appropriate LocationConstraint + s3_client = boto3.client("s3", region_name="us-east-1") + # Create a valid bucket in Moto's 'virtual' AWS account + bucket_name = "bucket_name" + s3_client.create_bucket(Bucket=bucket_name) + + body = b"some contents in the body" + + object_name = "2022-09-29.gz" + + s3_client.put_object( + Bucket=bucket_name, + Key=object_name, + Body=body, + ) + + def mock_read_raw(*args, **kwargs): + raise ResponseStreamingError(error="error") + + with caplog.at_level(logging.ERROR): + with pytest.raises(BackendException): + s3 = s3_backend() + monkeypatch.setattr(s3, "_read_raw", mock_read_raw) + s3.clean_history(lambda *_: True) + list(s3.read(query=object_name, target=bucket_name, raw_output=True)) + + assert ( + "ralph.backends.data.s3", + logging.ERROR, + f"Failed to read chunk from object {object_name}", + ) in caplog.record_tuples + assert s3.history == [] + + +@pytest.mark.parametrize( + "operation_type", + [None, BaseOperationType.CREATE, BaseOperationType.INDEX], +) +@mock_s3 +def test_backends_data_s3_write_method_with_parameter_error( + operation_type, s3_backend, caplog +): # pylint: disable=invalid-name + """Test the `S3DataBackend.write` method, given a target matching an + existing object and a `CREATE` or `INDEX` `operation_type`, should raise a + `FileExistsError`. 
+ """ + # Regions outside of us-east-1 require the appropriate LocationConstraint + s3_client = boto3.client("s3", region_name="us-east-1") + # Create a valid bucket in Moto's 'virtual' AWS account + bucket_name = "bucket_name" + s3_client.create_bucket(Bucket=bucket_name) + + body = b"some contents in the body" + + s3_client.put_object( + Bucket=bucket_name, + Key="2022-09-29.gz", + Body=body, + ) + + object_name = "2022-09-29.gz" + some_content = b"some contents in the stream file to upload" + + with caplog.at_level(logging.ERROR): + with pytest.raises(BackendException): + s3 = s3_backend() + s3.clean_history(lambda *_: True) + s3.write( + data=some_content, target=object_name, operation_type=operation_type + ) + + msg = ( + f"{object_name} already exists and overwrite is not allowed for operation" + f" {operation_type if operation_type is not None else BaseOperationType.CREATE}" + ) + + assert ("ralph.backends.data.s3", logging.ERROR, msg) in caplog.record_tuples + assert s3.history == [] + + +@pytest.mark.parametrize( + "operation_type", + [BaseOperationType.APPEND, BaseOperationType.DELETE], +) +def test_backends_data_s3_data_backend_write_method_with_append_or_delete_operation( + s3_backend, operation_type +): + """Test the `S3DataBackend.write` method, given an `APPEND` + `operation_type`, should raise a `BackendParameterException`. + """ + # pylint: disable=invalid-name + backend = s3_backend() + with pytest.raises( + BackendParameterException, + match=f"{operation_type.name} operation_type is not allowed.", + ): + backend.write(data=[b"foo"], operation_type=operation_type) + + +@pytest.mark.parametrize( + "operation_type", + [BaseOperationType.CREATE, BaseOperationType.INDEX], +) +@mock_s3 +def test_backends_data_s3_write_method_with_create_index_operation( + operation_type, s3_backend, monkeypatch, caplog +): # pylint: disable=invalid-name + """Test the `S3DataBackend.write` method, given a target matching an + existing object and a `CREATE` or `INDEX` `operation_type`, should add + an entry to the History. 
+ """ + # Regions outside of us-east-1 require the appropriate LocationConstraint + s3_client = boto3.client("s3", region_name="us-east-1") + # Create a valid bucket in Moto's 'virtual' AWS account + bucket_name = "bucket_name" + s3_client.create_bucket(Bucket=bucket_name) + + freezed_now = datetime.datetime.now(tz=datetime.timezone.utc).isoformat() + monkeypatch.setattr("ralph.backends.data.s3.now", lambda: freezed_now) + + object_name = "new-archive.gz" + some_content = b"some contents in the stream file to upload" + data = [some_content, some_content, some_content] + s3 = s3_backend() + s3.clean_history(lambda *_: True) + + response = s3.write( + data=data, + target=object_name, + operation_type=operation_type, + ) + + assert response == 3 + assert { + "backend": "s3", + "action": "write", + "operation_type": operation_type.value, + "id": f"{bucket_name}/{object_name}", + "size": len(some_content) * 3, + "timestamp": freezed_now, + } in s3.history + + object_name = "new-archive2.gz" + other_content = {"some": "content"} + + data = [other_content, other_content] + response = s3.write( + data=data, + target=object_name, + operation_type=operation_type, + ) + + assert response == 2 + assert { + "backend": "s3", + "action": "write", + "operation_type": operation_type.value, + "id": f"{bucket_name}/{object_name}", + "size": len(bytes(f"{json.dumps(other_content)}\n", encoding="utf8")) * 2, + "timestamp": freezed_now, + } in s3.history + + assert list(s3.read(query=object_name, raw_output=False)) == data + + object_name = "new-archive3.gz" + date = datetime.datetime(2023, 6, 30, 8, 42, 15, 554892) + + data = [{"some": "content", "datetime": date}] + + error = "Object of type datetime is not JSON serializable" + + with caplog.at_level(logging.ERROR): + # Without ignoring error + with pytest.raises(BackendException, match=error): + response = s3.write( + data=data, + target=object_name, + operation_type=operation_type, + ignore_errors=False, + ) + + # Ignoring error + response = s3.write( + data=data, + target=object_name, + operation_type=operation_type, + ignore_errors=True, + ) + + assert list( + filter( + lambda record: record[1] == logging.ERROR, + caplog.record_tuples, + ) + ) == ( + [ + ( + "ralph.backends.data.s3", + logging.ERROR, + f"Failed to encode JSON: {error}, for document {data[0]}", + ) + ] + * 2 + ) + + +@mock_s3 +def test_backends_data_s3_write_method_with_no_data_should_skip( + s3_backend, +): # pylint: disable=invalid-name + """Test the `S3DataBackend.write` method, given no data to write, + should skip and return 0. + """ + # Regions outside of us-east-1 require the appropriate LocationConstraint + s3_client = boto3.client("s3", region_name="us-east-1") + # Create a valid bucket in Moto's 'virtual' AWS account + bucket_name = "bucket_name" + s3_client.create_bucket(Bucket=bucket_name) + + object_name = "new-archive.gz" + + s3 = s3_backend() + response = s3.write( + data=[], + target=object_name, + operation_type=BaseOperationType.CREATE, + ) + assert response == 0 + + +@mock_s3 +def test_backends_data_s3_write_method_with_failure_should_log_the_error( + s3_backend, +): # pylint: disable=invalid-name + """Test the `S3DataBackend.write` method, given a connection failure, + should raise a `BackendException`. 
+ """ + # Regions outside of us-east-1 require the appropriate LocationConstraint + s3_client = boto3.client("s3", region_name="us-east-1") + # Create a valid bucket in Moto's 'virtual' AWS account + bucket_name = "bucket_name" + s3_client.create_bucket(Bucket=bucket_name) + + object_name = "new-archive.gz" + body = b"some contents in the body" + error = "Failed to upload" + + def raise_client_error(*args, **kwargs): + raise ClientError({"Error": {}}, "error") + + s3 = s3_backend() + s3.client.put_object = raise_client_error + + with pytest.raises(BackendException, match=error): + s3.write( + data=[body], + target=object_name, + operation_type=BaseOperationType.CREATE, + ) diff --git a/tests/conftest.py b/tests/conftest.py index 46d27b96c..37498ee48 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -25,6 +25,7 @@ mongo_forwarding, moto_fs, s3, + s3_backend, settings_fs, swift, swift_backend, diff --git a/tests/fixtures/backends.py b/tests/fixtures/backends.py index 905b78995..3535f4638 100644 --- a/tests/fixtures/backends.py +++ b/tests/fixtures/backends.py @@ -23,6 +23,7 @@ from ralph.backends.data.fs import FSDataBackend, FSDataBackendSettings from ralph.backends.data.ldp import LDPDataBackend +from ralph.backends.data.s3 import S3DataBackend, S3DataBackendSettings from ralph.backends.data.swift import SwiftDataBackend, SwiftDataBackendSettings from ralph.backends.database.clickhouse import ClickHouseDatabase from ralph.backends.database.es import ESDatabase @@ -411,6 +412,27 @@ def get_s3_storage(): return get_s3_storage +@pytest.fixture +def s3_backend(): + """Return the `get_s3_data_backend` function.""" + + def get_s3_data_backend(): + """Return an instance of S3DataBackend.""" + settings = S3DataBackendSettings( + ACCESS_KEY_ID="access_key_id", + SECRET_ACCESS_KEY="secret_access_key", + SESSION_TOKEN="session_token", + ENDPOINT_URL=None, + DEFAULT_REGION="default-region", + DEFAULT_BUCKET_NAME="bucket_name", + DEFAULT_CHUNK_SIZE=4096, + LOCALE_ENCODING="utf8", + ) + return S3DataBackend(settings) + + return get_s3_data_backend + + @pytest.fixture def events(): """Returns test events fixture."""
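For reviewers, a minimal usage sketch of the backend added by this patch. The bucket name, object key and credential values below are illustrative assumptions, not values defined by the patch:

    from ralph.backends.data.base import BaseOperationType, DataBackendStatus
    from ralph.backends.data.s3 import S3DataBackend, S3DataBackendSettings

    # Illustrative settings; omitted fields fall back to the
    # RALPH_BACKENDS__DATA__S3__* environment variables or their defaults.
    settings = S3DataBackendSettings(
        ACCESS_KEY_ID="access_key_id",
        SECRET_ACCESS_KEY="secret_access_key",
        DEFAULT_REGION="us-east-1",
        DEFAULT_BUCKET_NAME="bucket_name",
    )
    backend = S3DataBackend(settings)

    if backend.status() == DataBackendStatus.OK:
        # Write two statements as newline-delimited JSON to a new object in
        # the default bucket (CREATE fails if the object already exists).
        written = backend.write(
            data=[{"id": "1"}, {"id": "2"}],
            target="statements-2023-04-04.jsonl",
            operation_type=BaseOperationType.CREATE,
        )
        # Read the statements back as dictionaries, then list the bucket.
        statements = list(backend.read(query="statements-2023-04-04.jsonl"))
        object_names = list(backend.list())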
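Equivalently, each setting can be provided through environment variables using the `RALPH_BACKENDS__DATA__S3__` prefix declared in `S3DataBackendSettings.Config` (for example `RALPH_BACKENDS__DATA__S3__DEFAULT_BUCKET_NAME`).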