Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/SK-1343 | Enable use of any S3 backend #809

Merged
merged 23 commits into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions fedn/network/api/shared.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
"""Shared objects for the network API."""

import os
from typing import Tuple

from werkzeug.security import safe_join

Expand All @@ -8,6 +11,7 @@
from fedn.network.storage.s3.base import RepositoryBase
from fedn.network.storage.s3.miniorepository import MINIORepository
from fedn.network.storage.s3.repository import Repository
from fedn.network.storage.s3.saasrepository import SAASRepository
from fedn.network.storage.statestore.stores.analytic_store import AnalyticStore
from fedn.network.storage.statestore.stores.client_store import ClientStore
from fedn.network.storage.statestore.stores.combiner_store import CombinerStore
Expand Down Expand Up @@ -38,7 +42,7 @@
analytic_store: AnalyticStore = stores.analytic_store


repository = Repository(modelstorage_config["storage_config"])
repository = Repository(modelstorage_config["storage_config"], storage_type=modelstorage_config["storage_type"])

control = Control(
network_id=network_id,
Expand All @@ -54,11 +58,17 @@
# TODO: use Repository
minio_repository: RepositoryBase = None

# The FEDN_STORAGE_TYPE environment variable, when set, takes precedence over
# the storage type from the model storage configuration.
storage_type = os.environ.get("FEDN_STORAGE_TYPE", modelstorage_config["storage_type"])
if storage_type == "SAAS":
    minio_repository = SAASRepository(modelstorage_config["storage_config"])
else:
    # "MINIO" and any unrecognised storage type both use the MinIO repository.
    minio_repository = MINIORepository(modelstorage_config["storage_config"])


def get_checksum(name: str = None):
def get_checksum(name: str = None) -> Tuple[bool, str, str]:
"""Generate a checksum for a given file."""
message = None
sum = None
success = False
Expand Down
29 changes: 18 additions & 11 deletions fedn/network/storage/s3/base.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,47 @@
"""Base class for artifacts repository implementations."""

import abc
from typing import IO


class RepositoryBase(abc.ABC):
    """Base class for artifacts repository implementations.

    Subclasses must implement every abstract method below before they can be
    instantiated.
    """

    @abc.abstractmethod
    def set_artifact(self, instance_name: str, instance: IO, bucket: str) -> None:
        """Set object with name instance_name.

        :param instance_name: The name of the object
        :type instance_name: str
        :param instance: The object
        :type instance: IO
        :param bucket: The bucket name
        :type bucket: str
        """
        raise NotImplementedError("Must be implemented by subclass")

    @abc.abstractmethod
    def get_artifact(self, instance_name: str, bucket: str) -> IO:
        """Retrieve object with name instance_name.

        :param instance_name: The name of the object to retrieve
        :type instance_name: str
        :param bucket: The bucket name
        :type bucket: str
        :return: The retrieved object
        :rtype: IO
        """
        raise NotImplementedError("Must be implemented by subclass")

    @abc.abstractmethod
    def get_artifact_stream(self, instance_name: str, bucket: str) -> IO:
        """Return a stream handler for object with name instance_name.

        :param instance_name: The name of the object
        :type instance_name: str
        :param bucket: The bucket name
        :type bucket: str
        :return: Stream handler for object instance_name
        :rtype: IO
        """
        raise NotImplementedError("Must be implemented by subclass")
91 changes: 65 additions & 26 deletions fedn/network/storage/s3/miniorepository.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
"""Module implementing Repository for MinIO."""

import io
from typing import IO, List

from minio import Minio
from minio.error import InvalidResponseError
Expand All @@ -13,7 +16,7 @@ class MINIORepository(RepositoryBase):

client = None

def __init__(self, config):
def __init__(self, config: dict) -> None:
"""Initialize object.

:param config: Dictionary containing configuration for credentials and bucket names.
Expand All @@ -39,73 +42,109 @@ def __init__(self, config):
secure=config["storage_secure_mode"],
)

def set_artifact(self, instance_name: str, instance: IO, bucket: str, is_file: bool = False) -> bool:
    """Set object with name instance_name.

    :param instance_name: The name of the object
    :type instance_name: str
    :param instance: The object. When ``is_file`` is False it is wrapped in
        ``io.BytesIO`` and measured with ``len``, so it must be bytes-like.
        When ``is_file`` is True it is handed unchanged to the client's
        ``fput_object`` (presumably a local file path -- confirm against
        the MinIO client API).
    :param bucket: The bucket name
    :type bucket: str
    :param is_file: Whether the instance is a file, defaults to False
    :type is_file: bool, optional
    :return: True if the artifact was set successfully
    :rtype: bool
    :raises Exception: If the upload fails; the original error is chained.
    """
    try:
        if is_file:
            self.client.fput_object(bucket, instance_name, instance)
        else:
            self.client.put_object(bucket, instance_name, io.BytesIO(instance), len(instance))
    except Exception as e:
        logger.error(f"Failed to upload artifact: {instance_name} to bucket: {bucket}. Error: {e}")
        raise Exception(f"Could not load data into bytes: {e}") from e

    return True

def get_artifact(self, instance_name: str, bucket: str) -> bytes:
    """Retrieve object with name instance_name.

    :param instance_name: The name of the object to retrieve
    :type instance_name: str
    :param bucket: The bucket name
    :type bucket: str
    :return: The retrieved object
    :rtype: bytes
    :raises Exception: If the object cannot be fetched or read; the original
        error is chained.
    """
    # Pre-bind so the cleanup below is safe when get_object itself raises;
    # otherwise an UnboundLocalError would replace the wrapped error.
    data = None
    try:
        data = self.client.get_object(bucket, instance_name)
        return data.read()
    except Exception as e:
        logger.error(f"Failed to fetch artifact: {instance_name} from bucket: {bucket}. Error: {e}")
        raise Exception(f"Could not fetch data from bucket: {e}") from e
    finally:
        if data is not None:
            data.close()
            data.release_conn()

def get_artifact_stream(self, instance_name: str, bucket: str) -> IO:
    """Return a stream handler for object with name instance_name.

    The object returned by the client's ``get_object`` is passed through
    unchanged. NOTE(review): ``get_artifact`` closes that object and releases
    its connection after reading; callers of this method presumably need to
    do the same once they are done with the stream -- confirm.

    :param instance_name: The name of the object
    :type instance_name: str
    :param bucket: The bucket name
    :type bucket: str
    :return: Stream handler for object instance_name
    :rtype: IO
    :raises Exception: If the object cannot be fetched; the original error is
        chained.
    """
    try:
        return self.client.get_object(bucket, instance_name)
    except Exception as e:
        logger.error(f"Failed to fetch artifact stream: {instance_name} from bucket: {bucket}. Error: {e}")
        raise Exception(f"Could not fetch data from bucket: {e}") from e

def list_artifacts(self, bucket: str) -> List[str]:
    """List all objects in bucket.

    :param bucket: Name of the bucket
    :type bucket: str
    :return: A list of object names
    :rtype: List[str]
    :raises Exception: If the bucket cannot be listed; the original error is
        chained.
    """
    try:
        # Iterate inside the try: the listing may be lazy, so errors can
        # surface during iteration rather than at the call itself.
        return [obj.object_name for obj in self.client.list_objects(bucket)]
    except Exception as err:
        logger.error(f"Failed to list artifacts in bucket: {bucket}. Error: {err}")
        raise Exception(f"Could not list models in bucket: {bucket}") from err

def delete_artifact(self, instance_name: str, bucket: str) -> None:
    """Delete object with name instance_name from bucket.

    Deletion is best-effort: an ``InvalidResponseError`` from the client is
    logged and not re-raised. Other exception types are not caught here.

    :param instance_name: The object name
    :type instance_name: str
    :param bucket: The bucket to delete from
    :type bucket: str
    """
    try:
        self.client.remove_object(bucket, instance_name)
    except InvalidResponseError as err:
        logger.error(f"Could not delete artifact: {instance_name}. Error: {err}")

def create_bucket(self, bucket_name: str) -> None:
    """Create a new bucket. If bucket exists, do nothing.

    :param bucket_name: The name of the bucket
    :type bucket_name: str
    :raises InvalidResponseError: Re-raised after logging when the existence
        check or the creation fails with this error.
    """
    try:
        if not self.client.bucket_exists(bucket_name):
            # Log only when a bucket is actually about to be created, so the
            # message is not emitted for buckets that already exist.
            logger.info(f"Creating bucket: {bucket_name}")
            self.client.make_bucket(bucket_name)
    except InvalidResponseError as err:
        logger.error(f"Failed to create bucket: {bucket_name}. Error: {err}")
        raise
Loading
Loading