Skip to content

Commit

Permalink
materialised queries in nextflow
Browse files (browse the repository at this point in the history)
  • Loading branch information
jamesamcl committed Dec 25, 2024
1 parent 528ee19 commit b310947
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 18 deletions.
2 changes: 2 additions & 0 deletions dataload/08_run_queries/run_queries.dockerpy
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ from pandas import DataFrame
import json
from timeit import default_timer as timer

os.system('echo "dbms.security.auth_enabled=false" >> /var/lib/neo4j/conf/neo4j.conf')

os.system("neo4j start")
os.system("sleep 20")

Expand Down
6 changes: 3 additions & 3 deletions dataload/08_run_queries/run_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ def main():
'--bind ' + shlex.quote(neo_data_path) + ':/data',
'--bind ' + shlex.quote(neo_logs_path) + ':/logs',
'--bind ' + os.path.abspath(os.path.join(os.environ['GREBI_DATALOAD_HOME'], '08_run_queries/run_queries.dockerpy')) + ':/run_queries.py',
'--bind ' + os.path.abspath(os.environ['QUERY_YAMLS_PATH']) + ':/materialised_queries',
'--bind ' + os.path.abspath(os.environ['GREBI_QUERY_YAMLS_PATH']) + ':/materialised_queries',
'--bind ' + os.path.abspath(args.out_sqlites_path) + ':/out',
'--writable-tmpfs',
'--network=none',
'--env NEO4J_AUTH=none',
'docker://ghcr.io/ebispot/grebi_neo4j_with_extras:5.18.0',
'python3 /run_queries.dockerpy'
'python3 /run_queries.py'
])
else:
cmd = ' '.join([
Expand All @@ -50,7 +50,7 @@ def main():
'-v ' + shlex.quote(neo_data_path) + ':/data',
'-v ' + shlex.quote(neo_logs_path) + ':/logs',
'-v ' + os.path.abspath(os.path.join(os.environ['GREBI_DATALOAD_HOME'], '08_run_queries/run_queries.dockerpy')) + ':/run_queries.py',
'-v ' + os.path.abspath(os.environ['QUERY_YAMLS_PATH']) + ':/materialised_queries',
'-v ' + os.path.abspath(os.environ['GREBI_QUERY_YAMLS_PATH']) + ':/materialised_queries',
'-v ' + os.path.abspath(args.out_sqlites_path) + ':/out',
'-e NEO4J_AUTH=none',
'ghcr.io/ebispot/grebi_neo4j_with_extras:5.18.0',
Expand Down
2 changes: 1 addition & 1 deletion dataload/docker_envs/Dockerfile.neo4j_with_extras
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
FROM neo4j:5.18.0

RUN apt-get update && apt-get install -y python python3-pip sqlite3 rsync
RUN pip install pandas py2neo
RUN pip install pandas py2neo pyyaml

10 changes: 9 additions & 1 deletion dataload/nextflow/codon_nextflow.config
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ process {
}

process {
withName: materialise {
withName: link {
memory = 96.GB
}
}
Expand Down Expand Up @@ -89,5 +89,13 @@ process {
}
}

// Resource settings applied to the `run_materialised_queries` process via a
// `withName` selector: 8 CPUs and 1500 GB of memory.
// NOTE(review): the process definition in load_subgraph.nf itself declares
// memory "8 GB" and cpus "8"; this selector presumably overrides those values
// when this config is in use — confirm against Nextflow's config precedence.
// NOTE(review): that process copies the Neo4j database into
// params.neo_tmp_path (set to "/dev/shm" in load_subgraph.nf) before running
// the queries, so the large memory request is presumably sized to hold the
// database copy in shared memory — confirm with the pipeline owner.
process {
withName: run_materialised_queries {
cpus = 8
memory = 1500.GB
}
}




26 changes: 13 additions & 13 deletions dataload/nextflow/load_subgraph.nf
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ params.subgraph = "$GREBI_SUBGRAPH"
params.timestamp = "$GREBI_TIMESTAMP"
params.is_ebi = "$GREBI_IS_EBI"
params.solr_mem = "140g"
params.neo_tmp_path = "/dev/shm"

workflow {

Expand All @@ -29,13 +30,13 @@ workflow {

indexed = index(merged.collect())

materialise(merged.flatten(), indexed.metadata_jsonl, indexed.summary_json, Channel.value(config.exclude_edges + config.identifier_props), Channel.value(config.exclude_self_referential_edges + config.identifier_props), groups_txt)
merge_summary_jsons(indexed.summary_json.collect() + materialise.out.mat_summary.collect())
link(merged.flatten(), indexed.metadata_jsonl, indexed.summary_json, Channel.value(config.exclude_edges + config.identifier_props), Channel.value(config.exclude_self_referential_edges + config.identifier_props), groups_txt)
merge_summary_jsons(indexed.summary_json.collect() + link.out.mat_summary.collect())

compressed_blobs = create_compressed_blobs(materialise.out.nodes.mix(materialise.out.edges))
compressed_blobs = create_compressed_blobs(link.out.nodes.mix(link.out.edges))
sqlite = create_sqlite(compressed_blobs.collect())

neo_input_dir = prepare_neo(indexed.summary_json, materialise.out.nodes, materialise.out.edges)
neo_input_dir = prepare_neo(indexed.summary_json, link.out.nodes, link.out.edges)

ids_csv = create_neo_ids_csv(indexed.ids_txt)
neo_db = create_neo(
Expand All @@ -47,7 +48,7 @@ workflow {

mat_queries_sqlites = run_materialised_queries(neo_db)

solr_inputs = prepare_solr(materialise.out.nodes, materialise.out.edges)
solr_inputs = prepare_solr(link.out.nodes, link.out.edges)
solr_nodes_core = create_solr_nodes_core(prepare_solr.out.nodes.collect(), indexed.names_txt)
solr_edges_core = create_solr_edges_core(prepare_solr.out.edges.collect(), indexed.names_txt)
solr_autocomplete_core = create_solr_autocomplete_core(indexed.names_txt)
Expand Down Expand Up @@ -227,7 +228,7 @@ process index {
"""
}

process materialise {
process link {
cache "lenient"
memory "4 GB"
time "8h"
Expand All @@ -244,8 +245,8 @@ process materialise {
path(groups_txt)

output:
path("materialised_nodes_${task.index}.jsonl"), emit: nodes
path("materialised_edges_${task.index}.jsonl"), emit: edges
path("linked_nodes_${task.index}.jsonl"), emit: nodes
path("linked_edges_${task.index}.jsonl"), emit: edges
path("mat_summary_${task.index}.json"), emit: mat_summary

script:
Expand All @@ -257,11 +258,11 @@ process materialise {
--in-metadata-jsonl ${metadata_jsonl} \
--in-summary-json ${index_summary_json} \
--groups-txt ${groups_txt} \
--out-edges-jsonl materialised_edges_${task.index}.jsonl \
--out-edges-jsonl linked_edges_${task.index}.jsonl \
--out-summary-json mat_summary_${task.index}.json \
--exclude ${exclude.iterator().join(",")} \
--exclude-self-referential ${exclude_self_referential.iterator().join(",")} \
> materialised_nodes_${task.index}.jsonl
> linked_nodes_${task.index}.jsonl
"""
}

Expand Down Expand Up @@ -440,7 +441,6 @@ process run_materialised_queries {
memory "8 GB"
time "8h"
cpus "8"
neo_tmp_path "/dev/shm"

publishDir "${params.tmp}/${params.config}/${params.subgraph}", overwrite: true

Expand All @@ -454,9 +454,9 @@ process run_materialised_queries {
"""
#!/usr/bin/env bash
set -Eeuo pipefail
cp -r ${neo_db}/* ${task.neo_tmp_path}
cp -r ${neo_db}/* ${params.neo_tmp_path}
PYTHONUNBUFFERED=true python3 ${params.home}/08_run_queries/run_queries.py \
--in-db-path ${task.neo_tmp_path} \
--in-db-path ${params.neo_tmp_path} \
--out-sqlites-path materialised_queries
"""
}
Expand Down

0 comments on commit b310947

Please sign in to comment.