diff --git a/docker-compose-local.yml b/docker-compose-local.yml
new file mode 100644
index 0000000..2420f28
--- /dev/null
+++ b/docker-compose-local.yml
@@ -0,0 +1,44 @@
+version: "3.8"
+services:
+
+
+  # Elasticsearch Docker Images: https://www.docker.elastic.co/
+  elasticsearch:
+    mem_limit: 1G
+    mem_reservation: 128M
+    cpus: 0.7
+    image: docker.elastic.co/elasticsearch/elasticsearch:8.5.0
+    container_name: elasticsearch
+    environment:
+      - xpack.security.enabled=false
+      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
+      - discovery.type=single-node
+    ulimits:
+      memlock:
+        soft: -1
+        hard: -1
+      nofile:
+        soft: 65536
+        hard: 65536
+    cap_add:
+      - IPC_LOCK
+    volumes:
+      - elasticsearch-data:/usr/share/elasticsearch/data
+    ports:
+      - 9200:9200
+      - 9300:9300
+
+  kibana:
+    container_name: kibana
+    image: docker.elastic.co/kibana/kibana:8.5.0
+    environment:
+      SERVER_NAME: kibana
+      ELASTICSEARCH_HOSTS: http://elasticsearch:9200
+    ports:
+      - 5601:5601
+    depends_on:
+      - elasticsearch
+
+volumes:
+  elasticsearch-data:
+    driver: local
\ No newline at end of file
diff --git a/log_import_error b/log_import_error
new file mode 100644
index 0000000..e69de29
diff --git a/requirements.txt b/requirements.txt
index 467d8bb..5d68ece 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,3 +3,4 @@ ijson==3.1.4
 pydantic==1.10.2
 python-dotenv==0.21.0
 requests==2.28.1
+pandas
diff --git a/src/analysis/find_duplicates.py b/src/analysis/find_duplicates.py
new file mode 100644
index 0000000..71171ae
--- /dev/null
+++ b/src/analysis/find_duplicates.py
@@ -0,0 +1,33 @@
+import argparse
+import json
+import csv
+from collections import defaultdict
+
+import pandas as pd
+
+def group_columns(file_path):
+    df = pd.read_csv(file_path, delimiter='\t', dtype=str)
+
+    col_hashes = {col: hash(tuple(df[col])) for col in df.columns}
+
+    groups = defaultdict(list)
+    for col, col_hash in col_hashes.items():
+        groups[col_hash].append(col)
+
+    duplicate_groups = {f"group{i+1}": cols for i, cols in enumerate(groups.values()) if len(cols) > 1}
+
+    return duplicate_groups
+
+
+def main():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("file_path", type=str, help="Path to the TSV file.")
+
+    args = parser.parse_args()
+    result = group_columns(args.file_path)
+
+    print(json.dumps(result, indent=4))
+
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
diff --git a/src/convert_to_json.py b/src/convert_to_json.py
index 9c363fd..3e7dfb3 100644
--- a/src/convert_to_json.py
+++ b/src/convert_to_json.py
@@ -5,7 +5,6 @@ import argparse
 import shutil
 import json
-import shutil
 from os import path as ospath
 
 
 def main():
@@ -53,7 +52,7 @@ def tmp_print(*argv):
 def parse_line(line, data_parser, header):
     d = {}
     for idx in range(len(header)):
-        if idx >= len(line) or line[idx] == ".":
+        if idx >= len(line) or line[idx] == "":
             continue
         k = header[idx]
         try:
@@ -122,7 +121,7 @@ def convert_file(in_filepath, out_dir, es_index):
             da_list.append(data)
         except:
             error.write(cols)
-        if count % 50000 == 0:
+        if count % 100000 == 0:
             write_to_json(ospath.join(out_dir, str(out_filename) + '.json'), da_list)
             out_filename += 1
             da_list = []
@@ -139,7 +138,8 @@
 
 
     start_time = time.time()
     tmp_print.l = 0
-    main()
+    #main()
     find_error = init_find_error()
-    start_time = time.time()
\ No newline at end of file
+    start_time = time.time()
+    
\ No newline at end of file
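For reference, the new src/analysis/find_duplicates.py reads a TSV with pandas, hashes each column's values, and prints groups of identical columns as JSON. A minimal usage sketch, where the input path and column names are hypothetical:

    # Group identical columns in an annotation TSV; JSON goes to stdout.
    python3 src/analysis/find_duplicates.py output/sample/chr1.tsv

    # Illustrative output shape (column names are made up):
    # {
    #     "group1": ["ANNOVAR_refseq_Gene_ID", "SnpEff_refseq_Gene_ID"]
    # }

Since Python's hash() can collide in principle, a candidate group could be confirmed exactly with df[a].equals(df[b]) before any column is dropped.
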
diff --git a/src/db_batch.template b/src/db_batch.template
new file mode 100644
index 0000000..bd7b50d
--- /dev/null
+++ b/src/db_batch.template
@@ -0,0 +1,23 @@
+#!/bin/bash
+#SBATCH --ntasks=1
+#SBATCH --time=50:00:00
+#SBATCH --account=pdthomas_136
+#SBATCH --partition=thomas
+#SBATCH --mem-per-cpu=20G
+#SBATCH --mail-type=end,fail
+#SBATCH --mail-user=mushayah@usc.edu
+
+
+base_dir='$BASE_DIR'
+in_file='$IN_FILE'
+out_dir='$OUT_DIR'
+es_index='annoq-annotations'
+
+module load python/3.9.12
+python3 --version
+source venv/bin/activate
+
+echo "python3 convert_to_json.py -i $in_file -o $out_dir --es_index $es_index"
+echo start at `date`
+ python3 $base_dir/annoq-database/src/convert_to_json.py -i $in_file -o $out_dir --es_index $es_index
+echo end at `date`
\ No newline at end of file
diff --git a/src/hrc_batch.template b/src/hrc_batch.template
deleted file mode 100644
index 568b642..0000000
--- a/src/hrc_batch.template
+++ /dev/null
@@ -1,20 +0,0 @@
-#!/bin/bash
-#SBATCH --ntasks=1
-#SBATCH --time=50:00:00
-#SBATCH --account=pdthomas_136
-#SBATCH --partition=thomas
-#SBATCH --mem-per-cpu=90G
-
-in_file='$IN_FILE'
-out_dir='$OUT_DIR'
-es_index='annoq-test'
-
-module load python/3.9.2
-python3 --version
-source venv/bin/activate
-
-echo "python3 convert_to_json.py -i $in_file -o $out_dir --es_index $es_index"
-echo start at `date`
- python3 convert_to_json.py -i $in_file -o $out_dir --es_index $es_index
-echo end at `date`
-
diff --git a/src/index_es_json.py b/src/index_es_json.py
index 70fbe20..2e28e2b 100644
--- a/src/index_es_json.py
+++ b/src/index_es_json.py
@@ -39,7 +39,7 @@ def bulk_load_parallel(directory):
 
 
 def bulk_load(directory):
-    helpers.bulk(es, load_json(directory), index=settings.ANNOQ_ANNOTATIONS_INDEX, chunk_size=5000, request_timeout=1000)
+    helpers.bulk(es, load_json(directory), index=settings.ANNOQ_ANNOTATIONS_INDEX, chunk_size=10000, request_timeout=1000)
 
 
 def bulk_load_streaming(directory):
diff --git a/src/run_jobs.sh b/src/run_jobs.sh
new file mode 100644
index 0000000..40f680a
--- /dev/null
+++ b/src/run_jobs.sh
@@ -0,0 +1,30 @@
+#!/bin/bash
+
+if [ "$#" -ne 2 ]; then
+    echo "Error: Exactly 2 arguments are required."
+    echo "Usage: $0 <work_name> <base_dir>"
+    exit 1
+fi
+
+work_name=$1
+base_dir=$2
+
+echo "running work name: $work_name using base dir: $base_dir"
+
+in_dir=$base_dir/annoq-data-builder/wgsa_add/output/$work_name
+out_dir=$base_dir/annoq-database/output/$work_name
+slurm_dir=$base_dir/annoq-database/slurm/$work_name
+sbatch_template=$base_dir/annoq-database/src/db_batch.template
+
+rm -rf $out_dir $slurm_dir
+
+mkdir -p $slurm_dir
+mkdir -p $out_dir
+
+for fp in `ls $in_dir` ; do
+    echo $in_dir/$fp
+    mkdir -p $out_dir/${fp}
+    BASE_DIR=$base_dir IN_FILE=$in_dir/${fp} OUT_DIR=$out_dir/${fp} \
+        envsubst '$BASE_DIR, $IN_FILE, $OUT_DIR' < $sbatch_template > $slurm_dir/slurm_${fp}.slurm
+    sbatch $slurm_dir/slurm_${fp}.slurm
+done
\ No newline at end of file
diff --git a/run_jobs.sh b/src/run_jobs_local.sh
similarity index 59%
rename from run_jobs.sh
rename to src/run_jobs_local.sh
index 5858726..06819e7 100644
--- a/run_jobs.sh
+++ b/src/run_jobs_local.sh
@@ -1,14 +1,8 @@
 #!/bin/bash
-
-in_dir='./../WGSA_add/output/hrc_sample'
-out_dir='output/hrc_es_jsons'
-slurm_dir='slurm'
-
-#local testing
-#in_dir='./../annoq-data/hrc-test'
-#out_dir='./../annoq-data/tmp11_'
-#slurm_dir='./../annoq-data/slurm2'
+in_dir='./../annoq-data/hrc-test'
+out_dir='./../annoq-data/tmp11_'
+slurm_dir='./../annoq-data/slurm2'
 
 mkdir -p $slurm_dir
 mkdir -p $out_dir
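
Putting the pieces together, a local end-to-end run might look like the sketch below; the work name my_run and base directory $HOME/annoq are placeholders:

    # Bring up the single-node Elasticsearch and Kibana from the new compose file.
    docker compose -f docker-compose-local.yml up -d
    curl http://localhost:9200   # no auth needed since xpack.security.enabled=false

    # Generate one sbatch job per input file found under
    # $base_dir/annoq-data-builder/wgsa_add/output/<work_name> and submit it.
    ./src/run_jobs.sh my_run $HOME/annoq

For laptop testing without the cluster-scale inputs, src/run_jobs_local.sh covers the same flow using the hard-coded ./../annoq-data test paths.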