From a10e4273ba8a7a44986ff077a9f0193128719dfc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nahuel=20Unai=20Rosell=C3=B3=20Beneitez?= Date: Thu, 26 Sep 2024 10:16:03 +0000 Subject: [PATCH] Remove V2 implementation, add `get_{recording,segment}_mapping` --- lib/corpus.py | 254 +++----------------------------------------------- 1 file changed, 11 insertions(+), 243 deletions(-) diff --git a/lib/corpus.py b/lib/corpus.py index aa05c05e..80442d70 100644 --- a/lib/corpus.py +++ b/lib/corpus.py @@ -320,213 +320,17 @@ def _dump_internal(self, out: TextIO, indentation: str = ""): else: out.write("%s\n" % (indentation,)) - -class CorpusV2(NamedEntity, CorpusSection): - """ - This class represents a corpus in the Bliss format. It is also used to represent subcorpora when the parent_corpus - attribute is set. Corpora with include statements can be read but are written back as a single file. - - The difference with respect to :class:`i6_core.lib.corpus.Corpus` is that in this class we can access - any recording or segment in practically O(1). - """ - - def __init__(self): - super().__init__() - - self.parent_corpus: Optional[CorpusV2] = None - - self.subcorpora: Dict[str, CorpusV2] = {} - self.recordings: Dict[str, RecordingV2] = {} - - def segments(self) -> Iterable[Segment]: - """ - :return: an iterator over all segments within the corpus - """ - for r in self.recordings: - yield from r.segments.values() - for sc in self.subcorpora.values(): - yield from sc.segments() - - def segment_map(self) -> Dict[str, Segment]: - """ - :return: A mapping from full segment names into the actual segments. - Note that this is similar to what the function :func:`get_segment_by_full_name` does, - but giving more control to the user. - """ - seg_map = {seg.fullname(): seg for rec in self.recordings.values() for seg in rec.segments.values()} - for sc in self.subcorpora.values(): - seg_map.update(sc.segment_map()) - - return seg_map - - def get_segment_by_full_name(self, name: str) -> Optional[Segment]: - """ - Obtains a segment from the corpus given its full name in the corpus. - - :param name: The full name of the segment. - :return: The segment to be searched for, or `None` if not found. - """ - split_segment_name = name.split("/") - potential_corpus_name = split_segment_name[0] - if self.name == potential_corpus_name: - # The name was the own corpus'. This can happen the first iteration when giving the full segment name, - # for instance 'base_corpus/subcorpus1/subcorpus2/recording/segment'. - # Get rid of the first part for the search. - name = split_segment_name[1:] - - return self._get_segment_by_relative_name(name) - - def _get_segment_by_relative_name(self, name: str) -> Segment: - """ - :param name: The name of the segment to be searched for, relative to the current corpus. - :return: The segment whose name coincides with :param:`name` relative to the current corpus, - or `None` if not found. - """ - if name == "": - # Found nothing. - return None - - # Base case: the recording name comes first, and the segment name appears immediately afterwards. - recording_name = name.split("/")[0] - segment_name = name[len(f"{recording_name}/") :] - if recording_name in self.recordings: - return self.recordings[recording_name][segment_name] - else: - # Recursive case: look one level deeper to the indicated subcorpus. - subcorpus_name = recording_name - segment_full_name_from_subcorpus = segment_name - assert subcorpus_name in self.subcorpora, ( - f"Subcorpus '{subcorpus_name}' required for accessing segment '{name}' " - "not found in the list of subcorpora: {list(self.subcorpora.keys())}." - ) - return self.subcorpora[subcorpus_name].get_segment_by_full_name(segment_full_name_from_subcorpus) - - def get_recording_by_full_name(self, name: str) -> Optional[RecordingV2]: - """ - :return: the segment specified by its full name - """ - if name == "": - # Found nothing. - return None - - if name in self.recordings: - return self.recordings[name] - else: - subcorpus_name = name.split("/")[0] - recording_name_from_subcorpus = name[len(f"{subcorpus_name}/") :] - if self.name == subcorpus_name: - # The name was the own corpus'. This can happen when giving the full recording name. - # Ignore the former part. - subcorpus_name = name.split("/")[0] - recording_name_from_subcorpus = name[len(f"{subcorpus_name}/") :] - - assert subcorpus_name in self.subcorpora, ( - f"Subcorpus '{subcorpus_name}' required for accessing recording '{name}' " - "not found in the list of subcorpora: {list(self.subcorpora.keys())}." - ) - return self.subcorpora[subcorpus_name].get_recording_by_full_name(recording_name_from_subcorpus) - - def all_recordings(self) -> Iterable[RecordingV2]: - yield from self.recordings.values() - for sc in self.subcorpora.values(): - yield from sc.all_recordings() - - def all_speakers(self) -> Iterable[Speaker]: - yield from self.speakers.values() - for sc in self.subcorpora.values(): - yield from sc.all_speakers() - - def top_level_recordings(self) -> Iterable[RecordingV2]: - yield from self.recordings.values() - - def top_level_subcorpora(self) -> Iterable[CorpusV2]: - yield from self.subcorpora.values() - - def top_level_speakers(self) -> Iterable[Speaker]: - yield from self.speakers.values() - - def remove_recording(self, recording: RecordingV2): - del self.recordings[recording.name] - for sc in self.subcorpora.values(): - sc.remove_recording(recording) - - def remove_recordings(self, recordings: List[RecordingV2]): - for recording in recordings: - del self.recordings[recording.name] - for sc in self.subcorpora: - sc.remove_recordings(recordings) - - def add_recording(self, recording: RecordingV2): - assert isinstance(recording, RecordingV2) - recording.corpus = self - self.recordings[recording.name] = recording - - def add_subcorpus(self, corpus: CorpusV2): - assert isinstance(corpus, Corpus) - corpus.parent_corpus = self - self.subcorpora[corpus.name] = corpus - - def add_speaker(self, speaker: Speaker): - assert isinstance(speaker, Speaker) - self.speakers[speaker.name] = speaker - - def fullname(self) -> str: - if self.parent_corpus is not None: - return self.parent_corpus.fullname() + "/" + self.name - else: - return self.name - - def filter_segments(self, filter_function: FilterFunction): - """ - filter all segments (including in subcorpora) using filter_function - :param filter_function: takes arguments corpus, recording and segment, returns True if segment should be kept - """ - for r in self.recordings.values(): - r.segments = [s for s in r.segments.values() if filter_function(self, r, s)] - for sc in self.subcorpora.values(): - sc.filter_segments(filter_function) - - def load(self, path: str): + def get_segment_mapping(self) -> Dict[str, Segment]: """ - :param path: corpus .xml or .xml.gz + :return: Mapping from segment fullnames to actual segments. """ - open_fun = gzip.open if path.endswith(".gz") else open - - with open_fun(path, "rt") as f: - handler = CorpusParser(self, path) - sax.parse(f, handler) + return {seg.fullname(): seg for seg in self.segments()} - def dump(self, path: str): + def get_recording_mapping(self) -> Dict[str, Recording]: """ - :param path: target .xml or .xml.gz path + :return: Mapping from recording fullnames to actual recordings. """ - open_fun = gzip.open if path.endswith(".gz") else open - - with open_fun(path, "wt") as f: - f.write('\n') - self._dump_internal(f) - - def _dump_internal(self, out: TextIO, indentation: str = ""): - if self.parent_corpus is None: - out.write('\n' % self.name) - else: - out.write('%s\n' % (indentation, self.name)) - - for s in self.speakers.values(): - s.dump(out, indentation + " ") - if self.speaker_name is not None: - out.write('%s \n' % (indentation, self.speaker_name)) - - for r in self.recordings.values(): - r.dump(out, indentation + " ") - - for sc in self.subcorpora.values(): - sc._dump_internal(out, indentation + " ") - - if self.parent_corpus is None: - out.write("\n") - else: - out.write("%s\n" % (indentation,)) + return {rec.fullname(): rec for rec in self.recordings} class Recording(NamedEntity, CorpusSection): @@ -565,47 +369,11 @@ def add_segment(self, segment: Segment): segment.recording = self self.segments.append(segment) - -class RecordingV2(NamedEntity, CorpusSection): - """ - Represents a recording, which is an entity composed of an audio file and a set of segments. - - The difference with respect to :class:`i6_core.lib.corpus.Recording` is that this class allows access - from the parent corpus to any segment in O(1). - """ - - def __init__(self): - super().__init__() - self.audio: Optional[str] = None - self.corpus: Optional[CorpusV2] = None - self.segments: Dict[str, Segment] = {} - - def fullname(self) -> str: - return self.corpus.fullname() + "/" + self.name - - def speaker(self, speaker_name: Optional[str] = None) -> Speaker: - if speaker_name is None: - speaker_name = self.speaker_name - if speaker_name in self.speakers: - return self.speakers[speaker_name] - else: - return self.corpus.speaker(speaker_name, self.default_speaker) - - def dump(self, out: TextIO, indentation: str = ""): - out.write('%s\n' % (indentation, self.name, self.audio)) - - for s in self.speakers.values(): - s.dump(out, indentation + " ") - if self.speaker_name is not None: - out.write('%s \n' % (indentation, self.speaker_name)) - - for s in self.segments.values(): - s.dump(out, indentation + " ") - - def add_segment(self, segment: Segment): - assert isinstance(segment, Segment) - segment.recording = self - self.segments[segment.name] = segment + def get_segment_mapping(self) -> Dict[str, Segment]: + """ + :return: Mapping from segment fullnames to actual segments. + """ + return {seg.fullname(): seg for seg in self.segments} class Segment(NamedEntity):