feat(ingest): also stream in metadata file for batch submissions
anna-parker committed Jan 16, 2025
1 parent b3cab5c commit dde0e45
Showing 2 changed files with 89 additions and 58 deletions.
24 changes: 20 additions & 4 deletions ingest/Snakefile
@@ -588,14 +588,30 @@ rule sort_fasta:
awk '/^>/ {{if (seq) print seq; seq=""; print; next}} {{seq = seq $0}} END {{if (seq) print seq}}' {input.sequences} | \
paste - - | \
sort -k1,1 | \
tr "\t" "\n" > {output.sorted}
tr "\\t" "\\n" > {output.sorted}
"""


rule sort_metadata:
input:
metadata="results/{basename}.tsv",
output:
sorted="results/{basename}_sorted.tsv",
shell:
"""
columnNumber=$(awk -F'\\t' '{{for(i=1;i<=NF;i++) if($i=="submissionId") print i}}' {input.metadata});
if [ ${{columnNumber}} ]; then
(head -n 1 {input.metadata} && tail -n +2 {input.metadata} | sort -t$'\t' -k${{columnNumber}},${{columnNumber}}) > {output.sorted}
else
cat {input.metadata} > {output.sorted}
fi
"""


rule submit:
input:
script="scripts/call_loculus.py",
metadata="results/submit_metadata.tsv",
metadata="results/submit_metadata_sorted.tsv",
sequences="results/submit_sequences_sorted.fasta",
config="results/config.yaml",
output:
@@ -618,7 +634,7 @@ rule submit:
rule revise:
input:
script="scripts/call_loculus.py",
metadata="results/revise_metadata.tsv",
metadata="results/revise_metadata_sorted.tsv",
sequences="results/revise_sequences_sorted.fasta",
config="results/config.yaml",
output:
@@ -641,7 +657,7 @@ rule revise:
rule regroup_and_revoke:
input:
script="scripts/call_loculus.py",
metadata="results/metadata_to_submit_prior_to_revoke.tsv",
metadata="results/metadata_to_submit_prior_to_revoke_sorted.tsv",
sequences="results/sequences_to_submit_prior_to_revoke_sorted.fasta",
map="results/to_revoke.json",
config="results/config.yaml",
123 changes: 69 additions & 54 deletions ingest/scripts/call_loculus.py
@@ -179,16 +179,14 @@ def post_fasta_batches(
metadata_file: str,
config: Config,
params: dict[str, str],
chunk_size=60000,
chunk_size=1000,
) -> requests.Response:
"""Chunks metadata files, joins with sequences and submits each chunk via POST."""
df = pd.read_csv(metadata_file, sep="\t")
logger.info(df.columns)
submission_ids = df["submissionId"].tolist()
sequences_output_file = "results/batch_sequences.fasta"
metadata_output_file = "results/batch_metadata.tsv"

def submit(metadata_output_file, sequences_output_file, batch_num):
def submit(metadata_output_file, sequences_output_file, number_of_submissions):
batch_num = -(number_of_submissions // -chunk_size)  # ceiling division
with (
open(metadata_output_file, "rb") as metadata_,
open(sequences_output_file, "rb") as fasta_,
@@ -199,68 +197,85 @@ def submit(metadata_output_file, sequences_output_file, batch_num):
}
response = make_request(HTTPMethod.POST, url, config, params=params, files=files)
logger.info(f"Batch {batch_num} Response: {response.status_code}")
if response.status_code != 200:
logger.error(f"Error in batch {batch_num}: {response.text}")
return response

def write_csv(submission_id_chunk):
metadata = df[df["submissionId"].isin(submission_id_chunk)]
metadata.to_csv(metadata_output_file, sep="\t", index=False, float_format="%.0f")
return response

def delete_batch_files(fasta_output, metadata_output):
fasta_output.seek(0)
fasta_output.truncate()
metadata_output.seek(0)
metadata_output.truncate()

current_submission_id = None
number_of_submissions = 0
number_of_submissions = -1
submission_id_chunk = []
fasta_submission_id = None
fasta_header = None

with (
open(fasta_file, encoding="utf-8") as fasta_file_stream,
open(sequences_output_file, "a", encoding="utf-8") as output,
open(sequences_output_file, "a", encoding="utf-8") as fasta_output,
open(metadata_file, encoding="utf-8") as metadata_file_stream,
open(metadata_output_file, "a", encoding="utf-8") as metadata_output,
):
for line in fasta_file_stream:
if line.startswith(">"):
header = line.strip()
if config.segmented:
submission_id = "_".join(header[1:].split("_")[:-1])
else:
submission_id = header[1:]
if submission_id == current_submission_id:
continue
if current_submission_id and submission_id < current_submission_id:
msg = "Fasta file is not sorted by submissionId"
logger.error(msg)
raise ValueError(msg)

number_of_submissions += 1
current_submission_id = submission_id
submission_id_chunk.append(submission_id)
if submission_id not in submission_ids:
msg = f"SubmissionId {submission_id} not found in metadata"
logger.error(msg)
raise ValueError(msg)
for record in metadata_file_stream:
number_of_submissions += 1
metadata_output.write(record)
if number_of_submissions == 0:
# get column index of submissionId
header_index = record.split("\t").index("submissionId")
continue

metadata_submission_id = record.split("\t")[header_index]

if fasta_submission_id and metadata_submission_id != fasta_submission_id:
msg = f"Fasta SubmissionId {fasta_submission_id} not in correct order in metadata"
logger.error(msg)
raise ValueError(msg)

searching = True

while searching:
line = fasta_file_stream.readline()
if not line:
searching = False
break
if line.startswith(">"):
header = line.strip()
fasta_header = header
if config.segmented:
submission_id = "_".join(header[1:].split("_")[:-1])
else:
submission_id = header[1:]
if submission_id == metadata_submission_id:
continue
if submission_id < metadata_submission_id:
msg = "Fasta file is not sorted by submissionId"
logger.error(msg)
raise ValueError(msg)

fasta_submission_id = submission_id
submission_id_chunk.append(submission_id)
searching = False
break

# add to sequences file
fasta_output.write(fasta_header + "\n")
fasta_output.write(line)

if number_of_submissions % chunk_size == 0:
# submit sequences and metadata
batch_num = number_of_submissions // chunk_size
write_csv(submission_id_chunk)
response = submit(metadata_output_file, sequences_output_file, batch_num)
if response.status_code != 200:
logger.error(f"Error in batch {batch_num + 1}: {response.text}")
return response

# delete the contents of sequences_output_file
output.seek(0)
output.truncate()
response = submit(
metadata_output_file, sequences_output_file, number_of_submissions
)
delete_batch_files(fasta_output, metadata_output)
submission_id_chunk = []

# add to sequences file
output.write(header + "\n")
output.write(line + "\n")

if submission_id_chunk:
# submit the last chunk
write_csv(submission_id_chunk)
batch_num = int(number_of_submissions // chunk_size) + 1
response = submit(metadata_output_file, sequences_output_file, batch_num)
if response.status_code != 200:
logger.error(f"Error in batch {batch_num + 1}: {response.text}")
return response
response = submit(metadata_output_file, sequences_output_file, number_of_submissions)

return response
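
Two details of the new streaming logic are easy to miss. First, the metadata rows now drive the loop, and the FASTA stream is only advanced until it catches up with the current submissionId; this is why both inputs must be pre-sorted, which the new Snakefile rules above guarantee. Second, submit() derives the batch number with negated floor division, an integer ceiling that avoids floats. A small illustration of that idiom, not part of the commit:

def batch_number(number_of_submissions: int, chunk_size: int) -> int:
    # -(n // -k) == ceil(n / k) for positive k, using only integer arithmetic
    return -(number_of_submissions // -chunk_size)

assert batch_number(1, 1000) == 1      # a single record is still batch 1
assert batch_number(1000, 1000) == 1   # a full chunk stays in batch 1
assert batch_number(1001, 1000) == 2   # one record over rolls into batch 2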

@@ -269,7 +284,7 @@ def submit_or_revise(
metadata, sequences, config: Config, group_id, mode=Literal["submit", "revise"]
):
"""
Submit/revise data to Loculus.
Submit/revise data to Loculus; requires metadata and sequences sorted by submissionId.
"""
logging_strings: dict[str, str]
endpoint: str
