
DPL-048 fix root sample ids #528

Closed · wants to merge 15 commits
15 changes: 15 additions & 0 deletions lighthouse/data-fixes/data_fix_and_save.py
@@ -0,0 +1,15 @@
# Get the root_sample_ids, fix them, and write them to a CSV to be used with the data_writers script (which inserts the fixed IDs into the DBs)
Review comment (Contributor):
I would group all the files related to the same data fix together, so we can add other data fixes in the future while keeping things separate. What do you think about creating a dpl-048 subfolder and moving all these files into it?

Review comment (Contributor):

Also, for future reference, it would be very useful to have a README.md file in that subfolder describing this data fix and how to run it.


from data_getters import get_data
from data_helpers import remove_everything_after_first_underscore

def save_data():
    data = get_data()
    if data is None:
        # get_data() returns None when the DB connection or query fails
        return
    print("Editing the root_sample_ids...")
    # Keep the original value alongside the corrected one for traceability
    data = data.rename(columns={"root_sample_id": "original_root_sample_id"})
    data["root_sample_id"] = data["original_root_sample_id"].apply(remove_everything_after_first_underscore)
    print("Adding the root_sample_ids to a CSV file.")
    data.to_csv('data-fixes/test-data/root_sample_ids.csv', index=False)

if __name__ == "__main__":
    save_data()
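A hedged sketch of the CSV this script writes; the IDs shown are hypothetical and only illustrate the two columns that data_writers.py later reads:

original_root_sample_id,root_sample_id
ABC123_B01,ABC123
Lysis_control_1,Lysis_control_1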
25 changes: 25 additions & 0 deletions lighthouse/data-fixes/data_getters.py
@@ -0,0 +1,25 @@
# get the root_sample_ids from MLWH - any root_sample_ids containing an underscore
import sqlalchemy
import pandas as pd

from constants import MYSQL_DB_CONN_STRING, MLWH_DB, SQL_MLWH_GET_MALFORMED_ROOT_IDS

def get_data() -> pd.DataFrame:
    print("Attempting to connect to DB.")
    db_connection = None
    try:
        sql_engine = sqlalchemy.create_engine(
            f"mysql+pymysql://{MYSQL_DB_CONN_STRING}/{MLWH_DB}", pool_recycle=3600
        )
        db_connection = sql_engine.connect()
        print("Connected to the DB... getting data.")
        data = pd.read_sql(SQL_MLWH_GET_MALFORMED_ROOT_IDS, db_connection)
        print("Got the data.")
    except Exception as e:
        print("Error while connecting to MySQL")
        print(e)
        return None
    finally:
        if db_connection is not None:
            print("Closing DB connection.")
            db_connection.close()
    return data
16 changes: 16 additions & 0 deletions lighthouse/data-fixes/data_helpers.py
@@ -0,0 +1,16 @@
def remove_everything_after_first_underscore(string: str):
    # If the part before the first underscore contains any of these substrings,
    # the ID is a special case and the original string is returned unchanged.
    substrings_to_skip = [
        '...',
        'Lysis',
        'LYSIS',
        'Accuplex',
        'ACCUPLEX',
        'chris',
        'ACC',
        '_'
    ]
    split_string = string.split('_')[0]
    if any(x in split_string for x in substrings_to_skip):
        return string
    else:
        return split_string
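A minimal usage sketch of the helper above; the sample IDs are hypothetical and only illustrate the intended behaviour:

# Hypothetical IDs, for illustration only.
from data_helpers import remove_everything_after_first_underscore

assert remove_everything_after_first_underscore("ABC123_B01") == "ABC123"                # suffix after the first underscore is dropped
assert remove_everything_after_first_underscore("Lysis_control_1") == "Lysis_control_1"  # matches the skip list, returned unchanged
assert remove_everything_after_first_underscore("ABC123") == "ABC123"                    # no underscore, returned unchanged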
77 changes: 77 additions & 0 deletions lighthouse/data-fixes/data_writers.py
@@ -0,0 +1,77 @@
# take the CSV data and insert corrected IDs into the various DBs
import pymongo
import mysql.connector
import pandas as pd
import argparse

from constants import (LOCALHOST, MONGO_DB, MONGO_DB_CLIENT, MONGO_TABLE, MYSQL_USER, MYSQL_PWD, MLWH_DB, fixed_samples_file)

def write_data_to_db(data: pd.DataFrame, database: str):
    if database.lower() == "mongo":
        write_to_mongo(data)
    elif database.lower() == "mysql":
        write_to_mysql(data)
    else:
        print("Not a valid DB type")
        return None

def write_to_mongo(data):
    print("Attempting to connect to Mongo DB...")
    try:
        client = pymongo.MongoClient(MONGO_DB_CLIENT)
        db = client[MONGO_DB]
        table = db[MONGO_TABLE]
        print("Loading in the data...")
        for index, row in data.iterrows():
            root_sample_id = row["root_sample_id"]
            original_root_sample_id = row["original_root_sample_id"]

            update_query = { "Root Sample ID": original_root_sample_id }
            new_value = { "$set": { "Root Sample ID": root_sample_id } }

            table.update_one(update_query, new_value)
Review comment (Contributor):
I think this should be updateMany, because we want to update all entries with the same root_sample_id.

print("Data loaded in successfully.")
except Exception as e:
print("Error while connecting to MongoDB")
print(e)
return None
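Following the reviewer's suggestion, a hedged sketch of an update_many variant; fix_root_sample_id is a hypothetical helper name, not part of this PR, and update_many applies the same $set to every document matching the malformed ID:

# Sketch only, not part of the PR: update_many corrects every document
# that still carries the malformed ID, not just the first match.
import pymongo
from constants import MONGO_DB, MONGO_DB_CLIENT, MONGO_TABLE

def fix_root_sample_id(original_root_sample_id: str, root_sample_id: str) -> int:
    client = pymongo.MongoClient(MONGO_DB_CLIENT)
    table = client[MONGO_DB][MONGO_TABLE]
    result = table.update_many(
        {"Root Sample ID": original_root_sample_id},
        {"$set": {"Root Sample ID": root_sample_id}},
    )
    return result.modified_count  # how many documents were actually changed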

def write_to_mysql(data):
    print("Attempting to connect to MLWH...")
    db_connection = None
    try:
        db_connection = mysql.connector.connect(host=LOCALHOST,
                                                database=MLWH_DB,
                                                user=MYSQL_USER,
                                                password=MYSQL_PWD)
        print("Loading in the data...")
        cursor = db_connection.cursor()
        for index, row in data.iterrows():
            root_sample_id = row["root_sample_id"]
            original_root_sample_id = row["original_root_sample_id"]
            # Parameterised query so IDs containing quotes cannot break the SQL
            update_query = (
                "UPDATE lighthouse_sample"
                " SET root_sample_id = %s"
                " WHERE root_sample_id = %s"
            )
            cursor.execute(update_query, (root_sample_id, original_root_sample_id))
            rows_updated = cursor.rowcount
            print(f"Updated {rows_updated} row(s) for {original_root_sample_id}")
        db_connection.commit()
        cursor.close()
        print("Data loaded in successfully.")
    except Exception as e:
        print("Error while connecting to MySQL")
        print(e)
        return None
    finally:
        if db_connection is not None:
            print("Closing MLWH connection.")
            db_connection.close()
    return

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--db")
    args = parser.parse_args()
    db = vars(args)["db"]
    data = pd.read_csv(fixed_samples_file)
    write_data_to_db(data, db)
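A hedged sketch of the intended run order, assuming the constants (DB connection strings, fixed_samples_file) are already configured:

python data_fix_and_save.py        # write the corrected root_sample_ids to the CSV
python data_writers.py --db mongo  # apply the fix to MongoDB
python data_writers.py --db mysql  # apply the fix to the MLWH lighthouse_sample table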
75 changes: 75 additions & 0 deletions lighthouse/data-fixes/test_helpers.py
@@ -0,0 +1,75 @@
# helpers for testing e.g. just print all the data out, populate the local DB etc.
import pandas as pd
import sqlalchemy
import pymongo

from constants import MYSQL_DB_CONN_STRING, MLWH_DB, MONGO_DB, MONGO_DB_CLIENT, SQL_GET_ALL_DATA, malformed_csv, control_csv, skippable_csv

def print_data():
    db_connection = None
    try:
        mlwh_db = MLWH_DB
        print('connecting to DB')
        sql_engine = sqlalchemy.create_engine(
            f"mysql+pymysql://{MYSQL_DB_CONN_STRING}/{mlwh_db}", pool_recycle=3600
        )
        db_connection = sql_engine.connect()
        print('getting data')
        data = pd.read_sql(SQL_GET_ALL_DATA, db_connection)
        print('got data')
        print(data)
    except Exception as e:
        print("Error while connecting to MySQL")
        print(e)
        return None
    finally:
        if db_connection is not None:
            print("Closing mlwh connection")
            db_connection.close()

def populate_local_db(database):
    malformed = pd.read_csv(malformed_csv)
    skippable = pd.read_csv(skippable_csv)
    control = pd.read_csv(control_csv)

    data = pd.concat([malformed, skippable, control])
    data_no_current_rna = data.drop(columns=['current_rna_id'])

    if database == "mysql":
        populate_mysql(data_no_current_rna)
    elif database == "mongo":
        populate_mongo(data_no_current_rna)
    else:
        print("Not a valid DB type")
        return None

def populate_mongo(data):
    try:
        client = pymongo.MongoClient(MONGO_DB_CLIENT)
        db = client[MONGO_DB]
        table = db["samples"]
        # to_dict(orient='records') gives one dict per row, so each sample
        # becomes its own Mongo document.
        data_dict = data.to_dict(orient='records')
        print(data_dict)
        table.insert_many(data_dict)
    except Exception as e:
        print("Error while connecting to MongoDB")
        print(e)
        return None

def populate_mysql(data):
    db_connection = None
    try:
        mlwh_db = MLWH_DB
        print('connecting to DB')
        sql_engine = sqlalchemy.create_engine(
            f"mysql+pymysql://{MYSQL_DB_CONN_STRING}/{mlwh_db}", pool_recycle=3600
        )
        db_connection = sql_engine.connect()
        # if_exists='replace' drops and recreates lighthouse_sample, which is
        # fine for a local test DB but should never be run against the real MLWH.
        data.to_sql('lighthouse_sample', db_connection, if_exists='replace', index=False, chunksize=1)
    except Exception as e:
        print("Error while connecting to MySQL")
        print(e)
        return None
    finally:
        if db_connection is not None:
            print("Closing mlwh connection")
            db_connection.close()
    return None
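A hedged sketch of how the test helpers above might be driven from a Python shell when preparing a local run; it assumes local Mongo/MySQL instances and the CSVs referenced in constants.py exist:

# Illustration only: seed the local databases, then inspect the MLWH table.
from test_helpers import populate_local_db, print_data

populate_local_db("mysql")   # loads the combined CSVs into lighthouse_sample
populate_local_db("mongo")   # loads the same rows into the samples collection
print_data()                 # prints everything returned by SQL_GET_ALL_DATA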