Skip to content

Commit

Permalink
review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
jrudz committed Feb 27, 2024
1 parent de32500 commit 6a426bd
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 77 deletions.
167 changes: 91 additions & 76 deletions atomisticparsers/gromacs/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@
MOL = 6.022140857e23


def to_float(string):
try:
value = float(string)
except ValueError:
value = None
return value


class GromacsLogParser(TextParser):
def __init__(self):
super().__init__(None)
Expand Down Expand Up @@ -114,14 +122,14 @@ def str_to_energies(val_in):
while pointer < len(rows[n]):
key = rows[n][pointer : pointer + n_chars_val].strip()
value = rows[n + 1][pointer : pointer + n_chars_val]
energies[key] = float(value)
energies[key] = to_float(value)
pointer += n_chars_val
return energies

def str_to_step_info(val_in):
val = val_in.strip().splitlines()
keys = val[0].split()
values = [float(v) for v in val[1].split()]
values = [to_float(v) for v in val[1].split()]
return {key: values[n] for n, key in enumerate(keys)}

thermo_quantities = [
Expand Down Expand Up @@ -191,8 +199,7 @@ def str_to_input_parameters(val_in):
re_array = re.compile(r"\s*([\w\-]+)\[[\d ]+\]\s*=\s*\{*(.+)")
re_scalar = re.compile(r"\s*([\w\-]+)\s*[=:]\s*(.+)")
parameters = dict()
val = val_in.splitlines()
val = [line.strip() for line in val]
val = [line.strip() for line in val_in.splitlines()]
for val_n in val:
val_scalar = re_scalar.match(val_n)
if val_scalar:
Expand All @@ -202,7 +209,7 @@ def str_to_input_parameters(val_in):
if val_array:
parameters.setdefault(val_array.group(1), [])
value = [
float(v) for v in val_array.group(2).rstrip("}").split(",")
to_float(v) for v in val_array.group(2).rstrip("}").split(",")
]
parameters[val_array.group(1)].append(
value[0] if len(value) == 1 else value
Expand All @@ -221,6 +228,10 @@ def str_to_input_parameters(val_in):
class GromacsXvgParser(TextParser):
def __init__(self):
super().__init__(None)
self.re_columns = re.compile(r"@\s*s\d{1,2}\s*legend\s*\".*\"")
self.re_comment = re.compile(r"^[@#]")
self.re_quotes = re.compile(r"\"(.*)\"")
self.re_label = re.compile(r'@\s*(title|xaxis|yaxis)\s*(?: label)?\s*"(.*)"')

def str_to_results(val_in):
results = {
Expand All @@ -230,35 +241,19 @@ def str_to_results(val_in):
"yaxis": "",
"column_headers": [],
}
re_columns = re.compile(r"@\s*s\d{1,2}\s*legend\s*\".*\"")
re_title = re.compile(r"@\s*title.*")
re_xaxis = re.compile(r"@\s*xaxis.*")
re_yaxis = re.compile(r"@\s*yaxis.*")
re_comment = re.compile(r"^[@#]")
re_quotes = re.compile(r"\"(.*)\"")
val = val_in.strip().split("\n")

val = val_in.strip().splitlines()
val = [line.strip() for line in val]
for val_n in val:
val_title = re_yaxis.match(val_n)
val_xaxis = re_title.match(val_n)
val_yaxis = re_xaxis.match(val_n)
val_legend = re_columns.match(val_n)
val_comment = re_comment.match(val_n)
if val_title:
title = val_title.group()
title = re_quotes.findall(title)
results["title"] = title[0] if title else None
elif val_xaxis:
xaxis = val_xaxis.group()
xaxis = re_quotes.findall(xaxis)
results["xaxis"] = xaxis[0] if xaxis else None
elif val_yaxis:
yaxis = val_yaxis.group()
yaxis = re_quotes.findall(yaxis)
results["yaxis"] = xaxis[0] if xaxis else None
val_label = self.re_label.match(val_n)
val_legend = self.re_columns.match(val_n)
val_comment = self.re_comment.match(val_n)
if val_label:
key, label = val_label.groups()
results[key] = label
elif val_legend: # TODO convert out of xmgrace notation
column = val_legend.group()
column = re_quotes.findall(column)
column = self.re_quotes.findall(column)
column = column[0] if column else None
results["column_headers"].append(column)
elif not val_comment:
Expand Down Expand Up @@ -635,6 +630,8 @@ def __init__(self):
self.traj_parser = GromacsMDAnalysisParser()
self.energy_parser = GromacsEDRParser()
self.mdp_parser = GromacsMdpParser()
self.mdp_ext = "mdp"
self.mdp_std_filename = "mdout"
self.xvg_parser = GromacsXvgParser()
self.input_parameters = {}
self._gro_energy_units = ureg.kilojoule * MOL
Expand Down Expand Up @@ -687,9 +684,7 @@ def get_mdp_file(self):
3. input mdp file matching the mainfile name (as usual)
4. any `.mdp` file within the directory (as usual)
"""
ext = "mdp"
std_filename = "mdout"
files = [d for d in self._gromacs_files if d.endswith(ext)]
files = [d for d in self._gromacs_files if d.endswith(self.mdp_ext)]

if len(files) == 0:
return ""
Expand All @@ -699,15 +694,15 @@ def get_mdp_file(self):

for f in files:
filename = f.rsplit(".", 1)[0]
if self._basename in filename and std_filename in filename:
if self._basename in filename and self.mdp_std_filename in filename:
return os.path.join(self._maindir, f)

for f in files:
filename = f.rsplit(".", 1)[0]
if std_filename in filename:
if self.mdp_std_filename in filename:
return os.path.join(self._maindir, f)

return self.get_gromacs_file(ext)
return self.get_gromacs_file(self.mdp_ext)

def get_gromacs_file(self, ext):
files = [d for d in self._gromacs_files if d.endswith(ext)]
Expand Down Expand Up @@ -1080,11 +1075,11 @@ def parse_method(self):
)
rlist = input_parameters.get("rlist", None)
sec_neighbor_searching.neighbor_update_cutoff = (
float(rlist) * ureg.nanometer if rlist else None
to_float(rlist) * ureg.nanometer if to_float(rlist) else None
)
rvdw = input_parameters.get("rvdw", None)
sec_force_calculations.vdw_cutoff = (
float(rvdw) * ureg.nanometer if rvdw else None
to_float(rvdw) * ureg.nanometer if to_float(rvdw) else None
)
coulombtype = input_parameters.get("coulombtype", "no").lower()
coulombtype_map = {
Expand All @@ -1110,7 +1105,9 @@ def parse_method(self):
)
sec_force_calculations.coulomb_type = value
rcoulomb = input_parameters.get("rcoulomb", None)
sec_force_calculations.coulomb_cutoff = float(rcoulomb) if rcoulomb else None
sec_force_calculations.coulomb_cutoff = (
to_float(rcoulomb) if to_float(rcoulomb) else None
)

def get_thermostat_parameters(self, integrator: str = ""):
thermostat = self.input_parameters.get("tcoupl", "no").lower()
Expand Down Expand Up @@ -1138,12 +1135,16 @@ def get_thermostat_parameters(self, integrator: str = ""):
if thermostat_parameters["thermostat_type"]:
reference_temperature = self.input_parameters.get("ref-t", None)
if isinstance(reference_temperature, str):
reference_temperature = float(reference_temperature.split()[0])
reference_temperature = to_float(
reference_temperature.split()[0]
) # ! simulated annealing protocols not supported
reference_temperature *= ureg.kelvin if reference_temperature else None
thermostat_parameters["reference_temperature"] = reference_temperature
coupling_constant = self.input_parameters.get("tau-t", None)
if isinstance(coupling_constant, str):
coupling_constant = float(coupling_constant.split()[0])
coupling_constant = to_float(
coupling_constant.split()[0]
) # ! simulated annealing protocols not supported
coupling_constant *= ureg.picosecond if coupling_constant else None
thermostat_parameters["coupling_constant"] = coupling_constant

Expand Down Expand Up @@ -1185,7 +1186,9 @@ def get_barostat_parameters(self):
)
taup = self.input_parameters.get("tau-p", None)
barostat_parameters["coupling_constant"] = (
np.ones(shape=(3, 3)) * float(taup) * ureg.picosecond if taup else None
np.ones(shape=(3, 3)) * to_float(taup) * ureg.picosecond
if to_float(taup)
else None
)
refp = self.input_parameters.get("ref-p", None)
barostat_parameters["reference_pressure"] = (
Expand Down Expand Up @@ -1230,7 +1233,7 @@ def get_free_energy_calculation_parameters(self):
for key in lambda_key_map.keys()
}
lambdas = {
key: [float(i) for i in val.split()] for key, val in lambdas.items()
key: [to_float(i) for i in val.split()] for key, val in lambdas.items()
}
free_energy_parameters["lambdas"] = [
{"kind": nomad_key, "value": lambdas[gromacs_key]}
Expand Down Expand Up @@ -1327,7 +1330,9 @@ def parse_workflow(self):
1.0, ureg.kilojoule * ureg.avogadro_number / ureg.nanometer, ureg.newton
)
workflow.method.convergence_tolerance_force_maximum = (
float(force_maximum) * force_conversion if force_maximum else None
to_float(force_maximum) * force_conversion
if to_float(force_maximum)
else None
)

energies = []
Expand All @@ -1345,13 +1350,14 @@ def parse_workflow(self):

final_force_maximum = self.log_parser.get("maximum_force")
final_force_maximum = (
float(re.split("=|\n", final_force_maximum)[1])
re.split("=|\n", final_force_maximum)[1]
if final_force_maximum
else None
)
final_force_maximum = to_float(final_force_maximum)
workflow.results.final_force_maximum = (
float(final_force_maximum) * force_conversion
if final_force_maximum
to_float(final_force_maximum) * force_conversion
if to_float(final_force_maximum)
else None
)
self.archive.workflow2 = workflow
Expand Down Expand Up @@ -1390,7 +1396,7 @@ def parse_workflow(self):
method["integrator_type"] = value
timestep = input_parameters.get("dt", None)
method["integration_timestep"] = (
float(timestep) * ureg.picosecond if timestep else None
to_float(timestep) * ureg.picosecond if to_float(timestep) else None
)

thermostat_parameters = self.get_thermostat_parameters(integrator)
Expand All @@ -1414,26 +1420,44 @@ def parse_workflow(self):
free_energies = self.xvg_parser.get("results")

title = free_energies.get("title", "") if free_energies is not None else ""
flag_FE = False
if r"dH/d\xl\f{}" in title and r"\xD\f{}H" in title:
flag_FE = True
flag_fe = False
if (
r"dH/d\xl\f{}" in title and r"\xD\f{}H" in title
): # TODO incorporate x and y axis labels into the checks
flag_fe = True
results_key = "free_energy_calculations"
results[results_key] = {}
columns = free_energies.get("column_vals")
results[results_key]["n_frames"] = len(columns)
results[results_key]["n_states"] = len(
method[params_key].get("lambdas", [])
lambdas = method[params_key].get("lambdas", None)
results[results_key]["n_states"] = (
len(lambdas[0].get("value", [])) if lambdas is not None else None
)
results[results_key]["lambda_index"] = method[params_key].get(
"lambda_index", None
)
results[results_key]["times"] = columns[:, 0] * ureg.ps
columns = columns[:, 1:] * self._gro_energy_units.magnitude
results[results_key]["value_unit"] = str(self._gro_energy_units.units)
xaxis = free_energies.get("xaxis", "").lower()
# The expected columns of the xvg file are:
# Total Energy
# dH/dlambda current lambda
# Delta H between each lambda and current lambda (n_lambda columns)
# PV Energy
if (
"time" in xaxis
and columns[:, 3:-1].shape[1] == results[results_key]["n_states"]
):
results[results_key]["times"] = columns[:, 0] * ureg.ps
columns = columns[:, 1:] * self._gro_energy_units.magnitude
else:
self.logger.warning(
"Unexpected format of xvg file. Not storing free energy calculation results."
)
flag_fe = False

self.parse_md_workflow(dict(method=method, results=results))

if flag_FE:
if flag_fe:
filename = os.path.join(
os.path.dirname(self.filepath.split("/raw/")[-1]),
f"{os.path.basename(self.filepath)}.archive.hdf5",
Expand All @@ -1444,41 +1468,32 @@ def parse_workflow(self):
if self.archive.m_context:
with self.archive.m_context.raw_file(filename, "wb") as f:
pass
sec_FE_parameters = (
sec_fe_parameters = (
self.archive.workflow2.method.free_energy_calculation_parameters[0]
)
sec_FE = self.archive.workflow2.results.free_energy_calculations[0]
sec_FE.method_ref = sec_FE_parameters
sec_fe = self.archive.workflow2.results.free_energy_calculations[0]
sec_fe.method_ref = sec_fe_parameters
if self.archive.m_context:
with self.archive.m_context.raw_file(filename, "r+b") as f:
# The expected columns of the xvg file are:
# Total Energy
# dH/dlambda current lambda
# Delta H between each lambda and current lambda (n_lambda columns)
# PV Energy
if columns[:, 2:-1].shape[1] != sec_FE.n_states:
self.logger.warning(
"Unexpected format of xvg file. Not storing free energy calculation results."
)
sec_FE.value_total_energy_magnitude = to_hdf5(
sec_fe.value_total_energy_magnitude = to_hdf5(
columns[:, 0],
f,
f"{sec_FE.m_path()}/value_total_energy_magnitude",
f"{sec_fe.m_path()}/value_total_energy_magnitude",
)
sec_FE.value_total_energy_derivative_magnitude = to_hdf5(
sec_fe.value_total_energy_derivative_magnitude = to_hdf5(
columns[:, 1],
f,
f"{sec_FE.m_path()}/value_total_energy_derivative_magnitude",
f"{sec_fe.m_path()}/value_total_energy_derivative_magnitude",
)
sec_FE.value_total_energy_differences_magnitude = to_hdf5(
sec_fe.value_total_energy_differences_magnitude = to_hdf5(
columns[:, 2:-1],
f,
f"{sec_FE.m_path()}/value_total_energy_differences_magnitude",
f"{sec_fe.m_path()}/value_total_energy_differences_magnitude",
)
sec_FE.value_PV_energy_magnitude = to_hdf5(
sec_fe.value_PV_energy_magnitude = to_hdf5(
columns[:, -1],
f,
f"{sec_FE.m_path()}/value_PV_energy_magnitude",
f"{sec_fe.m_path()}/value_PV_energy_magnitude",
)

def parse_input(self):
Expand Down
2 changes: 1 addition & 1 deletion tests/test_gromacsparser.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ def get_dataset(filename_with_path):
assert sec_method.final_state_bonded == "on"

assert sec_results.n_frames == 5001
assert sec_results.n_states == 7
assert sec_results.n_states == 11
assert sec_results.lambda_index == 7
assert len(sec_results.times) == 5001
assert sec_results.times.to("ps")[10].magnitude == approx(2.0)
Expand Down

0 comments on commit 6a426bd

Please sign in to comment.