Skip to content

Commit

Permalink
Implement general parser class
Browse files Browse the repository at this point in the history
  • Loading branch information
ladinesa committed Mar 10, 2024
1 parent cba342e commit b6c8405
Show file tree
Hide file tree
Showing 9 changed files with 3,582 additions and 2,222 deletions.
138 changes: 74 additions & 64 deletions atomisticparsers/asap/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,8 @@
from nomad.units import ureg
from nomad.parsing.file_parser import FileParser
from runschema.run import Run, Program
from runschema.method import (
Method, ForceField, Model
)
from simulationworkflowschema import (
GeometryOptimization, GeometryOptimizationMethod
)
from runschema.method import Method, ForceField, Model
from simulationworkflowschema import GeometryOptimization, GeometryOptimizationMethod
from atomisticparsers.utils import MDParser
from .metainfo.asap import MolecularDynamics # pylint: disable=unused-import

Expand All @@ -43,21 +39,21 @@ def __init__(self):
def traj(self):
if self._file_handler is None:
try:
self._file_handler = Trajectory(self.mainfile, 'r')
self._file_handler = Trajectory(self.mainfile, "r")
# check if traj file is really asap
if 'calculator' in self._file_handler.backend.keys():
if self._file_handler.backend.calculator.name != 'emt': # pylint: disable=E1101
self.logger.error('Trajectory is not ASAP.')
if "calculator" in self._file_handler.backend.keys():
if self._file_handler.backend.calculator.name != "emt": # pylint: disable=E1101
self.logger.error("Trajectory is not ASAP.")
self._file_handler = None
except Exception:
self.logger.error('Error reading trajectory file.')
self.logger.error("Error reading trajectory file.")
return self._file_handler

def get_version(self):
if hasattr(self.traj, 'ase_version') and self.traj.ase_version:
if hasattr(self.traj, "ase_version") and self.traj.ase_version:
return self.traj.ase_version
else:
return '3.x.x'
return "3.x.x"

def parse(self):
pass
Expand All @@ -68,10 +64,6 @@ def __init__(self):
self.traj_parser = TrajParser()
super().__init__()

def init_parser(self):
self.traj_parser.mainfile = self.filepath
self.traj_parser.logger = self.logger

def parse_method(self):
traj = self.traj_parser.traj
sec_method = Method()
Expand All @@ -80,71 +72,72 @@ def parse_method(self):
if traj[0].calc is not None:
sec_method.force_field = ForceField(model=[Model(name=traj[0].calc.name)])

description = traj.description if hasattr(traj, 'description') else dict()
description = traj.description if hasattr(traj, "description") else dict()
if not description:
return

calc_type = description.get('type')
if calc_type == 'optimization':
calc_type = description.get("type")
if calc_type == "optimization":
workflow = GeometryOptimization(method=GeometryOptimizationMethod())
workflow.x_asap_maxstep = description.get('maxstep', 0)
workflow.method.method = description.get('optimizer', '').lower()
workflow.x_asap_maxstep = description.get("maxstep", 0)
workflow.method.method = description.get("optimizer", "").lower()
self.archive.workflow2 = workflow
elif calc_type == 'molecular-dynamics':
elif calc_type == "molecular-dynamics":
data = {}
data['x_asap_timestep'] = description.get('timestep', 0)
data['x_asap_temperature'] = description.get('temperature', 0)
md_type = description.get('md-type', '')
data["x_asap_timestep"] = description.get("timestep", 0)
data["x_asap_temperature"] = description.get("temperature", 0)
md_type = description.get("md-type", "")
thermodynamic_ensemble = None
if 'Langevin' in md_type:
data['x_asap_langevin_friction'] = description.get('friction', 0)
thermodynamic_ensemble = 'NVT'
elif 'NVT' in md_type:
thermodynamic_ensemble = 'NVT'
elif 'Verlet' in md_type:
thermodynamic_ensemble = 'NVE'
elif 'NPT' in md_type:
thermodynamic_ensemble = 'NPT'
data['method'] = {'thermodynamic_ensemble': thermodynamic_ensemble}
if "Langevin" in md_type:
data["x_asap_langevin_friction"] = description.get("friction", 0)
thermodynamic_ensemble = "NVT"
elif "NVT" in md_type:
thermodynamic_ensemble = "NVT"
elif "Verlet" in md_type:
thermodynamic_ensemble = "NVE"
elif "NPT" in md_type:
thermodynamic_ensemble = "NPT"
data["method"] = {"thermodynamic_ensemble": thermodynamic_ensemble}
self.parse_md_workflow(data)

