Skip to content

Commit

Permalink
Merge pull request #91 from leobruneau/main
Browse files Browse the repository at this point in the history
MSCX class API for creating score excerpts
  • Loading branch information
johentsch authored Aug 30, 2023
2 parents e385207 + 36a59f8 commit 18d16a9
Show file tree
Hide file tree
Showing 4 changed files with 318 additions and 4 deletions.
9 changes: 9 additions & 0 deletions AUTHORS.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,13 @@
Contributors
============

Author
======

* Johannes Hentschel <[email protected]>

Contributors
============

* Arina Lozhkina <[email protected]>
* Léo Bruneau <[email protected]>
21 changes: 18 additions & 3 deletions src/ms3/bs4_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -1880,7 +1880,8 @@ def make_excerpt(
for k, v in active_harmony_row.items()
if k.startswith("Harmony/")
}
return Excerpt(

excerpt = Excerpt(
soup,
read_only=False,
logger_cfg=self.logger_cfg,
Expand All @@ -1894,6 +1895,9 @@ def make_excerpt(
localkey=localkey,
)

excerpt.filepath = self.filepath
return excerpt

def _make_measure_list(self, sections=True, secure=True, reset_index=True):
"""Regenerate the measure list from the parsed score with advanced options."""
logger_cfg = self.logger_cfg.copy()
Expand Down Expand Up @@ -2929,7 +2933,12 @@ def __init__(
if not final_barline:
self.remove_final_barline()

# tags to amend
# sanitize values in case NaN was passed
if pd.isnull(globalkey):
globalkey = None
if pd.isnull(localkey):
localkey = None
# amend first label to indicate global and/or local key
if globalkey or localkey:
self.amend_first_harmony_keys(globalkey, localkey)

Expand Down Expand Up @@ -3048,11 +3057,17 @@ def set_clefs(self, staff2clef: Dict[int, Dict[str, str]]):
first_voice = first_measure.find("voice")
clef_tag = self.new_tag("Clef", prepend_within=first_voice)
for tag, value in tag_value_dict.items():
if pd.isnull(value):
continue
if "/" in tag:
self.logger.debug(
f"Haven't learned how to deal with secondary Clef tags such as Clef/{tag}. "
f"Igoring."
)
elif ":" in tag:
self.logger.debug(
f"Inclusion of tag attributes (such as {tag}) not yet implemented."
)
else:
_ = self.new_tag(tag, value=value, append_within=clef_tag)

Expand Down Expand Up @@ -4287,7 +4302,7 @@ def get_row_at_quarterbeat(
the given dataframe's "quarterbeat" column as activation intervals. That is, the rows are interpreted as
consecutive, non-overlapping events and the ``duration_qb`` column is not taken into account for computing the
activation intervals. The last interval's right boundary is np.inf, so that all values higher than the latest
event resolve to the latest event without needing to now the end of the piece.
event resolve to the latest event without needing to know the end of the piece.
Args:
df: DataFrame in which the column "quarterbeat" is monotonically increasing.
Expand Down
250 changes: 249 additions & 1 deletion src/ms3/score.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,11 @@
from tempfile import NamedTemporaryFile as Temp
from typing import IO, Collection, Literal, Optional, Tuple

import numpy as np
import pandas as pd

from .annotations import Annotations
from .bs4_parser import _MSCX_bs4
from .bs4_parser import _MSCX_bs4, get_row_at_quarterbeat
from .logger import LoggedClass, get_log_capture_handler
from .transformations import add_quarterbeats_col
from .utils import (
Expand Down Expand Up @@ -1081,6 +1082,253 @@ def _update_annotations(self, infer_types={}):
else:
self._annotations = None

def store_excerpt(
self,
start_mc: Optional[int] = None,
start_mn: Optional[int] = None,
end_mc: Optional[int] = None,
end_mn: Optional[int] = None,
directory: Optional[str] = None,
suffix: Optional[str] = None,
):
"""Store an excerpt of the current score as a new .mscx file by defining start and end measure. If no end
measure is specified, the excerpt will include everything following the start measure.
The original score header and metadata are kept. Start and end measure both can be specified either as MC
(the number in MuseScore's status bar) or as MN (the number as displayed in the score).
Args:
start_mc:
Measure count of the first measure to be included in the excerpt.
If ``start_mc`` is given, ``start_mn`` must be None.
start_mn:
Measure number of the first measure to be included in the excerpt.
If ``start_mn`` is given, ``start_mc`` must be None.
end_mc:
Measure count of the last measure to be included in the excerpt.
If ``end_mc`` is given, ``end_mn`` must be None.
end_mn:
Measure number of the last measure to be included in the excerpt.
If ``end_mn`` is given, ``end_mc`` must be None.
directory:
Path to the folder where the excerpts are to be stored.
suffix:
String to be inserted in the excerpts filename[suffix]_[start_mc]-[end_mc]
Returns:
Optional[None]: if it was impossible to find a quarterbeat value for the given start measure.
In this case the function will not produce an excerpt.
"""
assert (start_mc is None) + (
start_mn is None
) == 1, "Exactly one of start_mc or start_mn must be provided."

if end_mc is not None and end_mn is not None:
raise ValueError(
"Exactly one of end_mc or end_mn must be provided or None."
)

for arg, arg_val in zip(
("start_mc", "start_mn", "end_mc", "end_mn"),
(start_mc, start_mn, end_mc, end_mn),
):
if arg_val is not None and not isinstance(arg_val, int):
raise TypeError(
f"{arg} must be an integer. Got {arg_val!r} ({type(arg_val)!r})."
)

if suffix is None:
suffix = ""

measures = self.measures()
mc = measures["mc"]
mn = measures["mn"]

# Setting ending mc value
if end_mc is None:
if end_mn is None:
end = mc.max()
elif end_mn not in mn.values:
raise ValueError(
f"Score has no measure number {end_mn} to end an excerpt on."
)
else:
end = measures.loc[mn == end_mn, "mc"].iloc[-1]
else:
end = end_mc

# Setting starting mc value
if start_mc is None:
if start_mn not in mn.values:
raise ValueError(
f"Score has no measure number {start_mn} to start an excerpt from."
)
start = measures.loc[mn == start_mn, "mc"].iloc[0]
else:
start = start_mc

global_key, local_key = None, None
dcml_labels = self.expanded()
if dcml_labels is not None and len(dcml_labels) > 0:
# try to infer global key and local key from the annotations
mc_measures = measures.set_index("mc")
quarterbeat_start = mc_measures.loc[start, "quarterbeats"]
if pd.isnull(quarterbeat_start):
self.logger.error(
f"The given start MC {start} has no quarterbeat value and no globalkey and localkey "
f"could be inferred. Probably it is a first ending."
)
else:
row = get_row_at_quarterbeat(
df=dcml_labels, quarterbeat=quarterbeat_start
)

# TODO: Check if this is correct (sometimes get_row_at_quarterbeat returns several rows)
if isinstance(row, pd.DataFrame):
row = row.iloc[-1]

global_key = row["globalkey"]
local_key = row["localkey"]

included_mcs = tuple(range(start, end + 1))

self.logger.debug(
f"Start: {start}, End: {end}. Total number of measures: {len(included_mcs)}"
)
self.logger.debug(f"Global key: {global_key}, Local key: {local_key}")

excerpt = self.parsed.make_excerpt(
included_mcs=included_mcs, globalkey=global_key, localkey=local_key
)

original_directory, original_filename = os.path.split(excerpt.filepath)
original_file_name = os.path.splitext(original_filename)[0]
new_file_name = original_file_name + f"{suffix}_{start}-{end}" + ".mscx"
if directory is None:
excerpt_filepath = os.path.join(original_directory, new_file_name)
else:
excerpt_filepath = os.path.join(directory, new_file_name)
excerpt.store_score(excerpt_filepath)
self.logger.info(f"Excerpt for MCs {start}-{end} stored at {excerpt_filepath}.")

def store_phrase_excerpts(self, directory: Optional[str] = None):
"""Store excerpts based on the phrase annotations contained in the score, if any. For this purpose,
the :meth:`expanded` table is unfolded (to guarantee the correct sequence of phrase starts and ends),
start and end MC for each phrase are passed to :meth:`store_excerpt`. The resulting excerpts will be
named ``[original_filename]_phrase_[start_mc]-[end_mc].mscx``.
"""

expanded = self.expanded(unfold=True)
if expanded is None or len(expanded) == 0:
self.logger.info("No DCML labels found to extract phrase information from.")
return

phrase_label_mask = expanded["phraseend"].isin(["{", "}{", "}"])
if not phrase_label_mask.any():
self.logger.info(
"DCML labels do not contain phrase labels with curly brackets {}"
)
return

phrase_labels = expanded.loc[phrase_label_mask, "phraseend"]
phrase_starts = phrase_labels.str.contains("{")
phrase_ends = phrase_labels.str.contains("}")
if phrase_starts.sum() != phrase_ends.sum():
self.logger.error("Phrase labels are incoherent. Not extracting phrases.")
return
start_mcs = expanded.loc[phrase_labels[phrase_starts].index, "mc"].values
end_mcs = expanded.loc[phrase_labels[phrase_ends].index, "mc"].values
phrases = list(
reversed(sorted(zip(start_mcs, end_mcs)))
) # prepare for removal of duplicates due to unfolding
self.logger.debug(f"Found {len(phrases)} phrases.", f"Phrases: {phrases}")
phrases_without_duplicates = []
previous_start = None
for start_mc, end_mc in phrases:
if start_mc == previous_start:
# do not create excerpts with the same start_mc
continue
if end_mc < start_mc:
self.logger.error(
f"Phrase end {end_mc} is smaller than phrase start {start_mc}, skipping excerpt."
)
continue
phrases_without_duplicates.append((start_mc, end_mc))
self.store_excerpt(
start_mc=int(start_mc),
end_mc=int(end_mc),
directory=directory,
suffix="_phrase",
)
previous_start = start_mc

self.logger.info(
f"Extracted {len(phrases_without_duplicates)} phrases.\n"
f"Phrases: {phrases_without_duplicates}"
)

def store_random_excerpts(
self,
n_excerpts: Optional[int] = None,
mn_length: int = 2,
directory: Optional[str] = None,
suffix: Optional[str] = None,
):
"""Extract random snippets from the given score. The function will generate a list of tuples,
where in each pair, the first element is the mc for the snippet
beginning and the second will be the mc for the snippet ending.
For each of these pairs, the function will call make_excerpt() to generate an excerpt for the snippet.
Args:
n_excerpts:
Number of snippets to be extracted. If not specified, all possible snippets will be extracted.
mn_length:
Length of each snippet in terms of MN (measure numbers).
directory:
Path to the folder where the excerpts are to be stored.
suffix:
String to be inserted in the excerpts filename[suffix]_[start_mc]-[end_mc]
"""

if n_excerpts is not None and not isinstance(n_excerpts, int):
raise TypeError("snippet_number must be an integer.")
if not isinstance(mn_length, int):
raise TypeError(
"snippet_length must be an integer. Cannot create snippet of non-integral length."
)

last_mn = self.measures().mn.max()
if mn_length > last_mn:
raise ValueError(
f"mn_length ({mn_length}) exceeds the number of measures in the score ({last_mn})."
)
last_possible_start = last_mn - mn_length + 1
if n_excerpts is None:
n_excerpts = last_possible_start
self.logger.debug(
f"Number of snippets not specified. Extracting all {n_excerpts} possible snippets."
)
elif n_excerpts > last_possible_start:
n_excerpts = last_possible_start
self.logger.info(
"Number of snippets exceeds the number of possible snippets. ",
"Will extract all possible snippets.",
)

valid_mn_starts = np.arange(
1, last_possible_start + 1
) # systematically excludes anacrusis (MN=0)
sampled_mn_starts = np.random.choice(valid_mn_starts, n_excerpts, replace=False)
self.logger.debug(f"Sampled starting points: {sampled_mn_starts}")

for mn_start in sampled_mn_starts:
self.store_excerpt(
start_mn=int(mn_start),
end_mn=int(mn_start + mn_length - 1),
directory=directory,
suffix=suffix,
)


# ######################################################################################################################
# ######################################################################################################################
Expand Down
42 changes: 42 additions & 0 deletions tests/test_local_files/test_score_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from ms3.utils import (
assert_all_lines_equal,
assert_dfs_equal,
check_phrase_annotations,
decode_harmonies,
load_tsv,
no_collections_no_booleans,
Expand Down Expand Up @@ -174,3 +175,44 @@ def test_parse_to_notelist(self, score_object):
assert_dfs_equal(target_notelist, new_notelist)
finally:
os.remove(tmp_file.name)

def test_excerpt(self, score_object, tmp_path):
print(f"CREATING EXCERPTS IN {tmp_path}")
for start, end in ((1, 3), (2, 2), (3, None)):
score_object.mscx.store_excerpt(
start_mc=start,
end_mc=end,
directory=str(tmp_path),
)
assert len(os.listdir(tmp_path)) == 3

def test_phrase_excerpts(self, score_object, tmp_path):
print(f"CREATING PHRASE EXCERPTS IN {tmp_path}")
dcml_labels = score_object.mscx.expanded(unfold=True)
if not dcml_labels:
pytest.skip("No labels to extract phrases from.")
if not check_phrase_annotations(dcml_labels, "phraseend"):
pytest.skip("Incongruent phrase annotations.")
score_object.mscx.store_phrase_excerpts(
directory=str(tmp_path),
)
if score_object.mscx.has_annotations:
assert len(os.listdir(tmp_path)) > 0

def test_random_excerpts(self, score_object, tmp_path):
print(f"CREATING RANDOM EXCERPTS IN {tmp_path}")
score_object.mscx.store_random_excerpts(
n_excerpts=3,
directory=str(tmp_path),
)
assert len(os.listdir(tmp_path)) == 3

def test_storing_all_excerpts(self, score_object, tmp_path):
print(f"CREATING RANDOM EXCERPTS IN {tmp_path}")
last_mn = score_object.mscx.measures().mn.max()
mn_length = last_mn - 2
score_object.mscx.store_random_excerpts(
mn_length=int(mn_length),
directory=str(tmp_path),
)
assert len(os.listdir(tmp_path)) == 3

0 comments on commit 18d16a9

Please sign in to comment.