diff --git a/src/mikrokondo_tools/samplesheet/samplesheet.py b/src/mikrokondo_tools/samplesheet/samplesheet.py
index 51c3d97..1c703e4 100644
--- a/src/mikrokondo_tools/samplesheet/samplesheet.py
+++ b/src/mikrokondo_tools/samplesheet/samplesheet.py
@@ -198,7 +198,7 @@ def organize_data(self) -> t.Dict[str, t.List[SampleRow]]:
         self.update_sample_sheet_se(sample_sheet, assemblies.items(), SampleRow.assembly_key())
         return sample_sheet
 
-    def update_sample_sheet_se(self, sample_sheet: t.Dict[str, t.List[SampleRow]], items: t.Iterable[tuple[str, list]], field: str):
+    def update_sample_sheet_se(self, sample_sheet: t.Dict[str, t.List[SampleRow]], items: t.Iterable[t.Tuple[str, list]], field: str):
         for k, v in items:
             existing_data = sample_sheet.get(k)
             if existing_data:
@@ -213,11 +213,11 @@ def update_sample_sheet_se(self, sample_sheet: t.Dict[str, t.List[SampleRow]], i
             for ngs_data in v:
                 sample_sheet[k].append(SampleRow(sample=k, **{field: ngs_data}))
 
-    def get_ngs_data(self) -> tuple[t.Optional[t.Dict[str, tuple[t.List[p.Path], t.List[p.Path]]]], t.Optional[t.Dict[str, t.List[p.Path]]], t.Optional[t.Dict[str, t.List[p.Path]]]]:
+    def get_ngs_data(self) -> t.Tuple[t.Optional[t.Dict[str, t.Tuple[t.List[p.Path], t.List[p.Path]]]], t.Optional[t.Dict[str, t.List[p.Path]]], t.Optional[t.Dict[str, t.List[p.Path]]]]:
         """
         consolidate aggregate data into one data structure that can be validated
         """
-        pe_reads: t.Optional[t.Dict[str, tuple[t.List[p.Path], t.List[p.Path]]]] = None
+        pe_reads: t.Optional[t.Dict[str, t.Tuple[t.List[p.Path], t.List[p.Path]]]] = None
         se_reads: t.Optional[t.Dict[str, t.List[p.Path]]] = None
         fastas: t.Optional[t.Dict[str, t.List[p.Path]]] = None
 
@@ -230,11 +230,11 @@ def get_ngs_data(self) -> tuple[t.Optional[t.Dict[str, tuple[t.List[p.Path], t.L
             logger.error("No input files found for processing.")
         return (pe_reads, se_reads, fastas)
 
-    def get_paired_reads(self, reads: t.List[p.Path]) -> t.Dict[str, tuple[t.List[p.Path], t.List[p.Path]]]:
+    def get_paired_reads(self, reads: t.List[p.Path]) -> t.Dict[str, t.Tuple[t.List[p.Path], t.List[p.Path]]]:
         """
         Group the reads into bins of paired and unpaired reads
         """
-        r1_reads: t.Dict[str, tuple[t.List[p.Path], t.List[p.Path]]] = dict()
+        r1_reads: t.Dict[str, t.Tuple[t.List[p.Path], t.List[p.Path]]] = dict()
 
         for r in reads :
             if not r.match(f"**/*{self.extension_r1}*"):
@@ -245,7 +245,7 @@ def get_paired_reads(self, reads: t.List[p.Path]) -> t.Dict[str, tuple[t.List[p.
             else:
                 r1_reads[sample_name] = ([r], [])
 
-        r2_reads: t.List[tuple[str, p.Path]] = [(r.name[:r.name.rfind(self.extension_r2)], r) for r in reads if r.match(f"**/*{self.extension_r2}*")]
+        r2_reads: t.List[t.Tuple[str, p.Path]] = [(r.name[:r.name.rfind(self.extension_r2)], r) for r in reads if r.match(f"**/*{self.extension_r2}*")]
         for r2 in r2_reads:
             if r1 := r1_reads.get(r2[0]):
                 r1[1].append(r2[1])
@@ -274,7 +274,7 @@ def get_schema_input(url: str) -> json:
     return u.download_json(url, logger)
 
 
-def get_samples(directory: p.Path) -> tuple[t.List[p.Path], t.List[p.Path]]:
+def get_samples(directory: p.Path) -> t.Tuple[t.List[p.Path], t.List[p.Path]]:
     """
     Gather all sample information into one place for usage.