def parse(self, filepath, archive, logger):
self.filepath = os.path.abspath(filepath)
self.archive = archive
self.maindir = os.path.dirname(self.filepath)
self.logger = logger if logger is not None else logging

self.init_parser()

def write_to_archive(self) -> None:
self.traj_parser.mainfile = self.mainfile
if self.traj_parser.traj is None:
return

sec_run = Run()
self.archive.run.append(sec_run)
sec_run. program = Program(name='ASAP', version=self.traj_parser.get_version())
sec_run.program = Program(name="ASAP", version=self.traj_parser.get_version())

# TODO do we build the topology and method for each frame
self.parse_method()

# set up md parser
self.n_atoms = [traj.get_global_number_of_atoms() for traj in self.traj_parser.traj]
steps = [(traj.description if hasattr(traj, 'description') else dict()).get(
'interval', 1) * n for n, traj in enumerate(self.traj_parser.traj)]
self.n_atoms = [
traj.get_global_number_of_atoms() for traj in self.traj_parser.traj
]
steps = [
(traj.description if hasattr(traj, "description") else dict()).get(
"interval", 1
)
* n
for n, traj in enumerate(self.traj_parser.traj)
]
self.trajectory_steps = steps
self.thermodynamics_steps = steps

def get_constraint_name(constraint):
def index():
d = constraint['kwargs'].get('direction')
d = constraint["kwargs"].get("direction")
return ((d / np.linalg.norm(d)) ** 2).argsort()[2]

name = constraint.get('name')
if name == 'FixedPlane':
return ['fix_yz', 'fix_xz', 'fix_xy'][index()]
elif name == 'FixedLine':
return ['fix_x', 'fix_y', 'fix_z'][index()]
elif name == 'FixAtoms':
return 'fix_xyz'
name = constraint.get("name")
if name == "FixedPlane":
return ["fix_yz", "fix_xz", "fix_xy"][index()]
elif name == "FixedLine":
return ["fix_x", "fix_y", "fix_z"][index()]
elif name == "FixAtoms":
return "fix_xyz"
else:
return name

Expand All @@ -160,11 +153,25 @@ def index():
constraints = []
for constraint in traj.constraints:
as_dict = constraint.todict()
indices = as_dict['kwargs'].get('a', as_dict['kwargs'].get('indices'))
constraints.append(dict(atom_indices=np.asarray(indices), kind=get_constraint_name(as_dict)))
self.parse_trajectory_step(dict(atoms=dict(
lattice_vectors=lattice_vectors, labels=labels, positions=positions,
periodic=periodic, velocities=velocities), constraint=constraints))
indices = as_dict["kwargs"].get("a", as_dict["kwargs"].get("indices"))
constraints.append(
dict(
atom_indices=np.asarray(indices),
kind=get_constraint_name(as_dict),
)
)
self.parse_trajectory_step(
dict(
atoms=dict(
lattice_vectors=lattice_vectors,
labels=labels,
positions=positions,
periodic=periodic,
velocities=velocities,
),
constraint=constraints,
)
)

for step in self.thermodynamics_steps:
traj = self.traj_parser.traj[steps.index(step)]
Expand All @@ -174,6 +181,9 @@ def index():
forces = forces * ureg.eV / ureg.angstrom
if (forces_raw := traj.get_forces(apply_constraint=False)) is not None:
forces_raw * ureg.eV / ureg.angstrom
self.parse_thermodynamics_step(dict(
energy=dict(total=dict(value=total_energy)),
forces=dict(total=dict(value=forces, value_raw=forces_raw))))
self.parse_thermodynamics_step(
dict(
energy=dict(total=dict(value=total_energy)),
forces=dict(total=dict(value=forces, value_raw=forces_raw)),
)
)
Loading

0 comments on commit b6c8405

Please sign in to comment.