Skip to content

Commit

Permalink
Update tuple type annotations to use typing.Tuple
Browse files Browse the repository at this point in the history
  • Loading branch information
mattheww95 committed Oct 22, 2024
1 parent 6573fcf commit bd2f7d9
Showing 1 changed file with 7 additions and 7 deletions.
14 changes: 7 additions & 7 deletions src/mikrokondo_tools/samplesheet/samplesheet.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ def organize_data(self) -> t.Dict[str, t.List[SampleRow]]:
self.update_sample_sheet_se(sample_sheet, assemblies.items(), SampleRow.assembly_key())
return sample_sheet

def update_sample_sheet_se(self, sample_sheet: t.Dict[str, t.List[SampleRow]], items: t.Iterable[tuple[str, list]], field: str):
def update_sample_sheet_se(self, sample_sheet: t.Dict[str, t.List[SampleRow]], items: t.Iterable[t.Tuple[str, list]], field: str):
for k, v in items:
existing_data = sample_sheet.get(k)
if existing_data:
Expand All @@ -213,11 +213,11 @@ def update_sample_sheet_se(self, sample_sheet: t.Dict[str, t.List[SampleRow]], i
for ngs_data in v:
sample_sheet[k].append(SampleRow(sample=k, **{field: ngs_data}))

def get_ngs_data(self) -> tuple[t.Optional[t.Dict[str, tuple[t.List[p.Path], t.List[p.Path]]]], t.Optional[t.Dict[str, t.List[p.Path]]], t.Optional[t.Dict[str, t.List[p.Path]]]]:
def get_ngs_data(self) -> t.Tuple[t.Optional[t.Dict[str, t.Tuple[t.List[p.Path], t.List[p.Path]]]], t.Optional[t.Dict[str, t.List[p.Path]]], t.Optional[t.Dict[str, t.List[p.Path]]]]:
"""
consolidate aggregate data into one data structure that can be validated
"""
pe_reads: t.Optional[t.Dict[str, tuple[t.List[p.Path], t.List[p.Path]]]] = None
pe_reads: t.Optional[t.Dict[str, t.Tuple[t.List[p.Path], t.List[p.Path]]]] = None
se_reads: t.Optional[t.Dict[str, t.List[p.Path]]] = None
fastas: t.Optional[t.Dict[str, t.List[p.Path]]] = None

Expand All @@ -230,11 +230,11 @@ def get_ngs_data(self) -> tuple[t.Optional[t.Dict[str, tuple[t.List[p.Path], t.L
logger.error("No input files found for processing.")
return (pe_reads, se_reads, fastas)

def get_paired_reads(self, reads: t.List[p.Path]) -> t.Dict[str, tuple[t.List[p.Path], t.List[p.Path]]]:
def get_paired_reads(self, reads: t.List[p.Path]) -> t.Dict[str, t.Tuple[t.List[p.Path], t.List[p.Path]]]:
"""
Group the reads into bins of paired and unpaired reads
"""
r1_reads: t.Dict[str, tuple[t.List[p.Path], t.List[p.Path]]] = dict()
r1_reads: t.Dict[str, t.Tuple[t.List[p.Path], t.List[p.Path]]] = dict()

for r in reads :
if not r.match(f"**/*{self.extension_r1}*"):
Expand All @@ -245,7 +245,7 @@ def get_paired_reads(self, reads: t.List[p.Path]) -> t.Dict[str, tuple[t.List[p.
else:
r1_reads[sample_name] = ([r], [])

r2_reads: t.List[tuple[str, p.Path]] = [(r.name[:r.name.rfind(self.extension_r2)], r) for r in reads if r.match(f"**/*{self.extension_r2}*")]
r2_reads: t.List[t.Tuple[str, p.Path]] = [(r.name[:r.name.rfind(self.extension_r2)], r) for r in reads if r.match(f"**/*{self.extension_r2}*")]
for r2 in r2_reads:
if r1 := r1_reads.get(r2[0]):
r1[1].append(r2[1])
Expand Down Expand Up @@ -274,7 +274,7 @@ def get_schema_input(url: str) -> json:
return u.download_json(url, logger)


def get_samples(directory: p.Path) -> tuple[t.List[p.Path], t.List[p.Path]]:
def get_samples(directory: p.Path) -> t.Tuple[t.List[p.Path], t.List[p.Path]]:
"""
Gather all sample information into one place for usage.
Expand Down

0 comments on commit bd2f7d9

Please sign in to comment.