Skip to content

Commit

Permalink
Merge pull request #88 from mtremmel/AHF_handler_fix_mjt
Browse files Browse the repository at this point in the history
Fix for non-contiguous AHF halo numbering especially with MPI runs
  • Loading branch information
apontzen authored Mar 20, 2021
2 parents 926f928 + aacf2ee commit 38e73a2
Show file tree
Hide file tree
Showing 24 changed files with 183 additions and 128 deletions.
25 changes: 16 additions & 9 deletions tangos/core/halo.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
class Halo(Base):
__tablename__= "halos"

id = Column(Integer, primary_key=True)
halo_number = Column(Integer)
finder_id = Column(Integer)
id = Column(Integer, primary_key=True) #the unique ID value of the database object created for this halo
halo_number = Column(Integer) #by default this will be the halo's rank in terms of particle count
finder_id = Column(Integer) #raw halo ID from the halo catalog
finder_offset = Column(Integer) #index of halo within halo catalog, primary identifier used when reading catalog/simulation data
timestep_id = Column(Integer, ForeignKey('timesteps.id'))
timestep = relationship(TimeStep, backref=backref(
'objects', order_by=halo_number, cascade='all', lazy='dynamic'), cascade='save-update, merge')
Expand Down Expand Up @@ -72,10 +73,11 @@ def object_typetag_from_code(typecode):
return c.tag
raise ValueError("Unknown object typecode %d",typecode)

def __init__(self, timestep, halo_number, finder_id, NDM, NStar, NGas, object_typecode=0):
def __init__(self, timestep, halo_number, finder_id, finder_offset, NDM, NStar, NGas, object_typecode=0):
self.timestep = timestep
self.halo_number = int(halo_number)
self.finder_id = int(finder_id)
self.finder_offset = int(finder_offset)
self.NDM = int(NDM)
self.NStar = int(NStar)
self.NGas = int(NGas)
Expand Down Expand Up @@ -120,12 +122,17 @@ def load(self, mode=None):
handler this can be None or 'partial' (in a normal session) and, when running inside an MPI session,
'server' or 'server-partial'. See https://pynbody.github.io/tangos/mpi.html.
"""
if self.finder_id is None:
halo_number = self.halo_number
if not hasattr(self, "finder_id"):
finder_id = self.halo_number # backward compatibility
else:
finder_id = self.finder_id
finder_id = self.finder_id
if not hasattr(self, "finder_offset"):
finder_offset = finder_id
else:
finder_offset = self.finder_offset

return self.handler.load_object(self.timestep.extension, finder_id, object_typetag=self.tag, mode=mode)
return self.handler.load_object(self.timestep.extension, finder_id, finder_offset, object_typetag=self.tag, mode=mode)

def calculate(self, calculation, return_description=False):
"""Use the live-calculation system to calculate a user-specified function of the stored data.
Expand Down Expand Up @@ -390,7 +397,7 @@ class Tracker(Halo):
tag = "tracker"

