Skip to content

Commit

Permalink
fixup! ✨(backends) add s3 backend with unified interface
Browse files Browse the repository at this point in the history
  • Loading branch information
wilbrdt committed Jun 30, 2023
1 parent 23e0b3f commit 44dd2c8
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 91 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ jobs:
- v1-dependencies-<< parameters.python-image >>-{{ .Revision }}
- run:
name: Install development dependencies
command: pip install --user .[backend-clickhouse,backend-es,backend-ldp,backend-lrs,backend-mongo,backend-swift,backend-ws,cli,dev,lrs]
command: pip install --user .[backend-clickhouse,backend-es,backend-ldp,backend-lrs,backend-mongo,backend-s3,backend-swift,backend-ws,cli,dev,lrs]
- save_cache:
paths:
- ~/.local
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ RUN apt-get update && \
libffi-dev && \
rm -rf /var/lib/apt/lists/*

RUN pip install .[backend-clickhouse,backend-es,backend-ldp,backend-lrs,backend-mongo,backend-swift,backend-ws,cli,lrs]
RUN pip install .[backend-clickhouse,backend-es,backend-ldp,backend-lrs,backend-mongo,backend-s3,backend-swift,backend-ws,cli,lrs]


# -- Core --
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ backend-mongo =
backend-s3 =
boto3>=1.24.70
botocore>=1.27.71
requests-toolbelt>=1.0.0
backend-swift =
python-keystoneclient>=5.0.0
python-swiftclient>=4.0.0
Expand Down
68 changes: 40 additions & 28 deletions src/ralph/backends/data/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
ResponseStreamingError,
)
from botocore.response import StreamingBody
from requests_toolbelt import StreamingIterator

from ralph.backends.data.base import (
BaseDataBackend,
Expand Down Expand Up @@ -79,7 +80,7 @@ def __init__(self, settings: settings_class = None):

@property
def client(self):
"""Create a boto3 client if it doesn't exists."""
"""Create a boto3 client if it doesn't exist."""
if not self._client:
self._client = boto3.client(
"s3",
Expand Down Expand Up @@ -206,13 +207,12 @@ def read(
for chunk in reader(response["Body"], chunk_size, ignore_errors):
yield chunk
except (ReadTimeoutError, ResponseStreamingError) as err:
error_msg = err.response["Error"]["Message"]
msg = "Failed to read chunk from object %s: %s"
logger.error(msg, query.query_string, error_msg)
msg = "Failed to read chunk from object %s"
logger.error(msg, query.query_string)
if not ignore_errors:
raise BackendException(msg % (query.query_string, error_msg)) from err
raise BackendException(msg % (query.query_string)) from err

# Archive fetched, add a new entry to the historyResponseStreamingError
# Archive fetched, add a new entry to the history.
self.append_to_history(
{
"backend": self.name,
Expand Down Expand Up @@ -242,9 +242,9 @@ def write( # pylint: disable=too-many-arguments
If target does not contain a `/`, it is assumed to be the
target object and the default bucket is used.
chunk_size (int or None): Ignored.
ignore_errors (bool): If `True`, errors during the read operation
will be ignored and logged. If `False` (default), a `BackendException`
will be raised if an error occurs.
ignore_errors (bool): If `True`, errors during the write operation
are ignored and logged. If `False` (default), a `BackendException`
is raised if an error occurs.
operation_type (BaseOperationType or None): The mode of the write
operation.
If operation_type is `CREATE` or `INDEX`, the target object is
Expand All @@ -255,8 +255,7 @@ def write( # pylint: disable=too-many-arguments
int: The number of written objects.
Raise:
BackendException: If a failure during the write operation occurs and
`ignore_errors` is set to `False`.
BackendException: If a failure during the write operation occurs.
BackendParameterException: If a backend argument value is not valid.
"""
try:
Expand Down Expand Up @@ -298,13 +297,23 @@ def write( # pylint: disable=too-many-arguments
logger.error(msg, target_object, operation_type)
raise BackendException(msg % (target_object, operation_type))

logger.debug("Creating archive: %s", target_object)
logger.info("Creating archive: %s", target_object)

count = sum(1 for _ in data)

if isinstance(first_record, dict):
data = self._parse_dict_to_bytes(data, ignore_errors)

# Wrap the iterator in requests-toolbelt's StreamingIterator, which exposes the
# file-like `read` method. The size argument is not used here, so 0 is passed.
data = StreamingIterator(0, iter(data))

is_dict = isinstance(first_record, dict)
writer = self._write_dict if is_dict else self._write_raw
try:
for chunk in data:
writer(target_bucket, target_object, chunk)
self.client.upload_fileobj(
Bucket=target_bucket,
Key=target_object,
Fileobj=data,
)
response = self.client.head_object(Bucket=target_bucket, Key=target_object)
except (ClientError, ParamValidationError) as exc:
msg = "Failed to upload %s"
Expand All @@ -323,7 +332,7 @@ def write( # pylint: disable=too-many-arguments
}
)

return 1
return count

@staticmethod
def _read_raw(
Expand All @@ -347,14 +356,17 @@ def _read_dict(
if not ignore_errors:
raise BackendException(msg % err) from err

def _write_raw(self, bucket: str, obj: str, data: bytes) -> None:
    """Upload the `data` bytes as the body of object `obj` in `bucket`."""
    put_kwargs = {"Bucket": bucket, "Key": obj, "Body": data}
    self.client.put_object(**put_kwargs)

def _write_dict(self, bucket: str, obj: str, data: dict) -> None:
    """Upload `data` as a single JSON line to object `obj` in `bucket`."""
    # Serialize first, then encode with the backend's configured encoding.
    json_line = f"{json.dumps(data)}\n"
    payload = bytes(json_line, encoding=self.locale_encoding)
    self.client.put_object(Body=payload, Bucket=bucket, Key=obj)
@staticmethod
def _parse_dict_to_bytes(
    statements: Iterable[dict], ignore_errors: bool
) -> Iterator[bytes]:
    """Yield each dictionary of `statements` as a UTF-8 encoded JSON line.

    Args:
        statements (Iterable of dict): The dictionaries to serialize.
        ignore_errors (bool): If `True`, statements that cannot be serialized
            are logged and skipped. If `False`, a `BackendException` is raised
            on the first statement that cannot be serialized.

    Yield:
        bytes: The JSON representation of a statement followed by a newline.

    Raise:
        BackendException: If a statement cannot be serialized to JSON and
            `ignore_errors` is `False`.
    """
    for statement in statements:
        try:
            line = bytes(f"{json.dumps(statement)}\n", encoding="utf-8")
        # `json.dumps` raises TypeError for unsupported types and ValueError for
        # circular references; both mean the statement cannot be encoded.
        except (TypeError, ValueError) as error:
            msg = "Failed to encode JSON: %s, for document %s"
            logger.error(msg, error, statement)
            if ignore_errors:
                continue
            raise BackendException(msg % (error, statement)) from error
        # Yield outside the `try` so only serialization errors are intercepted.
        yield line
Loading

0 comments on commit 44dd2c8

Please sign in to comment.