Skip to content

Commit

Permalink
Remove V2 implementation, add get_{recording,segment}_mapping
Browse files Browse the repository at this point in the history
  • Loading branch information
Icemole committed Sep 26, 2024
1 parent 81582f4 commit a10e427
Showing 1 changed file with 11 additions and 243 deletions.
254 changes: 11 additions & 243 deletions lib/corpus.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,213 +320,17 @@ def _dump_internal(self, out: TextIO, indentation: str = ""):
else:
out.write("%s</subcorpus>\n" % (indentation,))


class CorpusV2(NamedEntity, CorpusSection):
"""
This class represents a corpus in the Bliss format. It is also used to represent subcorpora when the parent_corpus
attribute is set. Corpora with include statements can be read but are written back as a single file.
The difference with respect to :class:`i6_core.lib.corpus.Corpus` is that in this class we can access
any recording or segment in practically O(1).
"""

def __init__(self):
super().__init__()

self.parent_corpus: Optional[CorpusV2] = None

self.subcorpora: Dict[str, CorpusV2] = {}
self.recordings: Dict[str, RecordingV2] = {}

def segments(self) -> Iterable[Segment]:
"""
:return: an iterator over all segments within the corpus
"""
for r in self.recordings:
yield from r.segments.values()
for sc in self.subcorpora.values():
yield from sc.segments()

def segment_map(self) -> Dict[str, Segment]:
"""
:return: A mapping from full segment names into the actual segments.
Note that this is similar to what the function :func:`get_segment_by_full_name` does,
but giving more control to the user.
"""
seg_map = {seg.fullname(): seg for rec in self.recordings.values() for seg in rec.segments.values()}
for sc in self.subcorpora.values():
seg_map.update(sc.segment_map())

return seg_map

def get_segment_by_full_name(self, name: str) -> Optional[Segment]:
"""
Obtains a segment from the corpus given its full name in the corpus.
:param name: The full name of the segment.
:return: The segment to be searched for, or `None` if not found.
"""
split_segment_name = name.split("/")
potential_corpus_name = split_segment_name[0]
if self.name == potential_corpus_name:
# The name was the own corpus'. This can happen the first iteration when giving the full segment name,
# for instance 'base_corpus/subcorpus1/subcorpus2/recording/segment'.
# Get rid of the first part for the search.
name = split_segment_name[1:]

return self._get_segment_by_relative_name(name)

def _get_segment_by_relative_name(self, name: str) -> Segment:
"""
:param name: The name of the segment to be searched for, relative to the current corpus.
:return: The segment whose name coincides with :param:`name` relative to the current corpus,
or `None` if not found.
"""
if name == "":
# Found nothing.
return None

# Base case: the recording name comes first, and the segment name appears immediately afterwards.
recording_name = name.split("/")[0]
segment_name = name[len(f"{recording_name}/") :]
if recording_name in self.recordings:
return self.recordings[recording_name][segment_name]
else:
# Recursive case: look one level deeper to the indicated subcorpus.
subcorpus_name = recording_name
segment_full_name_from_subcorpus = segment_name
assert subcorpus_name in self.subcorpora, (
f"Subcorpus '{subcorpus_name}' required for accessing segment '{name}' "
"not found in the list of subcorpora: {list(self.subcorpora.keys())}."
)
return self.subcorpora[subcorpus_name].get_segment_by_full_name(segment_full_name_from_subcorpus)

def get_recording_by_full_name(self, name: str) -> Optional[RecordingV2]:
"""
:return: the segment specified by its full name
"""
if name == "":
# Found nothing.
return None

if name in self.recordings:
return self.recordings[name]
else:
subcorpus_name = name.split("/")[0]
recording_name_from_subcorpus = name[len(f"{subcorpus_name}/") :]
if self.name == subcorpus_name:
# The name was the own corpus'. This can happen when giving the full recording name.
# Ignore the former part.
subcorpus_name = name.split("/")[0]
recording_name_from_subcorpus = name[len(f"{subcorpus_name}/") :]

assert subcorpus_name in self.subcorpora, (
f"Subcorpus '{subcorpus_name}' required for accessing recording '{name}' "
"not found in the list of subcorpora: {list(self.subcorpora.keys())}."
)
return self.subcorpora[subcorpus_name].get_recording_by_full_name(recording_name_from_subcorpus)

def all_recordings(self) -> Iterable[RecordingV2]:
yield from self.recordings.values()
for sc in self.subcorpora.values():
yield from sc.all_recordings()

def all_speakers(self) -> Iterable[Speaker]:
yield from self.speakers.values()
for sc in self.subcorpora.values():
yield from sc.all_speakers()

def top_level_recordings(self) -> Iterable[RecordingV2]:
yield from self.recordings.values()

def top_level_subcorpora(self) -> Iterable[CorpusV2]:
yield from self.subcorpora.values()

