Handle missing python rados
Guts committed Aug 29, 2023
1 parent a053513 commit ab170b6
Showing 1 changed file with 63 additions and 54 deletions.
src/rok4/storage.py
@@ -30,23 +30,32 @@
 To precise the cluster to use, bucket name should be [email protected] or [email protected]. If no host is defined (no @) in the bucket name, first S3 cluster is used
 """
 
+import hashlib
+import os
+import re
+import tempfile
+from shutil import copyfile
+from typing import Dict, List, Tuple, Union
+
 import boto3
 import botocore.exceptions
-import tempfile
-import re
-import os
-import rados
-import hashlib
 import requests
-from typing import Dict, List, Tuple, Union
-from shutil import copyfile
 from osgeo import gdal
 
+# conditional import
+try:
+    import rados
+
+    CEPH_RADOS_AVAILABLE: bool = True
+except ImportError:
+    CEPH_RADOS_AVAILABLE: bool = False
+    rados = None
+
 gdal.UseExceptions()
 
-from rok4.exceptions import *
-
 from rok4.enums import StorageType
+
+from rok4.exceptions import *
 
 __S3_CLIENTS = {}
 __S3_DEFAULT_CLIENT = None
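The conditional import replaces a hard dependency on python-rados with a module-level flag. A minimal sketch of how a CEPH entry point can consume that flag; the function name, default config path, and the NotImplementedError are illustrative, not taken from this diff:

try:
    import rados
    CEPH_RADOS_AVAILABLE = True
except ImportError:
    CEPH_RADOS_AVAILABLE = False
    rados = None

def open_ceph_cluster(conf_file: str = "/etc/ceph/ceph.conf"):
    # Fail with a clear message when CEPH is requested without python-rados.
    if not CEPH_RADOS_AVAILABLE:
        raise NotImplementedError(
            "CEPH storage requested but the 'rados' python package is not installed"
        )
    cluster = rados.Rados(conffile=conf_file)  # rados is importable here
    cluster.connect()
    return cluster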
@@ -355,16 +364,15 @@ def get_data_binary(path: str, range: Tuple[int, int] = None) -> str:
             raise StorageError("FILE", e)
 
     elif storage_type == StorageType.HTTP or storage_type == StorageType.HTTPS:
-
-        if range is None :
+        if range is None:
             try:
                 reponse = requests.get(f"{storage_type.value}{path}", stream=True)
                 data = reponse.content
-                if reponse.status_code == 404 :
+                if reponse.status_code == 404:
                     raise FileNotFoundError(f"{storage_type.value}{path}")
             except Exception as e:
                 raise StorageError(storage_type.name, e)
-        else :
+        else:
            raise NotImplementedError
 
     else:
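Note that the HTTP(S) branch only supports full reads: a non-None range falls through to NotImplementedError. For context, a ranged read would normally go through the standard HTTP Range header, roughly as in this sketch (not part of the commit):

import requests

def http_read_range(url: str, start: int, size: int) -> bytes:
    # Illustrative only: rok4's get_data_binary raises NotImplementedError here.
    headers = {"Range": f"bytes={start}-{start + size - 1}"}
    response = requests.get(url, headers=headers)
    response.raise_for_status()  # a 206 Partial Content is the expected success
    return response.content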
@@ -463,7 +471,6 @@ def get_size(path: str) -> int:
             raise StorageError("FILE", e)
 
     elif storage_type == StorageType.HTTP or storage_type == StorageType.HTTPS:
-
         try:
             # stream=True means only the headers are downloaded initially
             reponse = requests.get(storage_type.value + path, stream=True).headers["content-length"]
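With stream=True only the response headers are fetched up front, so the size comes from Content-Length without downloading the body. A self-contained sketch of the same pattern, with an explicit close so the unused connection is released:

import requests

def http_get_size(url: str) -> int:
    # stream=True defers the body download, so only headers go over the wire.
    response = requests.get(url, stream=True)
    response.raise_for_status()
    size = int(response.headers["content-length"])
    response.close()  # the body was never consumed; release the connection
    return size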
@@ -518,12 +525,11 @@ def exists(path: str) -> bool:
         return os.path.exists(path)
 
     elif storage_type == StorageType.HTTP or storage_type == StorageType.HTTPS:
-
         try:
             response = requests.get(storage_type.value + path, stream=True)
-            if response.status_code == 200 :
+            if response.status_code == 200:
                 return True
-            else :
+            else:
                 return False
         except Exception as e:
             raise StorageError(storage_type.name, e)
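As a design note, the existence check issues a streamed GET and maps the status code to a boolean. An equivalent probe could use HEAD, though some servers answer HEAD less reliably than GET; a hedged variant:

import requests

def http_exists(url: str) -> bool:
    # HEAD-based variant of the streamed-GET check used in the diff.
    return requests.head(url, allow_redirects=True).status_code == 200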
@@ -831,47 +837,52 @@ def copy(from_path: str, to_path: str, from_md5: str = None) -> None:
                 f"CEPH and S3", f"Cannot copy CEPH object {from_path} to S3 object {to_path} : {e}"
             )
 
-    elif (from_type == StorageType.HTTP or from_type == StorageType.HTTPS) and to_type == StorageType.FILE :
-
+    elif (
+        from_type == StorageType.HTTP or from_type == StorageType.HTTPS
+    ) and to_type == StorageType.FILE:
         try:
-            response = requests.get(from_type.value + from_path, stream = True)
+            response = requests.get(from_type.value + from_path, stream=True)
             with open(to_path, "wb") as f:
-                for chunk in response.iter_content(chunk_size=65536) :
-
+                for chunk in response.iter_content(chunk_size=65536):
                     if chunk:
                         f.write(chunk)
 
         except Exception as e:
-            raise StorageError(f"HTTP(S) and FILE", f"Cannot copy HTTP(S) object {from_path} to FILE object {to_path} : {e}")
-
+            raise StorageError(
+                f"HTTP(S) and FILE",
+                f"Cannot copy HTTP(S) object {from_path} to FILE object {to_path} : {e}",
+            )
 
-    elif (from_type == StorageType.HTTP or from_type == StorageType.HTTPS) and to_type == StorageType.CEPH :
-
+    elif (
+        from_type == StorageType.HTTP or from_type == StorageType.HTTPS
+    ) and to_type == StorageType.CEPH:
         to_ioctx = __get_ceph_ioctx(to_tray)
 
         try:
-            response = requests.get(from_type.value + from_path, stream = True)
+            response = requests.get(from_type.value + from_path, stream=True)
             offset = 0
-            for chunk in response.iter_content(chunk_size=65536) :
+            for chunk in response.iter_content(chunk_size=65536):
                 if chunk:
                     size = len(chunk)
                     to_ioctx.write(to_base_name, chunk, offset)
                     offset += size
 
         except Exception as e:
-            raise StorageError(f"HTTP(S) and CEPH", f"Cannot copy HTTP(S) object {from_path} to CEPH object {to_path} : {e}")
-
+            raise StorageError(
+                f"HTTP(S) and CEPH",
+                f"Cannot copy HTTP(S) object {from_path} to CEPH object {to_path} : {e}",
+            )
 
-    elif (from_type == StorageType.HTTP or from_type == StorageType.HTTPS) and to_type == StorageType.S3 :
-
+    elif (
+        from_type == StorageType.HTTP or from_type == StorageType.HTTPS
+    ) and to_type == StorageType.S3:
         to_s3_client, to_bucket = __get_s3_client(to_tray)
 
         try:
-            response = requests.get(from_type.value + from_path, stream = True)
-            with tempfile.NamedTemporaryFile("w+b",delete=False) as f:
+            response = requests.get(from_type.value + from_path, stream=True)
+            with tempfile.NamedTemporaryFile("w+b", delete=False) as f:
                 name_fich = f.name
-                for chunk in response.iter_content(chunk_size=65536) :
-
+                for chunk in response.iter_content(chunk_size=65536):
                     if chunk:
                         f.write(chunk)
 
@@ -880,8 +891,10 @@ def copy(from_path: str, to_path: str, from_md5: str = None) -> None:
             os.remove(name_fich)
 
         except Exception as e:
-            raise StorageError(f"HTTP(S) and S3", f"Cannot copy HTTP(S) object {from_path} to S3 object {to_path} : {e}")
-
+            raise StorageError(
+                f"HTTP(S) and S3",
+                f"Cannot copy HTTP(S) object {from_path} to S3 object {to_path} : {e}",
+            )
 
     else:
         raise StorageError(
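All three HTTP(S) source branches stream the response in 64 KiB chunks and skip empty keep-alive chunks; the CEPH target additionally tracks a byte offset because rados writes are positional. A minimal sketch of that pattern, assuming an already-open rados ioctx and omitting rok4's StorageError wrapping:

import requests

def http_to_ceph_copy(url: str, ioctx, object_name: str, chunk_size: int = 65536) -> int:
    # Stream an HTTP object into a CEPH object, tracking the write offset.
    response = requests.get(url, stream=True)
    offset = 0
    for chunk in response.iter_content(chunk_size=chunk_size):
        if chunk:  # skip keep-alive chunks
            ioctx.write(object_name, chunk, offset)
            offset += len(chunk)
    return offset  # total bytes written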
@@ -928,9 +941,7 @@ def link(target_path: str, link_path: str, hard: bool = False) -> None:
 
         try:
             target_s3_client["client"].put_object(
-                Body=f"{__OBJECT_SYMLINK_SIGNATURE}{target_bucket}/{target_base_name}".encode(
-                    "utf-8"
-                ),
+                Body=f"{__OBJECT_SYMLINK_SIGNATURE}{target_bucket}/{target_base_name}".encode(),
                 Bucket=link_bucket,
                 Key=link_base_name,
             )
@@ -941,9 +952,7 @@ def link(target_path: str, link_path: str, hard: bool = False) -> None:
         ioctx = __get_ceph_ioctx(link_tray)
 
         try:
-            ioctx.write_full(
-                link_base_name, f"{__OBJECT_SYMLINK_SIGNATURE}{target_path}".encode("utf-8")
-            )
+            ioctx.write_full(link_base_name, f"{__OBJECT_SYMLINK_SIGNATURE}{target_path}".encode())
         except Exception as e:
             raise StorageError("CEPH", e)

@@ -995,7 +1004,8 @@ def get_osgeo_path(path: str) -> str:
     else:
         raise NotImplementedError(f"Cannot get a GDAL/OGR compliant path from {path}")
 
-def size_path(path: str) -> int :
+
+def size_path(path: str) -> int:
     """Return the size of the path given (or, for the CEPH, the sum of the size of each object of the .list)
 
     Args:
@@ -1008,10 +1018,10 @@ def size_path(path: str) -> int :
     Returns:
         int: size of the path
     """
-    storage_type, unprefixed_path, tray_name, base_name = get_infos_from_path(path)
+    storage_type, unprefixed_path, tray_name, base_name = get_infos_from_path(path)
 
     if storage_type == StorageType.FILE:
-        try :
+        try:
             total = 0
             with os.scandir(unprefixed_path) as it:
                 for entry in it:
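The FILE branch totals entry sizes with os.scandir; the hunk is cut off before any subdirectory handling, so this excerpt does not show whether the sum recurses. For reference, a recursive sketch of the same scandir pattern:

import os

def dir_size(path: str) -> int:
    # Recursive directory size built on the same os.scandir pattern.
    total = 0
    with os.scandir(path) as it:
        for entry in it:
            if entry.is_file(follow_symlinks=False):
                total += entry.stat().st_size
            elif entry.is_dir(follow_symlinks=False):
                total += dir_size(entry.path)
    return total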
@@ -1026,20 +1036,19 @@ def size_path(path: str) -> int :
     elif storage_type == StorageType.S3:
         s3_client, bucket_name = __get_s3_client(tray_name)
 
-
-        try :
-            paginator = s3_client["client"].get_paginator('list_objects_v2')
+        try:
+            paginator = s3_client["client"].get_paginator("list_objects_v2")
             pages = paginator.paginate(
                 Bucket=bucket_name,
-                Prefix=base_name+"/",
+                Prefix=base_name + "/",
                 PaginationConfig={
-                    'PageSize': 10000,
-                }
+                    "PageSize": 10000,
+                },
             )
             total = 0
             for page in pages:
-                for key in page['Contents']:
-                    total += key['Size']
+                for key in page["Contents"]:
+                    total += key["Size"]
 
         except Exception as e:
             raise StorageError("S3", e)