Semaphore lock #77

Open · wants to merge 3 commits into main
48 changes: 31 additions & 17 deletions pydatarecognition/app.py
@@ -1,9 +1,12 @@
+import concurrent.futures
 import os
 from pathlib import Path
 import yaml
 import tempfile
 import shutil
 import uuid
+import asyncio
+from threading import BoundedSemaphore
 
 from fastapi import FastAPI, Body, HTTPException, status, File
 from fastapi.responses import JSONResponse
@@ -20,6 +23,7 @@
 STEPSIZE_REGULAR_QGRID = 10**-3
 
 COLLECTION = "cif"
+MAX_MONGO_FIND = 1000000
 
 app = FastAPI()
 
@@ -40,7 +44,8 @@
 for i in range(len(dois)):
     doi_dict[dois[i][0]] = dois[i][1]
 
-
+# Create an app level semaphore to prevent overloading the RAM. Assume ~56KB per file, *50000 = 2.8GB
+semaphore = BoundedSemaphore(50000)
 
 
 @app.post("/", response_description="Add new CIF", response_model=PydanticPowderCif)
@@ -98,7 +103,7 @@ async def delete_cif(id: str):
     raise HTTPException(status_code=404, detail=f"CIF {id} not found")
 
 @app.put(
-    "/query/", response_description="Rank matches to User Input Data"
+    "/rank/", response_description="Rank matches to User Input Data"
 )
 async def rank_cif(xtype: Literal["twotheta", "q"], wavelength: float, user_input: bytes = File(...), paper_filter_iucrid: Optional[str] = None):
     cifname_ranks = []
@@ -113,22 +118,26 @@ async def rank_cif(xtype: Literal["twotheta", "q"], wavelength: float, user_inpu
     if xtype == 'twotheta':
         user_q = twotheta_to_q(np.radians(user_x_data), wavelength)
     if paper_filter_iucrid:
-        cif_list = db[COLLECTION].find({"iucrid": paper_filter_iucrid})
+        cif_cursor = db[COLLECTION].find({"iucrid": paper_filter_iucrid})
     else:
-        cif_list = db[COLLECTION].find({})
-    async for cif in cif_list:
-        mongo_cif = PydanticPowderCif(**cif)
-        try:
-            data_resampled = xy_resample(user_q, user_q, mongo_cif.q, mongo_cif.intensity, STEPSIZE_REGULAR_QGRID)
-            pearson = scipy.stats.pearsonr(data_resampled[0][:, 1], data_resampled[1][:, 1])
-            r_pearson = pearson[0]
-            p_pearson = pearson[1]
-            cifname_ranks.append(mongo_cif.cif_file_name)
-            r_pearson_ranks.append(r_pearson)
-            doi = doi_dict[mongo_cif.iucrid]
-            doi_ranks.append(doi)
-        except AttributeError:
-            print(f"{mongo_cif.cif_file_name} was skipped.")
+        cif_cursor = db[COLLECTION].find({})
+    unpopulated_cif_list = await cif_cursor.to_list(length=MAX_MONGO_FIND)
+    with concurrent.futures.ThreadPoolExecutor() as executor:
+        futures = (executor.submit(limited_cif_load, cif) for cif in unpopulated_cif_list)
+        for future in concurrent.futures.as_completed(futures):
+            mongo_cif = future.result()
+            try:
+                data_resampled = xy_resample(user_q, user_q, mongo_cif.q, mongo_cif.intensity, STEPSIZE_REGULAR_QGRID)
+                pearson = scipy.stats.pearsonr(data_resampled[0][:, 1], data_resampled[1][:, 1])
+                r_pearson = pearson[0]
+                p_pearson = pearson[1]
+                cifname_ranks.append(mongo_cif.cif_file_name)
+                r_pearson_ranks.append(r_pearson)
+                doi = doi_dict[mongo_cif.iucrid]
+                doi_ranks.append(doi)
+            except AttributeError:
+                print(f"{mongo_cif.cif_file_name} was skipped.")
+            semaphore.release()
 
     cif_rank_pearson = sorted(list(zip(cifname_ranks, r_pearson_ranks, doi_ranks)), key=lambda x: x[1], reverse=True)
     ranks = [{'IUCrCIF': cif_rank_pearson[i][0],
@@ -138,6 +147,11 @@ async def rank_cif(xtype: Literal["twotheta", "q"], wavelength: float, user_inpu
     return ranks
 
 
+def limited_cif_load(cif: dict):
+    semaphore.acquire()
+    return PydanticPowderCif(**cif)
+
+
 if __name__ == "__main__":
     import uvicorn
     uvicorn.run("app:app", host="localhost", reload=True)
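
A note on the throttling pattern this diff introduces: limited_cif_load acquires the semaphore on a worker thread before deserializing a document, and the main loop releases it only after the result has been consumed, so at most 50000 deserialized CIFs are alive at once. Below is a minimal standalone sketch of the same pattern; load_doc, process, and the fake document list are illustrative placeholders, not part of this PR.

import concurrent.futures
from threading import BoundedSemaphore

MAX_IN_FLIGHT = 50  # the PR uses 50000; smaller here for illustration
semaphore = BoundedSemaphore(MAX_IN_FLIGHT)

def load_doc(raw: dict) -> dict:
    # Block until a slot is free, then do the memory-expensive load.
    semaphore.acquire()
    return {"name": raw["name"], "payload": bytes(56_000)}  # ~56KB stand-in

def process(doc: dict) -> None:
    print(f"processed {doc['name']} ({len(doc['payload'])} bytes)")

raw_docs = [{"name": f"doc{i}"} for i in range(200)]
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = (executor.submit(load_doc, raw) for raw in raw_docs)
    for future in concurrent.futures.as_completed(futures):
        doc = future.result()
        try:
            process(doc)
        finally:
            # Free the slot only once the loaded object can be dropped.
            semaphore.release()

Because release happens on the consuming side, the cap bounds loaded-but-unprocessed objects, not just threads; worker threads simply block in acquire() when all slots are taken.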
6 changes: 5 additions & 1 deletion pydatarecognition/mongo_utils.py
@@ -34,14 +34,18 @@ def cifs_to_mongo(mongo_db_uri: str, mongo_db_name: str, mongo_collection_name:
 import yaml
 
 from google.cloud import storage
+from google.cloud.exceptions import Conflict
 
 filepath = Path(os.path.abspath(__file__))
 
 if os.path.isfile(os.path.join(filepath.parent.absolute(), '../requirements/testing-cif-datarec-secret.json')):
     os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = os.path.join(filepath.parent.absolute(),
                                                                 '../requirements/testing-cif-datarec-secret.json')
 storage_client = storage.Client()
-storage_client.create_bucket('raw_cif_data')
+try:
+    storage_client.create_bucket('raw_cif_data')
+except Conflict:
+    pass
 CIF_DIR = filepath.parent.parent / 'docs' / 'examples' / 'cifs'
 with open('secret_password.yml', 'r') as f:
     secret_dict = yaml.safe_load(f)
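
The try/except above makes the bucket setup idempotent: google-cloud-storage raises google.cloud.exceptions.Conflict (HTTP 409) when create_bucket targets a name that already exists. A lookup-first variant is sketched below as an alternative, not what the PR does; note it is not race-free if two processes run setup concurrently, which is why catching Conflict is the safer default.

from google.cloud import storage

def ensure_bucket(client: storage.Client, name: str) -> storage.Bucket:
    # lookup_bucket returns None rather than raising when the bucket is absent.
    bucket = client.lookup_bucket(name)
    if bucket is None:
        bucket = client.create_bucket(name)
    return bucket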
3 changes: 2 additions & 1 deletion pydatarecognition/powdercif.py
@@ -171,7 +171,8 @@ def resolve_gcs_token(cls, val):
         return val
 
 
-#each is ~56KB, 1000 of which are 0.56GB of RAM
+# Each is ~56KB, 1000 of which are ~0.056GB of RAM
+# As it stands, this doesn't help, since all of the database will be loaded every time and you might as well not use S3
 @lru_cache(maxsize=1000, typed=True)
 def retrieve_glob_as_np(uid: str) -> np.ndarray:
     storage_client = storage.Client()
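
Since the comment above questions whether this cache helps, functools.lru_cache's built-in instrumentation can measure exactly that. The snippet below is standard-library behavior; the import path is assumed from this repo's layout.

from pydatarecognition.powdercif import retrieve_glob_as_np

# After a ranking run, check whether any downloads were actually saved.
info = retrieve_glob_as_np.cache_info()
print(f"hits={info.hits} misses={info.misses} cached={info.currsize}/{info.maxsize}")

# Drop the cached arrays to reclaim the ~56KB-per-entry RAM.
retrieve_glob_as_np.cache_clear()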