def top_level_speakers(self) -> Iterable[Speaker]:
yield from self.speakers.values()

def remove_recording(self, recording: RecordingV2):
del self.recordings[recording.name]
for sc in self.subcorpora.values():
sc.remove_recording(recording)

def remove_recordings(self, recordings: List[RecordingV2]):
for recording in recordings:
del self.recordings[recording.name]
for sc in self.subcorpora:
sc.remove_recordings(recordings)

def add_recording(self, recording: RecordingV2):
assert isinstance(recording, RecordingV2)
recording.corpus = self
self.recordings[recording.name] = recording

def add_subcorpus(self, corpus: CorpusV2):
assert isinstance(corpus, Corpus)
corpus.parent_corpus = self
self.subcorpora[corpus.name] = corpus

def add_speaker(self, speaker: Speaker):
assert isinstance(speaker, Speaker)
self.speakers[speaker.name] = speaker

def fullname(self) -> str:
if self.parent_corpus is not None:
return self.parent_corpus.fullname() + "/" + self.name
else:
return self.name

def filter_segments(self, filter_function: FilterFunction):
"""
filter all segments (including in subcorpora) using filter_function
:param filter_function: takes arguments corpus, recording and segment, returns True if segment should be kept
"""
for r in self.recordings.values():
r.segments = [s for s in r.segments.values() if filter_function(self, r, s)]
for sc in self.subcorpora.values():
sc.filter_segments(filter_function)

def load(self, path: str):
def get_segment_mapping(self) -> Dict[str, Segment]:
"""
:param path: corpus .xml or .xml.gz
:return: Mapping from segment fullnames to actual segments.
"""
open_fun = gzip.open if path.endswith(".gz") else open

with open_fun(path, "rt") as f:
handler = CorpusParser(self, path)
sax.parse(f, handler)
return {seg.fullname(): seg for seg in self.segments()}

def dump(self, path: str):
def get_recording_mapping(self) -> Dict[str, Recording]:
"""
:param path: target .xml or .xml.gz path
:return: Mapping from recording fullnames to actual recordings.
"""
open_fun = gzip.open if path.endswith(".gz") else open

with open_fun(path, "wt") as f:
f.write('<?xml version="1.0" encoding="utf-8"?>\n')
self._dump_internal(f)

def _dump_internal(self, out: TextIO, indentation: str = ""):
if self.parent_corpus is None:
out.write('<corpus name="%s">\n' % self.name)
else:
out.write('%s<subcorpus name="%s">\n' % (indentation, self.name))

for s in self.speakers.values():
s.dump(out, indentation + " ")
if self.speaker_name is not None:
out.write('%s <speaker name="%s"/>\n' % (indentation, self.speaker_name))

for r in self.recordings.values():
r.dump(out, indentation + " ")

for sc in self.subcorpora.values():
sc._dump_internal(out, indentation + " ")

if self.parent_corpus is None:
out.write("</corpus>\n")
else:
out.write("%s</subcorpus>\n" % (indentation,))
return {rec.fullname(): rec for rec in self.recordings}


class Recording(NamedEntity, CorpusSection):
Expand Down Expand Up @@ -565,47 +369,11 @@ def add_segment(self, segment: Segment):
segment.recording = self
self.segments.append(segment)


class RecordingV2(NamedEntity, CorpusSection):
"""
Represents a recording, which is an entity composed of an audio file and a set of segments.
The difference with respect to :class:`i6_core.lib.corpus.Recording` is that this class allows access
from the parent corpus to any segment in O(1).
"""

def __init__(self):
super().__init__()
self.audio: Optional[str] = None
self.corpus: Optional[CorpusV2] = None
self.segments: Dict[str, Segment] = {}

def fullname(self) -> str:
return self.corpus.fullname() + "/" + self.name

def speaker(self, speaker_name: Optional[str] = None) -> Speaker:
if speaker_name is None:
speaker_name = self.speaker_name
if speaker_name in self.speakers:
return self.speakers[speaker_name]
else:
return self.corpus.speaker(speaker_name, self.default_speaker)

def dump(self, out: TextIO, indentation: str = ""):
out.write('%s<recording name="%s" audio="%s">\n' % (indentation, self.name, self.audio))

for s in self.speakers.values():
s.dump(out, indentation + " ")
if self.speaker_name is not None:
out.write('%s <speaker name="%s"/>\n' % (indentation, self.speaker_name))

for s in self.segments.values():
s.dump(out, indentation + " ")

def add_segment(self, segment: Segment):
assert isinstance(segment, Segment)
segment.recording = self
self.segments[segment.name] = segment
def get_segment_mapping(self) -> Dict[str, Segment]:
"""
:return: Mapping from segment fullnames to actual segments.
"""
return {seg.fullname(): seg for seg in self.segments}


class Segment(NamedEntity):
Expand Down

0 comments on commit a10e427

Please sign in to comment.