Test rados import and store ceph state
Guts committed Jul 18, 2023
1 parent ecd969e commit 8a3d0a1
Showing 1 changed file with 57 additions and 38 deletions.
95 changes: 57 additions & 38 deletions src/rok4/Storage.py
@@ -35,7 +35,8 @@
 import tempfile
 import re
 import os
-import rados
+
+
 import hashlib
 import requests
 from typing import Dict, List, Tuple, Union
@@ -47,6 +48,15 @@
 
 from rok4.Exceptions import *
 
+# conditional import
+try:
+    import rados
+
+    CEPH_RADOS_AVAILABLE: bool = True
+except ImportError:
+    CEPH_RADOS_AVAILABLE: bool = False
+    rados = None
+
 
 class StorageType(Enum):
     FILE = "file://"
@@ -75,7 +85,7 @@ def __get_s3_client(bucket_name: str) -> Tuple[Dict[str, Union["boto3.client", s
     Returns:
         Tuple[Dict[str, Union['boto3.client',str]], str, str]: the S3 informations (client, host, key, secret) and the simple bucket name
     """
-
+
     global __S3_CLIENTS, __S3_DEFAULT_CLIENT
 
     if not __S3_CLIENTS:
@@ -134,7 +144,7 @@ def __get_s3_client(bucket_name: str) -> Tuple[Dict[str, Union["boto3.client", s
 
 def disconnect_s3_clients() -> None:
     """Clean S3 clients"""
-
+
     global __S3_CLIENTS, __S3_DEFAULT_CLIENT
     __S3_CLIENTS = {}
     __S3_DEFAULT_CLIENT = None
@@ -363,16 +373,15 @@ def get_data_binary(path: str, range: Tuple[int, int] = None) -> str:
             raise StorageError("FILE", e)
 
     elif storage_type == StorageType.HTTP or storage_type == StorageType.HTTPS:
-
-        if range is None :
+        if range is None:
             try:
                 reponse = requests.get(f"{storage_type.value}{path}", stream=True)
                 data = reponse.content
-                if reponse.status_code == 404 :
+                if reponse.status_code == 404:
                     raise FileNotFoundError(f"{storage_type.value}{path}")
             except Exception as e:
                 raise StorageError(storage_type.name, e)
-        else :
+        else:
             raise NotImplementedError
 
     else:
@@ -471,7 +480,6 @@ def get_size(path: str) -> int:
             raise StorageError("FILE", e)
 
     elif storage_type == StorageType.HTTP or storage_type == StorageType.HTTPS:
-
         try:
            # stream=True means only the headers are downloaded initially
             reponse = requests.get(storage_type.value + path, stream=True).headers["content-length"]
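
As the (translated) comment notes, stream=True defers the body download, so reading Content-Length costs only a header round-trip. A standalone sketch of the same probe (the URL is invented):

# Minimal sketch of the same probe (URL invented). With stream=True the body
# is not downloaded until .content or iter_content() is accessed.
import requests

response = requests.get("https://example.com/big-file.bin", stream=True)
size = int(response.headers["content-length"])
response.close()  # release the connection without downloading the body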
@@ -526,12 +534,11 @@ def exists(path: str) -> bool:
         return os.path.exists(path)
 
     elif storage_type == StorageType.HTTP or storage_type == StorageType.HTTPS:
-
         try:
             response = requests.get(storage_type.value + path, stream=True)
-            if response.status_code == 200 :
+            if response.status_code == 200:
                 return True
-            else :
+            else:
                 return False
         except Exception as e:
             raise StorageError(storage_type.name, e)
@@ -839,43 +846,52 @@ def copy(from_path: str, to_path: str, from_md5: str = None) -> None:
                 f"CEPH and S3", f"Cannot copy CEPH object {from_path} to S3 object {to_path} : {e}"
             )
 
-    elif (from_type == StorageType.HTTP or from_type == StorageType.HTTPS) and to_type == StorageType.FILE :
-
+    elif (
+        from_type == StorageType.HTTP or from_type == StorageType.HTTPS
+    ) and to_type == StorageType.FILE:
         try:
-            response = requests.get(from_type.value + from_path, stream = True)
+            response = requests.get(from_type.value + from_path, stream=True)
             with open(to_path, "wb") as f:
-                for chunk in response.iter_content(chunk_size=65536) :
+                for chunk in response.iter_content(chunk_size=65536):
                     if chunk:
                         f.write(chunk)
 
         except Exception as e:
-            raise StorageError(f"HTTP(S) and FILE", f"Cannot copy HTTP(S) object {from_path} to FILE object {to_path} : {e}")
-
-    elif (from_type == StorageType.HTTP or from_type == StorageType.HTTPS) and to_type == StorageType.CEPH :
+            raise StorageError(
+                f"HTTP(S) and FILE",
+                f"Cannot copy HTTP(S) object {from_path} to FILE object {to_path} : {e}",
+            )
+
+    elif (
+        from_type == StorageType.HTTP or from_type == StorageType.HTTPS
+    ) and to_type == StorageType.CEPH:
         to_ioctx = __get_ceph_ioctx(to_tray)
 
         try:
-            response = requests.get(from_type.value + from_path, stream = True)
+            response = requests.get(from_type.value + from_path, stream=True)
             offset = 0
-            for chunk in response.iter_content(chunk_size=65536) :
+            for chunk in response.iter_content(chunk_size=65536):
                 if chunk:
                     size = len(chunk)
                     to_ioctx.write(to_base_name, chunk, offset)
                     offset += size
 
         except Exception as e:
-            raise StorageError(f"HTTP(S) and CEPH", f"Cannot copy HTTP(S) object {from_path} to CEPH object {to_path} : {e}")
-
-    elif (from_type == StorageType.HTTP or from_type == StorageType.HTTPS) and to_type == StorageType.S3 :
+            raise StorageError(
+                f"HTTP(S) and CEPH",
+                f"Cannot copy HTTP(S) object {from_path} to CEPH object {to_path} : {e}",
+            )
+
+    elif (
+        from_type == StorageType.HTTP or from_type == StorageType.HTTPS
+    ) and to_type == StorageType.S3:
         to_s3_client, to_bucket = __get_s3_client(to_tray)
 
         try:
-            response = requests.get(from_type.value + from_path, stream = True)
-            with tempfile.NamedTemporaryFile("w+b",delete=False) as f:
+            response = requests.get(from_type.value + from_path, stream=True)
+            with tempfile.NamedTemporaryFile("w+b", delete=False) as f:
                 name_fich = f.name
-                for chunk in response.iter_content(chunk_size=65536) :
+                for chunk in response.iter_content(chunk_size=65536):
                     if chunk:
                         f.write(chunk)
@@ -884,7 +900,10 @@ def copy(from_path: str, to_path: str, from_md5: str = None) -> None:
             os.remove(name_fich)
 
         except Exception as e:
-            raise StorageError(f"HTTP(S) and S3", f"Cannot copy HTTP(S) object {from_path} to S3 object {to_path} : {e}")
+            raise StorageError(
+                f"HTTP(S) and S3",
+                f"Cannot copy HTTP(S) object {from_path} to S3 object {to_path} : {e}",
+            )
 
     else:
         raise StorageError(
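
All three new HTTP(S) source branches share one pattern: requests.get(..., stream=True) followed by 64 KiB iter_content chunks, written to a local file, to a CEPH object at increasing offsets, or to a temporary file that is then uploaded to S3. A hedged usage sketch (URL, bucket and paths invented):

# Hypothetical calls (URL, bucket and paths invented); each streams the source
# in 64 KiB chunks rather than buffering the whole object in memory.
from rok4.Storage import copy

copy("https://example.com/tiles/0_0.tif", "file:///tmp/0_0.tif")
copy("https://example.com/tiles/0_0.tif", "s3://my-bucket/tiles/0_0.tif")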
@@ -998,7 +1017,8 @@ def get_osgeo_path(path: str) -> str:
     else:
         raise NotImplementedError(f"Cannot get a GDAL/OGR compliant path from {path}")
 
-def size_path(path: str) -> int :
+
+def size_path(path: str) -> int:
     """Return the size of the path given (or, for the CEPH, the sum of the size of each object of the .list)
 
     Args:
@@ -1011,10 +1031,10 @@ def size_path(path: str) -> int :
     Returns:
         int: size of the path
     """
-    storage_type, unprefixed_path, tray_name, base_name = get_infos_from_path(path)
+    storage_type, unprefixed_path, tray_name, base_name = get_infos_from_path(path)
 
     if storage_type == StorageType.FILE:
-        try :
+        try:
             total = 0
             with os.scandir(unprefixed_path) as it:
                 for entry in it:
@@ -1029,24 +1049,23 @@ def size_path(path: str) -> int :
     elif storage_type == StorageType.S3:
         s3_client, bucket_name = __get_s3_client(tray_name)
 
-        try :
-            paginator = s3_client["client"].get_paginator('list_objects_v2')
+        try:
+            paginator = s3_client["client"].get_paginator("list_objects_v2")
             pages = paginator.paginate(
                 Bucket=bucket_name,
-                Prefix=base_name+"/",
+                Prefix=base_name + "/",
                 PaginationConfig={
-                    'PageSize': 10000,
-                }
+                    "PageSize": 10000,
+                },
             )
             total = 0
             for page in pages:
-                for key in page['Contents']:
-                    total += key['Size']
+                for key in page["Contents"]:
+                    total += key["Size"]
 
         except Exception as e:
             raise StorageError("S3", e)
 
-
     elif storage_type == StorageType.CEPH:
         raise NotImplementedError
     else:
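
For S3, size_path sums object sizes with the list_objects_v2 paginator, keeping memory bounded however many objects the prefix holds; note that a prefix with no objects would make page["Contents"] raise a KeyError, which the except clause converts to a StorageError. A hedged usage sketch (bucket and prefix invented):

# Hypothetical call (bucket and prefix invented); assumes S3 credentials are
# configured for the client returned by __get_s3_client.
from rok4.Storage import size_path

total_bytes = size_path("s3://my-bucket/pyramids/my_pyramid")
print(f"my_pyramid occupies {total_bytes} bytes")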
