Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parser interface #98

Merged
merged 6 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -251,3 +251,4 @@ env/

# gromacs files created on run
.*.npz
.*.lock
138 changes: 74 additions & 64 deletions atomisticparsers/asap/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,8 @@
from nomad.units import ureg
from nomad.parsing.file_parser import FileParser
from runschema.run import Run, Program
from runschema.method import (
Method, ForceField, Model
)
from simulationworkflowschema import (
GeometryOptimization, GeometryOptimizationMethod
)
from runschema.method import Method, ForceField, Model
from simulationworkflowschema import GeometryOptimization, GeometryOptimizationMethod
from atomisticparsers.utils import MDParser
from .metainfo.asap import MolecularDynamics # pylint: disable=unused-import

Expand All @@ -43,21 +39,21 @@ def __init__(self):
def traj(self):
if self._file_handler is None:
try:
self._file_handler = Trajectory(self.mainfile, 'r')
self._file_handler = Trajectory(self.mainfile, "r")
# check if traj file is really asap
if 'calculator' in self._file_handler.backend.keys():
if self._file_handler.backend.calculator.name != 'emt': # pylint: disable=E1101
self.logger.error('Trajectory is not ASAP.')
if "calculator" in self._file_handler.backend.keys():
if self._file_handler.backend.calculator.name != "emt": # pylint: disable=E1101
self.logger.error("Trajectory is not ASAP.")
self._file_handler = None
except Exception:
self.logger.error('Error reading trajectory file.')
self.logger.error("Error reading trajectory file.")
return self._file_handler

def get_version(self):
if hasattr(self.traj, 'ase_version') and self.traj.ase_version:
if hasattr(self.traj, "ase_version") and self.traj.ase_version:
return self.traj.ase_version
else:
return '3.x.x'
return "3.x.x"

def parse(self):
pass
Expand All @@ -68,10 +64,6 @@ def __init__(self):
self.traj_parser = TrajParser()
super().__init__()

def init_parser(self):
self.traj_parser.mainfile = self.filepath
self.traj_parser.logger = self.logger

def parse_method(self):
traj = self.traj_parser.traj
sec_method = Method()
Expand All @@ -80,71 +72,72 @@ def parse_method(self):
if traj[0].calc is not None:
sec_method.force_field = ForceField(model=[Model(name=traj[0].calc.name)])

description = traj.description if hasattr(traj, 'description') else dict()
description = traj.description if hasattr(traj, "description") else dict()
if not description:
return

calc_type = description.get('type')
if calc_type == 'optimization':
calc_type = description.get("type")
if calc_type == "optimization":
workflow = GeometryOptimization(method=GeometryOptimizationMethod())
workflow.x_asap_maxstep = description.get('maxstep', 0)
workflow.method.method = description.get('optimizer', '').lower()
workflow.x_asap_maxstep = description.get("maxstep", 0)
workflow.method.method = description.get("optimizer", "").lower()
self.archive.workflow2 = workflow
elif calc_type == 'molecular-dynamics':
elif calc_type == "molecular-dynamics":
data = {}
data['x_asap_timestep'] = description.get('timestep', 0)
data['x_asap_temperature'] = description.get('temperature', 0)
md_type = description.get('md-type', '')
data["x_asap_timestep"] = description.get("timestep", 0)
data["x_asap_temperature"] = description.get("temperature", 0)
md_type = description.get("md-type", "")
thermodynamic_ensemble = None
if 'Langevin' in md_type:
data['x_asap_langevin_friction'] = description.get('friction', 0)
thermodynamic_ensemble = 'NVT'
elif 'NVT' in md_type:
thermodynamic_ensemble = 'NVT'
elif 'Verlet' in md_type:
thermodynamic_ensemble = 'NVE'
elif 'NPT' in md_type:
thermodynamic_ensemble = 'NPT'
data['method'] = {'thermodynamic_ensemble': thermodynamic_ensemble}
if "Langevin" in md_type:
data["x_asap_langevin_friction"] = description.get("friction", 0)
thermodynamic_ensemble = "NVT"
elif "NVT" in md_type:
thermodynamic_ensemble = "NVT"
elif "Verlet" in md_type:
thermodynamic_ensemble = "NVE"
elif "NPT" in md_type:
thermodynamic_ensemble = "NPT"
data["method"] = {"thermodynamic_ensemble": thermodynamic_ensemble}
self.parse_md_workflow(data)

