Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
add reindex and fix download
Browse files Browse the repository at this point in the history
  • Loading branch information
Anjiurine committed Nov 19, 2023
1 parent f31e86f commit 398a55b
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 128 deletions.
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
__pycache__
download
uri_list.json
*.json
dist
build
*.spec
*.spec
cache
72 changes: 39 additions & 33 deletions utils/mongo_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from threading import Lock
import traceback
import logging
from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(level=logging.INFO)

Expand Down Expand Up @@ -33,32 +34,30 @@ def download_wrapper(args):

def connect_mongo_cluster(uri_list):
client_list = []
for uri in uri_list:
client = MongoClient(uri , maxPoolSize=1024, minPoolSize=64)
client_list.append(client)
def connect(uri):
return MongoClient(uri, maxPoolSize=1024, minPoolSize=64)
with ThreadPoolExecutor() as executor:
futures = [executor.submit(connect, uri) for uri in uri_list]
for future in futures:
client = future.result()
client_list.append(client)
return client_list

def upload_file(client_list, file_path):
file_path = os.path.abspath(file_path)

with open(file_path, "rb") as f:
file_data = f.read()

file_sha256 = hashlib.sha256(file_data).hexdigest()
file_name, file_ext = os.path.basename(file_path).split(".")
file_size = os.path.getsize(file_path)
file_7z_path = file_path + ".7z"

with py7zr.SevenZipFile(file_7z_path, 'w') as archive:
archive.write(file_path, file_name + "." + file_ext)

file_7z_size = os.path.getsize(file_7z_path)
file_7z_data = open(file_7z_path, "rb").read()
os.remove(file_7z_path)

num_of_clients = len(client_list)
chunk_size = file_7z_size // num_of_clients

for i in range(num_of_clients):
client = client_list[i]
db = client["files"]
Expand All @@ -68,7 +67,6 @@ def upload_file(client_list, file_path):
chunk_data = file_7z_data[start:end]
chunk_data_b64 = base64.b64encode(chunk_data)
chunk_sha256 = hashlib.sha256(chunk_data).hexdigest()

chunk_doc = {
"name": file_name,
"ext": file_ext,
Expand All @@ -80,7 +78,6 @@ def upload_file(client_list, file_path):
"chunk_sha256": chunk_sha256
}
col.insert_one(chunk_doc)

return file_sha256

def download_file(client_list, file_sha256, save_path):
Expand All @@ -89,29 +86,28 @@ def download_file(client_list, file_sha256, save_path):
file_ext = None
file_size = None
total_chunks = None
chunk_docs = []
for client in client_list:
db = client["files"]
col = db["files"]
chunk_doc = col.find_one({"sha256": file_sha256})
if chunk_doc:
chunk_docs.extend(col.find({"sha256": file_sha256}).sort("chunk_no"))
for chunk_doc in chunk_docs:
if not file_name:
file_name = chunk_doc["name"]
file_ext = chunk_doc["ext"]
file_size = chunk_doc["size"]
total_chunks = chunk_doc["total_chunks"]
chunk_data_b64 = chunk_doc["data"]
chunk_data = base64.b64decode(chunk_data_b64)
file_7z_data += chunk_data
else:
return None

file_7z_path = save_path + file_name + "." + file_ext + ".7z"
chunk_data_b64 = chunk_doc["data"]
chunk_data = base64.b64decode(chunk_data_b64)
file_7z_data += chunk_data
file_7z_path = os.path.join(save_path, f"{file_name}.{file_ext}.7z")
with open(file_7z_path, "wb") as f:
f.write(file_7z_data)

with py7zr.SevenZipFile(file_7z_path, 'r') as archive:
archive.extractall(save_path)
os.remove(file_7z_path)
file_path = save_path + file_name + "." + file_ext
file_path = os.path.join(save_path, f"{file_name}.{file_ext}")
return file_path

def list_files(client_list):
Expand All @@ -137,14 +133,18 @@ def list_files(client_list):
return file_list

def delete_file(client_list, file_sha256):
result = True
total_clients = len(client_list)
deleted_chunks = set()
for client in client_list:
db = client["files"]
col = db["files"]
chunk_doc = col.find_one_and_delete({"sha256": file_sha256})
if not chunk_doc:
result = False
return result
result = col.find_one_and_delete({"sha256": file_sha256})
if result:
deleted_chunks.add(result["chunk_no"])
if len(deleted_chunks) == total_clients:
return True
else:
return False

def search_file(client_list, keyword):
file_list = []
Expand All @@ -168,17 +168,14 @@ def search_file(client_list, keyword):
file_list.append(file_info)
return file_list


def get_storage_info(client_list, selected_instance=None):
total_storage = 0
total_free_storage = 0
total_used_storage = 0
storage_info_list = []

for i, client in enumerate(client_list, start=1):
if selected_instance is not None and i != selected_instance:
continue

db = client.files
stats = db.command('dbstats')
storage_info = {
Expand All @@ -187,22 +184,31 @@ def get_storage_info(client_list, selected_instance=None):
"used_storage": stats["dataSize"]
}
storage_info_list.append(storage_info)

total_storage += stats["storageSize"]
total_free_storage += stats["storageSize"] - stats["dataSize"]
total_used_storage += stats["dataSize"]

if selected_instance is not None:
break

if selected_instance is None:
storage_info_list = [{
"instance_uri": "All Instances",
"total_storage": total_storage,
"free_storage": total_free_storage,
"used_storage": total_used_storage
}]

return storage_info_list


def reindex_files(client_list, save_path='./cache'):
if not os.path.exists(save_path):
os.makedirs(save_path)
file_list = list_files(client_list)
for file_info in file_list:
file_sha256 = file_info["sha256"]
file_name = file_info["name"]
file_path = os.path.join(save_path, file_name)
downloaded_file_path = download_file(client_list, file_sha256, save_path)
delete_file(client_list, file_sha256)
if downloaded_file_path:
upload_file(client_list, downloaded_file_path)
os.remove(downloaded_file_path)
Loading

0 comments on commit 398a55b

Please sign in to comment.