def __init__(self, timestep, halo_number):
super(Tracker, self).__init__(timestep, halo_number, halo_number, 0,0,0,
super(Tracker, self).__init__(timestep, halo_number, halo_number, halo_number, 0,0,0,
self.__mapper_args__['polymorphic_identity'])

@property
Expand Down Expand Up @@ -449,7 +456,7 @@ class PhantomHalo(Halo):
tag = "phantom"

def __init__(self, timestep, halo_number, finder_id):
super(PhantomHalo, self).__init__(timestep, halo_number, finder_id, 0,0,0,
super(PhantomHalo, self).__init__(timestep, halo_number, finder_id, finder_id, 0,0,0,
self.__mapper_args__['polymorphic_identity'])

TimeStep.phantoms = orm.relationship(Halo, cascade='all', lazy='dynamic',
Expand Down
6 changes: 4 additions & 2 deletions tangos/input_handlers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,14 @@ def load_timestep_without_caching(self, ts_extension, mode=None):
"""Creates and returns an object that connects to the data for a timestep on disk"""
raise NotImplementedError

def load_object(self, ts_extension, halo_number, object_typetag='halo', mode=None):
def load_object(self, ts_extension, finder_id, catalog_pos, object_typetag='halo', mode=None):
"""Creates and returns an object that connects to the data for a halo on disk.
:arg ts_extension - the timestep path (relative to the simulation basename)
:arg halo_number - the halo number in the raw halo finder output
:arg finder_id - the halo id in the raw halo finder output
:arg catalog_pos - the position of the raw halo finder output
:arg object_typetag - the type of halo catalogue (normally 'halo' or, for Subfind, might be 'group' to get
the top-level groups)
Expand Down
6 changes: 3 additions & 3 deletions tangos/input_handlers/ahf_trees.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ def _load_mtree_file_standard(self):
while i < len(data):
for j in range(data[i][2]):
idx = i+1+j
if data[idx][1]+1 in self._fid: # check if this halo was loaded in case a minimum number of particles different to AHF was used to load halos into DB
if data[idx][1] in self._fid: # check if this halo was loaded in case a minimum number of particles different to AHF was used to load halos into DB
# keep in mind finder id and AHF id have an offset of 1
results['id_desc'] = np.append(results['id_desc'],data[i][0]+1)
results['id_this'] = np.append(results['id_this'],data[idx][1]+1)
results['id_desc'] = np.append(results['id_desc'],data[i][0])
results['id_this'] = np.append(results['id_this'],data[idx][1])
results['f_share'] = np.append(results['f_share'], float(data[idx][0] * data[idx][0]) / (data[i][1] * data[idx][2]) )
i += data[i][2] + 1

Expand Down
70 changes: 43 additions & 27 deletions tangos/input_handlers/halo_stat_files/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

class HaloStatFile(object):
"""Manages and reads a halo stat file of unspecified format."""
_id_offset = 0
_finder_offset_start = 0 #whether finder_offset should start at 0 (default) or N (typically either 0 or 1)
_column_translations = {}

def __new__(cls, timestep):
Expand Down Expand Up @@ -55,33 +55,42 @@ def all_columns(self):

def iter_rows_raw(self, *args):
"""
Yield the halo ID and requested columns from each line of the stat file, without any emulation.
Yield
1) the index in which each halo appears in the catalog (starting from 0 unless _finder_offset_start is set)
2) the raw halo ID (finder_id) included in the halo stat file without any emulation
3) the rest of the requested parameters
:param args: strings for the column names
:return: id, arg1, arg2, arg3 where ID is the halo ID and argN is the value of the Nth named column
:return: finder_offset_start, finder_id, arg1, arg2, arg3, ... where finder_offset_start is the index of the halo within
the stat file, finder_id is the raw halo ID read from the stat file, and argN is the value associated with the
Nth column name provided as input.
"""
with open(self.filename) as f:
header = self._read_column_names(f)
cnt = 0
ids = [0]
for a in args:
try:
ids.append(header.index(a))
except ValueError:
ids.append(None)

for l in f:
if not l.startswith("#"):
yield self._get_values_for_columns(ids, l)
col_data = self._get_values_for_columns(ids, l)
col_data.insert(0, cnt+self._finder_offset_start)
yield col_data
cnt += 1

def iter_rows(self, *args):
"""
Yield the halo ID and requested columns from each line of the stat file, emulating the existence of some columns.
Yield the requested column values from the halo catalog stat file, as well as the finder_offset (index associated
with halo's position within the catalog) and finder_id (raw halo id listed in the stat file).
For example, AHF stat files do not contain n_dm; however, its value can be automatically inferred by this
function. Meanwhile IDL .amiga.stat files rename n_gas as N_gas.
Returned halo properties are emulated when necessary. For example, AHF stat files do not contain n_dm; however,
its value can be automatically inferred by this function. Meanwhile IDL .amiga.stat files rename n_gas as N_gas.
:param args: strings for the column names
:return: id, arg1, arg2, arg3 where ID is the halo ID and argN is the value of the Nth named column
:return: finder_offset, finder_id, arg1, arg2, arg3 where argN is the value of the Nth named column
"""

raw_args = []
Expand All @@ -90,19 +99,18 @@ def iter_rows(self, *args):
raw_args+=self._column_translations[arg].inputs()
else:
raw_args.append(arg)

for raw_values in self.iter_rows_raw(*raw_args):
values = [raw_values[0]]
values = [raw_values[0], raw_values[1]]
for arg in args:
if arg in self._column_translations:
values.append(self._column_translations[arg](raw_args, raw_values[1:]))
values.append(self._column_translations[arg](raw_args, raw_values[2:]))
else:
values.append(raw_values[1:][raw_args.index(arg)])
values.append(raw_values[2:][raw_args.index(arg)])
yield values

def read(self, *args):
"""Read the halo ID and requested columns from the entire file, returning each column as a separate array"""
return_values = [[] for _ in range(len(args)+1)]
return_values = [[] for _ in range(len(args)+2)]
for row in self.iter_rows(*args):
for return_array, value in zip(return_values, row):
return_array.append(value)
Expand All @@ -128,7 +136,6 @@ def _get_values_for_columns(self, columns, line):
this_cast = this_str

results.append(this_cast)
results[0] += self._id_offset
return results

def _read_column_names(self, f):
Expand All @@ -138,14 +145,14 @@ def _read_column_names(self, f):


class AHFStatFile(HaloStatFile):
_id_offset = 1
_finder_offset_start = 1

_column_translations = {'n_gas': translations.DefaultValue('n_gas', 0),
'n_star': translations.DefaultValue('n_star', 0),
'n_dm': translations.Function(lambda ngas, nstar, npart: npart - (ngas or 0) - (nstar or 0),
'n_gas', 'n_star', 'npart'),
'hostHalo': translations.Function(
lambda id: None if id==-1 else proxy_object.IncompleteProxyObjectFromFinderId(id+AHFStatFile._id_offset, 'halo'),
lambda id: None if id==-1 else proxy_object.IncompleteProxyObjectFromFinderId(id, 'halo'),
'hostHalo')}

def __init__(self, timestep_filename):
Expand All @@ -169,26 +176,23 @@ def filename(cls, timestep_filename):
return "CannotFindAHFHaloFilename"
else:
return file_list[0]
return file

def _calculate_children(self):
# use hostHalo column to calculate virtual childHalo entries
self._children_map = {}
for h_id, host_id_raw in self.iter_rows_raw("hostHalo"):
if host_id_raw!=-1:
host_id = host_id_raw + self._id_offset
cmap = self._children_map.get(host_id, [])
cmap.append(proxy_object.IncompleteProxyObjectFromFinderId(h_id,'halo'))
self._children_map[host_id] = cmap
for c_id, f_id, host_f_id in self.iter_rows_raw("hostHalo"):
if host_f_id!=-1:
cmap = self._children_map.get(host_f_id, [])
cmap.append(proxy_object.IncompleteProxyObjectFromFinderId(f_id,'halo'))
self._children_map[host_f_id] = cmap

def _calculate_children_if_required(self):
if not hasattr(self, "_children_map"):
self._calculate_children()

def _child_halo_entry(self, this_id_raw):
self._calculate_children_if_required()
this_id = this_id_raw + self._id_offset
children = self._children_map.get(this_id, [])
children = self._children_map.get(this_id_raw, [])
return children

class RockstarStatFile(HaloStatFile):
Expand All @@ -208,7 +212,6 @@ def filename(cls, timestep_filename):
return "CannotComputeRockstarFilename"

class AmigaIDLStatFile(HaloStatFile):
_id_offset = 0

_column_translations = {'n_dm': translations.Rename('N_dark'),
'n_gas': translations.Rename('N_gas'),
Expand All @@ -220,4 +223,17 @@ class AmigaIDLStatFile(HaloStatFile):
def filename(cls, timestep_filename):
return timestep_filename + '.amiga.stat'

def iter_rows_raw(self, *args):
"""
Yield the halo ID along with the values associated with each of the given arguments. The halo ID is output twice
in order to be consistent with other stat file readers. In this case, the finder_offset that is normally output
is just equal to the finder_id.
:param args: strings for the column names
:return: finder_id, finder_id, arg1, arg2, arg3, ... where finder_id is the halo's ID number read directly
from the stat file and argN is the value associated with the Nth column name given as arguments.
"""

for row in super().iter_rows_raw(*args):
row[0] = row[1] # sequential catalog index not right in this case; overwrite to match finder id
yield row
10 changes: 5 additions & 5 deletions tangos/input_handlers/output_testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def enumerate_objects(self, ts_extension, object_typetag='halo', min_halo_partic
nhalos_string = self._get_ts_property(ts_extension, object_typetag+"s")
nhalos = 0 if nhalos_string is None else int(nhalos_string)
for i in range(nhalos):
yield i+1, 2000-i, 0, 0
yield i+1, i+1, 2000-i, 0, 0

def match_objects(self, ts1, ts2, halo_min, halo_max, dm_only=False, threshold=0.005,
object_typetag='halo', output_handler_for_ts2=None):
Expand Down Expand Up @@ -71,12 +71,12 @@ def load_region(self, ts_extension, region_specification, mode=None):
data.message = data.message[region_specification]
return data

def load_object(self, ts_extension, halo_number, object_typetag='halo', mode=None):
def load_object(self, ts_extension, finder_id, finder_offset, object_typetag='halo', mode=None):
assert object_typetag=='halo'
return DummyTimestepData("Test string - this would contain the data for %s halo %d"%(ts_extension ,halo_number),
return DummyTimestepData("Test string - this would contain the data for %s halo %d"%(ts_extension ,finder_id),
float(self._get_ts_property(ts_extension, 'time')),
int(self._get_ts_property(ts_extension, 'halos')),
halo_number)
finder_offset)

def _get_ts_property(self, ts_extension, property):
ts_filename = self._extension_to_filename(ts_extension)
Expand All @@ -92,4 +92,4 @@ def enumerate_objects(self, ts_extension, object_typetag='halo', min_halo_partic
nhalos_string = self._get_ts_property(ts_extension, object_typetag+"s")
nhalos = 0 if nhalos_string is None else int(nhalos_string)
for i in range(nhalos):
yield i+1, 2000+i, 0, 0
yield i+1, i+1, 2000+i, 0, 0
20 changes: 10 additions & 10 deletions tangos/input_handlers/pynbody.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,28 +103,28 @@ def load_region(self, ts_extension, region_specification, mode=None):
else:
raise NotImplementedError("Load mode %r is not implemented"%mode)

def load_object(self, ts_extension, halo_number, object_typetag='halo', mode=None):
def load_object(self, ts_extension, finder_id, finder_offset, object_typetag='halo', mode=None):
if mode=='partial':
h = self._construct_halo_cat(ts_extension, object_typetag)
h_file = h.load_copy(halo_number)
h_file = h.load_copy(finder_offset)
h_file.physical_units()
return h_file
elif mode=='server':
timestep = self.load_timestep(ts_extension, mode)
from ..parallel_tasks import pynbody_server as ps
return timestep.get_view(ps.ObjectSpecification(halo_number, object_typetag))
return timestep.get_view(ps.ObjectSpecification(finder_id, finder_offset, object_typetag))
elif mode=='server-partial':
timestep = self.load_timestep(ts_extension, mode)
from ..parallel_tasks import pynbody_server as ps
view = timestep.get_view(ps.ObjectSpecification(halo_number, object_typetag))
view = timestep.get_view(ps.ObjectSpecification(finder_id, finder_offset, object_typetag))
load_index = view['remote-index-list']
logger.info("Partial load %r, taking %d particles", ts_extension, len(load_index))
f = pynbody.load(self._extension_to_filename(ts_extension), take=load_index)
f.physical_units()
return f
elif mode is None:
h = self._construct_halo_cat(ts_extension, object_typetag)
return h[halo_number]
return h[finder_offset]
else:
raise NotImplementedError("Load mode %r is not implemented"%mode)

Expand Down Expand Up @@ -236,8 +236,8 @@ def enumerate_objects(self, ts_extension, object_typetag="halo", min_halo_partic
for i in range(istart, len(h)+istart):
try:
hi = h[i]
if len(hi.dm) > min_halo_particles:
yield i, len(hi.dm), len(hi.star), len(hi.gas)
if len(hi.dm)+len(hi.star)+len(hi.gas) > min_halo_particles:
yield i, i, len(hi.dm), len(hi.star), len(hi.gas)
except (ValueError, KeyError) as e:
pass

Expand Down Expand Up @@ -335,12 +335,12 @@ def _is_able_to_load(self, filepath):
except (IOError, RuntimeError):
return False

def load_object(self, ts_extension, halo_number, object_typetag='halo', mode=None):
def load_object(self, ts_extension, finder_id, finder_offset, object_typetag='halo', mode=None):
if mode=='subfind_properties':
h = self._construct_halo_cat(ts_extension, object_typetag)
return h.get_halo_properties(halo_number,with_unit=False)
return h.get_halo_properties(finder_offset,with_unit=False)
else:
return super(GadgetSubfindInputHandler, self).load_object(ts_extension, halo_number, object_typetag, mode)
return super(GadgetSubfindInputHandler, self).load_object(ts_extension, finder_id, finder_offset, object_typetag, mode)

def _construct_group_cat(self, ts_extension):
f = self.load_timestep(ts_extension)
Expand Down
Loading

0 comments on commit 38e73a2

Please sign in to comment.