-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathobject_store.py
135 lines (110 loc) · 4.35 KB
/
object_store.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
"""Minio object store."""
import contextlib
import logging
import sys
import tempfile
from pathlib import Path
from typing import Iterator, List, Optional
import gevent
import minio
import urllib3
from minio import Minio
from minio.error import ResponseError
# from predictor.config import (MINIO_ACCESS_KEY, MINIO_HOST, MINIO_SECRET_KEY,
# MINIO_SECURE)
class ObjectStore(object):
    """Thin wrapper around a MinIO client bound to one server configuration.

    The connection settings are kept on the instance and used by
    :meth:`get_client`. ``client`` is ``None`` until :meth:`connect` has
    been called (or a client object is assigned to it directly).
    """

    def __init__(
        self,
        MINIO_HOST: str,
        MINIO_ACCESS_KEY: str,
        MINIO_SECRET_KEY: str,
        MINIO_SECURE: bool = False,
    ):
        """Store the connection settings; no network I/O happens here.

        Args:
            MINIO_HOST: server endpoint passed to ``Minio``.
            MINIO_ACCESS_KEY: access key passed to ``Minio``.
            MINIO_SECRET_KEY: secret key passed to ``Minio``.
            MINIO_SECURE: value of ``Minio``'s ``secure`` flag.
        """
        # Must be assigned (not just annotated) so ``is_connected`` works
        # before the first ``connect()``.
        self.client: Optional[Minio] = None
        self.MINIO_ACCESS_KEY = MINIO_ACCESS_KEY
        self.MINIO_HOST = MINIO_HOST
        self.MINIO_SECRET_KEY = MINIO_SECRET_KEY
        self.MINIO_SECURE = MINIO_SECURE

    def get_client(self) -> "Minio":
        """Build a new ``Minio`` client from this instance's settings."""
        logging.debug("connecting to server")
        return Minio(
            self.MINIO_HOST,
            access_key=self.MINIO_ACCESS_KEY,
            secret_key=self.MINIO_SECRET_KEY,
            secure=self.MINIO_SECURE,
        )

    @property
    def is_connected(self) -> bool:
        """Return True once a client object has been set (no server round-trip)."""
        return self.client is not None

    def connect(self):
        """Create a client via :meth:`get_client` and store it on ``self.client``.

        Raises:
            minio.error.MinioError: if the client cannot be created; the error
                is logged before being re-raised.
        """
        try:
            self.client = self.get_client()
        except minio.error.MinioError as err:
            logging.error(f"error connecting to server {err}")
            raise

    def ensure_bucket(self, bucket_name: str):
        """Create ``bucket_name`` if it does not already exist.

        Raises:
            minio.error.MinioError: if no client has been set yet.
        """
        if not self.is_connected:
            raise minio.error.MinioError("client not connected")
        if not self.client.bucket_exists(bucket_name):
            self.client.make_bucket(bucket_name)

    def upload_blob(self, filepath: Path, bucket_name: str):
        """Upload a local file, using its base name as the object name.

        Best effort: MinIO errors are logged and swallowed, so the caller is
        not told when the upload fails.
        """
        try:
            logging.debug("uploading file")
            self.client.fput_object(bucket_name, filepath.name, str(filepath))
            logging.debug("uploaded file")
        except minio.error.MinioError as err:
            logging.error(f"exception uploading: {err}")

    def download_blob_response(
        self, object_name: str, bucket_name: str
    ) -> "urllib3.response.HTTPResponse":
        """Return the raw streaming HTTP response for an object.

        The caller owns the response and should call ``close()`` and
        ``release_conn()`` on it when finished.

        Raises:
            ResponseError: if the server rejects the request (logged first).
        """
        logging.debug(f"downloading blob: {object_name}")
        try:
            return self.client.get_object(bucket_name, object_name)
        except ResponseError as err:
            logging.info(f"error getting the object: {err}")
            raise

    @contextlib.contextmanager
    def download_file(self, file: str, bucket_name: str) -> Iterator[Path]:
        """Download an object into a temporary directory and yield its path.

        Connects first if no client is set. The temporary directory (and the
        file) is removed when the ``with`` block exits.

        Raises:
            ResponseError: if the server rejects the download (logged first).
            OSError: if the local file is missing after streaming.
        """
        try:  # TODO: move this to decorator
            if not self.is_connected:
                self.connect()
            with tempfile.TemporaryDirectory() as tmpdir:
                filepath = Path(tmpdir) / file
                # Object names may contain "/"; create the sub-directories.
                filepath.parent.mkdir(parents=True, exist_ok=True)
                response = self.download_blob_response(file, bucket_name)
                # The short sleeps yield to other gevent greenlets while streaming.
                gevent.sleep(0.001)
                try:
                    with open(filepath, "wb") as file_data:
                        gevent.sleep(0.001)
                        for chunk in response.stream(64 * 1024):
                            gevent.sleep(0.001)
                            file_data.write(chunk)
                finally:
                    # Return the connection to the pool before handing control
                    # to the caller, even if streaming failed.
                    response.close()
                    response.release_conn()
                if not filepath.exists():
                    raise OSError("file streaming failed")
                gevent.sleep(0.001)
                yield filepath
        except ResponseError as err:
            logging.error(f"{err}")
            raise

    def list_bucket(self, bucket_name: str, limit: int = 250) -> List[str]:
        """Return up to ``limit`` object names from ``bucket_name``.

        Stops pulling from the listing as soon as ``limit`` names have been
        collected or the listing is exhausted.
        """
        objects = iter(self.client.list_objects_v2(bucket_name))
        names: List[str] = []
        for _ in range(limit):
            obj = next(objects, None)
            if obj is None:  # listing exhausted: stop instead of re-polling
                break
            names.append(obj.object_name)
        return names

    def rename(self, old_object_name: str, new_object_name: str, bucket_name: str):
        """Rename an object by round-tripping it through a local temporary file.

        The object is downloaded, re-uploaded as ``new_object_name``, then the
        original is removed. Not atomic: if the upload raises, the original is
        kept; if the removal raises, both objects remain.
        """
        # The context manager removes the temporary directory even when a
        # client call raises.
        with tempfile.TemporaryDirectory() as tmpdir:
            # A fixed local name avoids needing sub-directories for object
            # names containing "/"; the upload takes its name explicitly.
            local_file = Path(tmpdir) / "object"
            self.client.fget_object(bucket_name, old_object_name, str(local_file))
            self.client.fput_object(bucket_name, new_object_name, str(local_file))
            self.client.remove_object(bucket_name, old_object_name)