-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
a5488db
commit 0606ffa
Showing
11 changed files
with
492 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
from .models import fsx_backends # noqa: F401 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
"""Exceptions raised by the fsx service.""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,192 @@ | ||
"""FSxBackend class with methods for supported APIs.""" | ||
|
||
from datetime import datetime | ||
from typing import Any, Dict, List, Optional, Tuple | ||
|
||
from moto.core.base_backend import BackendDict, BaseBackend | ||
from moto.core.common_models import BaseModel | ||
from moto.utilities.paginator import paginate | ||
|
||
from .utils import FileSystemType | ||
|
||
PAGINATION_MODEL = { | ||
"describe_file_systems": { | ||
"input_token": "next_token", | ||
"limit_key": "max_results", | ||
"limit_default": 2147483647, | ||
"unique_attribute": "resource_arn", | ||
} | ||
} | ||
|
||
|
||
class FileSystem(BaseModel): | ||
def __init__( | ||
self, | ||
account_id: str, | ||
region_name: str, | ||
file_system_type: str, | ||
storage_capacity: int, | ||
storage_type: str, | ||
subnet_ids: List[str], | ||
security_group_ids: List[str], | ||
tags: Optional[List[Dict[str, str]]], | ||
kms_key_id: Optional[str], | ||
windows_configuration: Optional[Dict[str, Any]], | ||
lustre_configuration: Optional[Dict[str, Any]], | ||
ontap_configuration: Optional[Dict[str, Any]], | ||
open_zfs_configuration: Optional[Dict[str, Any]], | ||
) -> None: | ||
self.file_system_id = f"fs-moto{datetime.now().strftime('%Y%m%d%H%M%S')}" | ||
self.file_system_type = file_system_type | ||
if self.file_system_type not in FileSystemType.list_values(): | ||
raise ValueError(f"Invalid FileSystemType: {self.file_system_type}") | ||
self.storage_capacity = storage_capacity | ||
self.storage_type = storage_type | ||
self.subnet_ids = subnet_ids | ||
self.security_group_ids = security_group_ids | ||
self.dns_name = f"{self.file_system_id}.fsx.{region_name}.amazonaws.com" | ||
self.kms_key_id = kms_key_id | ||
self.resource_arn = ( | ||
f"arn:aws:fsx:{region_name}:{account_id}:file-system/{self.file_system_id}" | ||
) | ||
self.tags = tags or [] | ||
self.windows_configuration = windows_configuration | ||
self.lustre_configuration = lustre_configuration | ||
self.ontap_configuration = ontap_configuration | ||
self.open_zfs_configuration = open_zfs_configuration | ||
|
||
def to_dict(self) -> Dict[str, Any]: | ||
dct = { | ||
"FileSystemId": self.file_system_id, | ||
"FileSystemType": self.file_system_type, | ||
"StorageCapacity": self.storage_capacity, | ||
"StorageType": self.storage_type, | ||
"SubnetIds": self.subnet_ids, | ||
"SecurityGroupIds": self.security_group_ids, | ||
"Tags": self.tags, | ||
"DNSName": self.dns_name, | ||
"KmsKeyId": self.kms_key_id, | ||
"ResourceARN": self.resource_arn, | ||
"WindowsConfiguration": self.windows_configuration, | ||
"LustreConfiguration": self.lustre_configuration, | ||
"OntapConfiguration": self.ontap_configuration, | ||
"OpenZFSConfiguration": self.open_zfs_configuration, | ||
} | ||
return {k: v for k, v in dct.items() if v} | ||
|
||
|
||
class FSxBackend(BaseBackend): | ||
"""Implementation of FSx APIs.""" | ||
|
||
def __init__(self, region_name: str, account_id: str) -> None: | ||
super().__init__(region_name, account_id) | ||
self.file_systems: Dict[str, FileSystem] = {} | ||
|
||
def create_file_system( | ||
self, | ||
client_request_token: str, | ||
file_system_type: str, | ||
storage_capacity: int, | ||
storage_type: str, | ||
subnet_ids: List[str], | ||
security_group_ids: List[str], | ||
tags: Optional[List[Dict[str, str]]], | ||
kms_key_id: Optional[str], | ||
windows_configuration: Optional[Dict[str, Any]], | ||
lustre_configuration: Optional[Dict[str, Any]], | ||
ontap_configuration: Optional[Dict[str, Any]], | ||
file_system_type_version: Optional[str], | ||
open_zfs_configuration: Optional[Dict[str, Any]], | ||
) -> FileSystem: | ||
file_system = FileSystem( | ||
account_id=self.account_id, | ||
region_name=self.region_name, | ||
file_system_type=file_system_type, | ||
storage_capacity=storage_capacity, | ||
storage_type=storage_type, | ||
subnet_ids=subnet_ids, | ||
security_group_ids=security_group_ids, | ||
tags=tags, | ||
kms_key_id=kms_key_id, | ||
windows_configuration=windows_configuration, | ||
ontap_configuration=ontap_configuration, | ||
open_zfs_configuration=open_zfs_configuration, | ||
lustre_configuration=lustre_configuration, | ||
) | ||
|
||
file_system_id = file_system.file_system_id | ||
|
||
self.file_systems[file_system_id] = file_system | ||
return file_system | ||
|
||
@paginate(pagination_model=PAGINATION_MODEL) | ||
def describe_file_systems(self, file_system_ids: List[str]) -> List[FileSystem]: | ||
file_systems = [] | ||
if not file_system_ids: | ||
file_systems = list(self.file_systems.values()) | ||
else: | ||
for id in file_system_ids: | ||
if id in self.file_systems: | ||
file_systems.append(self.file_systems[id]) | ||
return file_systems | ||
|
||
def delete_file_system( | ||
self, | ||
file_system_id: str, | ||
client_request_token: str, | ||
windows_configuration: Optional[Dict[str, Any]], | ||
lustre_configuration: Optional[Dict[str, Any]], | ||
open_zfs_configuration: Optional[Dict[str, Any]], | ||
) -> Tuple[ | ||
str, | ||
str, | ||
Optional[Dict[str, Any]], | ||
Optional[Dict[str, Any]], | ||
Optional[Dict[str, Any]], | ||
]: | ||
response_template = {"FinalBackUpId": "", "FinalBackUpTags": []} | ||
|
||
file_system_type = self.file_systems[file_system_id].file_system_type | ||
|
||
lifecycle = "DELETING" | ||
self.file_systems.pop(file_system_id) | ||
|
||
windows_response = None | ||
lustre_response = None | ||
open_zfs_response = None | ||
|
||
if file_system_type == "WINDOWS": | ||
windows_response = response_template | ||
elif file_system_type == "LUSTRE": | ||
lustre_response = response_template | ||
elif file_system_type == "OPEN_ZFS": | ||
open_zfs_response = response_template | ||
|
||
return ( | ||
file_system_id, | ||
lifecycle, | ||
windows_response, | ||
lustre_response, | ||
open_zfs_response, | ||
) | ||
|
||
def tag_resource(self, resource_arn: str, tags: List[Dict[str, str]]) -> None: | ||
resource = self._get_resource_from_arn(resource_arn) | ||
resource.tags.extend(tags) | ||
|
||
def _get_resource_from_arn(self, arn: str) -> Any: | ||
target_resource, target_name = arn.split(":")[-1].split("/") | ||
try: | ||
resource = self.file_systems.get(target_name) # type: ignore | ||
except KeyError: | ||
message = f"Could not find {target_resource} with name {target_name}" | ||
raise ValueError(message) | ||
return resource | ||
|
||
def untag_resource(self, resource_arn: str, tag_keys: List[str]) -> None: | ||
resource = self._get_resource_from_arn(resource_arn) | ||
if tag_keys: | ||
resource.tags = [tag for tag in resource.tags if tag["Key"] not in tag_keys] | ||
|
||
|
||
fsx_backends = BackendDict(FSxBackend, "fsx") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
"""Handles incoming fsx requests, invokes methods, returns responses.""" | ||
|
||
import json | ||
|
||
from moto.core.common_types import TYPE_RESPONSE | ||
from moto.core.responses import BaseResponse | ||
|
||
from .models import FSxBackend, fsx_backends | ||
|
||
|
||
class FSxResponse(BaseResponse): | ||
"""Handler for FSx requests and responses.""" | ||
|
||
def __init__(self) -> None: | ||
super().__init__(service_name="fsx") | ||
|
||
@property | ||
def fsx_backend(self) -> FSxBackend: | ||
"""Return backend instance specific for this region.""" | ||
return fsx_backends[self.current_account][self.region] | ||
|
||
def create_file_system(self) -> str: | ||
params = json.loads(self.body) | ||
client_request_token = params.get("ClientRequestToken") | ||
file_system_type = params.get("FileSystemType") | ||
storage_capacity = params.get("StorageCapacity") | ||
storage_type = params.get("StorageType") | ||
subnet_ids = params.get("SubnetIds") | ||
security_group_ids = params.get("SecurityGroupIds") | ||
tags = params.get("Tags") | ||
kms_key_id = params.get("KmsKeyId") | ||
windows_configuration = params.get("WindowsConfiguration") | ||
lustre_configuration = params.get("LustreConfiguration") | ||
ontap_configuration = params.get("OntapConfiguration") | ||
file_system_type_version = params.get("FileSystemTypeVersion") | ||
open_zfs_configuration = params.get("OpenZFSConfiguration") | ||
file_system = self.fsx_backend.create_file_system( | ||
client_request_token=client_request_token, | ||
file_system_type=file_system_type, | ||
storage_capacity=storage_capacity, | ||
storage_type=storage_type, | ||
subnet_ids=subnet_ids, | ||
security_group_ids=security_group_ids, | ||
tags=tags, | ||
kms_key_id=kms_key_id, | ||
windows_configuration=windows_configuration, | ||
lustre_configuration=lustre_configuration, | ||
ontap_configuration=ontap_configuration, | ||
file_system_type_version=file_system_type_version, | ||
open_zfs_configuration=open_zfs_configuration, | ||
) | ||
|
||
return json.dumps(dict(FileSystem=file_system.to_dict())) | ||
|
||
def describe_file_systems(self) -> str: | ||
params = json.loads(self.body) | ||
file_system_ids = params.get("FileSystemIds") | ||
max_results = params.get("MaxResults") | ||
next_token = params.get("NextToken") | ||
file_systems, next_token = self.fsx_backend.describe_file_systems( | ||
file_system_ids=file_system_ids, | ||
max_results=max_results, | ||
next_token=next_token, | ||
) | ||
list_file_systems = [file_system.to_dict() for file_system in file_systems] | ||
return json.dumps(dict(FileSystems=list_file_systems, NextToken=next_token)) | ||
|
||
def delete_file_system(self) -> str: | ||
params = json.loads(self.body) | ||
file_system_id = params.get("FileSystemId") | ||
client_request_token = params.get("ClientRequestToken") | ||
windows_configuration = params.get("WindowsConfiguration") | ||
lustre_configuration = params.get("LustreConfiguration") | ||
open_zfs_configuration = params.get("OpenZFSConfiguration") | ||
( | ||
file_system_id, | ||
lifecycle, | ||
windows_response, | ||
lustre_response, | ||
open_zfs_response, | ||
) = self.fsx_backend.delete_file_system( | ||
file_system_id=file_system_id, | ||
client_request_token=client_request_token, | ||
windows_configuration=windows_configuration, | ||
lustre_configuration=lustre_configuration, | ||
open_zfs_configuration=open_zfs_configuration, | ||
) | ||
|
||
return json.dumps( | ||
dict( | ||
FileSystemId=file_system_id, | ||
Lifecycle=lifecycle, | ||
WindowsResponse=windows_response, | ||
LustreResponse=lustre_response, | ||
OpenZfsResponse=open_zfs_response, | ||
) | ||
) | ||
|
||
def tag_resource(self) -> TYPE_RESPONSE: | ||
params = json.loads(self.body) | ||
resource_arn = params.get("ResourceARN") | ||
tags = params.get("Tags") | ||
self.fsx_backend.tag_resource( | ||
resource_arn=resource_arn, | ||
tags=tags, | ||
) | ||
return 200, {}, json.dumps({}) | ||
|
||
def untag_resource(self) -> TYPE_RESPONSE: | ||
params = json.loads(self.body) | ||
resource_arn = params.get("ResourceARN") | ||
tag_keys = params.get("TagKeys") | ||
self.fsx_backend.untag_resource( | ||
resource_arn=resource_arn, | ||
tag_keys=tag_keys, | ||
) | ||
return 200, {}, json.dumps({}) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
"""fsx base URL and path.""" | ||
|
||
from .responses import FSxResponse | ||
|
||
url_bases = [ | ||
r"https?://fsx\.(.+)\.amazonaws\.com", | ||
] | ||
|
||
url_paths = { | ||
"{0}/$": FSxResponse.dispatch, | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
from enum import Enum | ||
from typing import List | ||
|
||
|
||
class FileSystemType(str, Enum): | ||
WINDOWS = "WINDOWS" | ||
LUSTRE = "LUSTRE" | ||
ONTAP = "ONTAP" | ||
OPEN_ZFS = "OPENZFS" | ||
|
||
@classmethod | ||
def list_values(self) -> List[str]: | ||
return sorted([item.value for item in FileSystemType]) |
Empty file.
Oops, something went wrong.