def parse(self, filepath, archive, logger):
self.filepath = os.path.abspath(filepath)
self.archive = archive
self.maindir = os.path.dirname(self.filepath)
self.logger = logger if logger is not None else logging

self.init_parser()

def write_to_archive(self) -> None:
self.traj_parser.mainfile = self.mainfile
if self.traj_parser.traj is None:
return

sec_run = Run()
self.archive.run.append(sec_run)
sec_run. program = Program(name='ASAP', version=self.traj_parser.get_version())
sec_run.program = Program(name="ASAP", version=self.traj_parser.get_version())

# TODO do we build the topology and method for each frame
self.parse_method()

# set up md parser
self.n_atoms = [traj.get_global_number_of_atoms() for traj in self.traj_parser.traj]
steps = [(traj.description if hasattr(traj, 'description') else dict()).get(
'interval', 1) * n for n, traj in enumerate(self.traj_parser.traj)]
self.n_atoms = max(
[traj.get_global_number_of_atoms() for traj in self.traj_parser.traj]
)
steps = [
(traj.description if hasattr(traj, "description") else dict()).get(
"interval", 1
)
* n
for n, traj in enumerate(self.traj_parser.traj)
]
self.trajectory_steps = steps
self.thermodynamics_steps = steps

def get_constraint_name(constraint):
def index():
d = constraint['kwargs'].get('direction')
d = constraint["kwargs"].get("direction")
return ((d / np.linalg.norm(d)) ** 2).argsort()[2]

name = constraint.get('name')
if name == 'FixedPlane':
return ['fix_yz', 'fix_xz', 'fix_xy'][index()]
elif name == 'FixedLine':
return ['fix_x', 'fix_y', 'fix_z'][index()]
elif name == 'FixAtoms':
return 'fix_xyz'
name = constraint.get("name")
if name == "FixedPlane":
return ["fix_yz", "fix_xz", "fix_xy"][index()]
elif name == "FixedLine":
return ["fix_x", "fix_y", "fix_z"][index()]
elif name == "FixAtoms":
return "fix_xyz"
else:
return name

Expand All @@ -160,11 +153,25 @@ def index():
constraints = []
for constraint in traj.constraints:
as_dict = constraint.todict()
indices = as_dict['kwargs'].get('a', as_dict['kwargs'].get('indices'))
constraints.append(dict(atom_indices=np.asarray(indices), kind=get_constraint_name(as_dict)))
self.parse_trajectory_step(dict(atoms=dict(
lattice_vectors=lattice_vectors, labels=labels, positions=positions,
periodic=periodic, velocities=velocities), constraint=constraints))
indices = as_dict["kwargs"].get("a", as_dict["kwargs"].get("indices"))
constraints.append(
dict(
atom_indices=np.asarray(indices),
kind=get_constraint_name(as_dict),
)
)
self.parse_trajectory_step(
dict(
atoms=dict(
lattice_vectors=lattice_vectors,
labels=labels,
positions=positions,
periodic=periodic,
velocities=velocities,
),
constraint=constraints,
)
)

for step in self.thermodynamics_steps:
traj = self.traj_parser.traj[steps.index(step)]
Expand All @@ -174,6 +181,9 @@ def index():
forces = forces * ureg.eV / ureg.angstrom
if (forces_raw := traj.get_forces(apply_constraint=False)) is not None:
forces_raw * ureg.eV / ureg.angstrom
self.parse_thermodynamics_step(dict(
energy=dict(total=dict(value=total_energy)),
forces=dict(total=dict(value=forces, value_raw=forces_raw))))
self.parse_thermodynamics_step(
dict(
energy=dict(total=dict(value=total_energy)),
forces=dict(total=dict(value=forces, value_raw=forces_raw)),
)
)
Loading
Loading