From dde0e4528c7d41215033e3fbb0cb7f630e28687c Mon Sep 17 00:00:00 2001 From: "Anna (Anya) Parker" <50943381+anna-parker@users.noreply.github.com> Date: Thu, 16 Jan 2025 15:57:54 +0100 Subject: [PATCH] feat(ingest): also stream in metadata file for batch submissions --- ingest/Snakefile | 24 +++++-- ingest/scripts/call_loculus.py | 123 ++++++++++++++++++--------------- 2 files changed, 89 insertions(+), 58 deletions(-) diff --git a/ingest/Snakefile b/ingest/Snakefile index 2110bb283..1d33496db 100644 --- a/ingest/Snakefile +++ b/ingest/Snakefile @@ -588,14 +588,30 @@ rule sort_fasta: awk '/^>/ {{if (seq) print seq; seq=""; print; next}} {{seq = seq $0}} END {{if (seq) print seq}}' {input.sequences} | \ paste - - | \ sort -k1,1 | \ - tr "\t" "\n" > {output.sorted} + tr "\\t" "\\n" > {output.sorted} + """ + + +rule sort_metadata: + input: + metadata="results/{basename}.tsv", + output: + sorted="results/{basename}_sorted.tsv", + shell: + """ + columnNumber=$(awk -F'\\t' '{{for(i=1;i<=NF;i++) if($i=="submissionId") print i}}' {input.metadata}); + if [ ${{columnNumber}} ]; then + (head -n 1 {input.metadata} && tail -n +2 {input.metadata} | sort -t$'\t' -k${{columnNumber}},${{columnNumber}}) > {output.sorted} + else + cat {input.metadata} > {output.sorted} + fi """ rule submit: input: script="scripts/call_loculus.py", - metadata="results/submit_metadata.tsv", + metadata="results/submit_metadata_sorted.tsv", sequences="results/submit_sequences_sorted.fasta", config="results/config.yaml", output: @@ -618,7 +634,7 @@ rule submit: rule revise: input: script="scripts/call_loculus.py", - metadata="results/revise_metadata.tsv", + metadata="results/revise_metadata_sorted.tsv", sequences="results/revise_sequences_sorted.fasta", config="results/config.yaml", output: @@ -641,7 +657,7 @@ rule revise: rule regroup_and_revoke: input: script="scripts/call_loculus.py", - metadata="results/metadata_to_submit_prior_to_revoke.tsv", + metadata="results/metadata_to_submit_prior_to_revoke_sorted.tsv", sequences="results/sequences_to_submit_prior_to_revoke_sorted.fasta", map="results/to_revoke.json", config="results/config.yaml", diff --git a/ingest/scripts/call_loculus.py b/ingest/scripts/call_loculus.py index 411532404..8f5ee4700 100644 --- a/ingest/scripts/call_loculus.py +++ b/ingest/scripts/call_loculus.py @@ -179,16 +179,14 @@ def post_fasta_batches( metadata_file: str, config: Config, params: dict[str, str], - chunk_size=60000, + chunk_size=1000, ) -> requests.Response: """Chunks metadata files, joins with sequences and submits each chunk via POST.""" - df = pd.read_csv(metadata_file, sep="\t") - logger.info(df.columns) - submission_ids = df["submissionId"].tolist() sequences_output_file = "results/batch_sequences.fasta" metadata_output_file = "results/batch_metadata.tsv" - def submit(metadata_output_file, sequences_output_file, batch_num): + def submit(metadata_output_file, sequences_output_file, number_of_submissions): + batch_num = -(number_of_submissions // - chunk_size) # ceiling division with ( open(metadata_output_file, "rb") as metadata_, open(sequences_output_file, "rb") as fasta_, @@ -199,68 +197,85 @@ def submit(metadata_output_file, sequences_output_file, batch_num): } response = make_request(HTTPMethod.POST, url, config, params=params, files=files) logger.info(f"Batch {batch_num} Response: {response.status_code}") + if response.status_code != 200: + logger.error(f"Error in batch {batch_num}: {response.text}") return response - def write_csv(submission_id_chunk): - metadata = df[df["submissionId"].isin(submission_id_chunk)] - metadata.to_csv(metadata_output_file, sep="\t", index=False, float_format="%.0f") + return response + + def delete_batch_files(fasta_output, metadata_output): + fasta_output.seek(0) + fasta_output.truncate() + metadata_output.seek(0) + metadata_output.truncate() - current_submission_id = None - number_of_submissions = 0 + number_of_submissions = -1 submission_id_chunk = [] + fasta_submission_id = None + fasta_header = None + with ( open(fasta_file, encoding="utf-8") as fasta_file_stream, - open(sequences_output_file, "a", encoding="utf-8") as output, + open(sequences_output_file, "a", encoding="utf-8") as fasta_output, + open(metadata_file, encoding="utf-8") as metadata_file_stream, + open(metadata_output_file, "a", encoding="utf-8") as metadata_output, ): - for line in fasta_file_stream: - if line.startswith(">"): - header = line.strip() - if config.segmented: - submission_id = "_".join(header[1:].split("_")[:-1]) - else: - submission_id = header[1:] - if submission_id == current_submission_id: - continue - if current_submission_id and submission_id < current_submission_id: - msg = "Fasta file is not sorted by submissionId" - logger.error(msg) - raise ValueError(msg) - - number_of_submissions += 1 - current_submission_id = submission_id - submission_id_chunk.append(submission_id) - if submission_id not in submission_ids: - msg = f"SubmissionId {submission_id} not found in metadata" - logger.error(msg) - raise ValueError(msg) + for record in metadata_file_stream: + number_of_submissions += 1 + metadata_output.write(record) + if number_of_submissions == 0: + # get column index of submissionId + print(record.split("\t")) + header_index = record.split("\t").index("submissionId") continue + metadata_submission_id = record.split("\t")[header_index] + + if fasta_submission_id and metadata_submission_id != fasta_submission_id: + msg = f"Fasta SubmissionId {fasta_submission_id} not in correct order in metadata" + logger.error(msg) + raise ValueError(msg) + + searching = True + + while searching: + line = fasta_file_stream.readline() + if not line: + searching = False + break + if line.startswith(">"): + header = line.strip() + fasta_header = header + if config.segmented: + submission_id = "_".join(header[1:].split("_")[:-1]) + else: + submission_id = header[1:] + if submission_id == metadata_submission_id: + continue + if submission_id < metadata_submission_id: + msg = "Fasta file is not sorted by submissionId" + logger.error(msg) + raise ValueError(msg) + + fasta_submission_id = submission_id + submission_id_chunk.append(submission_id) + searching = False + break + + # add to sequences file + fasta_output.write(fasta_header + "\n") + fasta_output.write(line) + if number_of_submissions % chunk_size == 0: - # submit sequences and metadata - batch_num = number_of_submissions // chunk_size - write_csv(submission_id_chunk) - response = submit(metadata_output_file, sequences_output_file, batch_num) - if response.status_code != 200: - logger.error(f"Error in batch {batch_num + 1}: {response.text}") - return response - - # delete the contents of sequences_output_file - output.seek(0) - output.truncate() + response = submit( + metadata_output_file, sequences_output_file, number_of_submissions + ) + delete_batch_files(fasta_output, metadata_output) submission_id_chunk = [] - # add to sequences file - output.write(header + "\n") - output.write(line + "\n") - if submission_id_chunk: # submit the last chunk - write_csv(submission_id_chunk) - batch_num = int(number_of_submissions // chunk_size) + 1 - response = submit(metadata_output_file, sequences_output_file, batch_num) - if response.status_code != 200: - logger.error(f"Error in batch {batch_num + 1}: {response.text}") - return response + response = submit(metadata_output_file, sequences_output_file, number_of_submissions) return response @@ -269,7 +284,7 @@ def submit_or_revise( metadata, sequences, config: Config, group_id, mode=Literal["submit", "revise"] ): """ - Submit/revise data to Loculus. + Submit/revise data to Loculus -requires metadata and sequences sorted by submissionId. """ logging_strings: dict[str, str] endpoint: str