Skip to content


842 Refactor cities flow
Browse files Browse the repository at this point in the history


This PR introduces a major change in the organization of the IC cities
structure. Details
here (
here (

Since this is a major change in the repository, we will be moving to
version 2.0.0 when this PR is merged.

[reviewer: martinperezmaneiro]

Latest commits gave the final shape to the refactor of the cities, the
code is clear and understandable. Great work, approved!
  • Loading branch information
martinperezmaneiro authored and carhc committed May 14, 2024
2 parents 47503ba + 7e32798 commit 706d338
Show file tree
Hide file tree
Showing 33 changed files with 1,360 additions and 909 deletions.
158 changes: 118 additions & 40 deletions invisible_cities/cities/
Original file line number Diff line number Diff line change
@@ -1,15 +1,40 @@
Beersheba, a city suspended from the heavens, inhabited only by idealists.
This city interpolates corrected hits and applies Lucy-Richardson deconvolution
to the interpolated signal.
The input is esmeralda output containing hits, kdst global information and mc info.
The city outputs :
- DECO deconvolved hits table
- MC info (if run number <=0)
- SUMMARY summary of per event information
Beersheba, a city suspended from the heavens, inhabited only by
This city applies a Lucy-Rihcardson (LR) algorithm to reconstruct the
electron cloud density.
It reads hDSTs produced by Sophronia and produces deconvolved
hits. The LR deconvolution finds the (discretized) charge distribution
that generates the input image based on the PSF of the system. Each
bin in the charge distribution is interpreted as a (deconvolved)
The workflow of Beersheba can be summarized as:
- Apply geometrical and lifetime corrections
- Apply a low charge threshold to the input hits. If this leaves
behind a slice with no hits, a fake hit (NN-hit) is temporarily
- Merge NN-hits: The NN-hits' energy is reassigned to the closest
- Apply a second charge threshold with a slightly different energy
- Drops isolated sensors (*)
- Normalizes charge within each S2 peak
- For each slice
- Interpolates the hits to obtain a continuous image (real_image)
- Generate a flat charge distribution as a seed
- Do the following until a maximum number of iterations is reached
or the difference between consecutive images is smaller than a
- Convolve the charge distribution with the PSF (new_image)
- Update the charge distribution according to the difference
between new_image and real_image
- Clean up the image by applying a cut on the fraction of energy
of the resulting charge-distribution hits

import numpy as np
Expand All @@ -20,18 +45,18 @@
from scipy.stats import multivariate_normal
from numpy import nan_to_num

from typing import Tuple
from typing import List
from typing import Union
from typing import Sequence
from typing import Optional
from typing import Tuple
from typing import List
from typing import Optional

from . components import city
from . components import collect
from . components import copy_mc_info
from . components import print_every
from . components import cdst_from_files
from . components import summary_writer
from . components import hits_corrector
from . components import hits_thresholder
from . components import hits_and_kdst_from_files
from . components import identity

from .. core.configure import EventRangeType
from .. core.configure import OneOrManyFiles
Expand All @@ -54,7 +79,9 @@
from .. io.run_and_event_io import run_and_event_writer
from .. io. dst_io import df_writer
from .. io. dst_io import load_dst
from .. io. kdst_io import kdst_from_df_writer
from .. io. hits_io import hits_writer
from .. io. event_filter_io import event_filter_writer
from .. io. kdst_io import kdst_from_df_writer

from .. types.symbols import HitEnergy
from .. types.symbols import InterpolationMethod
Expand All @@ -64,6 +91,34 @@
from .. core import system_of_units as units

# Temporary. The removal of the event model will fix this.
from collections import defaultdict
def hitc_to_df_(hitc):
columns = defaultdict(list)
for hit in hitc.hits:
columns["event" ].append(hitc.event)
columns["time" ].append(hitc.time)
columns["npeak" ].append(hit .npeak)
columns["Xpeak" ].append(hit .Xpeak)
columns["Ypeak" ].append(hit .Ypeak)
columns["nsipm" ].append(hit .nsipm)
columns["X" ].append(hit .X)
columns["Y" ].append(hit .Y)
columns["Xrms" ].append(hit .Xrms)
columns["Yrms" ].append(hit .Yrms)
columns["Z" ].append(hit .Z)
columns["Q" ].append(hit .Q)
columns["E" ].append(hit .E)
columns["Qc" ].append(hit .Qc)
columns["Ec" ].append(hit .Ec)
columns["track_id"].append(hit .track_id)
columns["Ep" ].append(hit .Ep)
return pd.DataFrame(columns)

def event_info_adder(timestamp : float, dst : pd.DataFrame):
return dst.assign(time=timestamp/1e3, nsipm=0, Xrms=0, Yrms=0)

def deconvolve_signal(det_db : pd.DataFrame,
psf_fname : str,
Expand Down Expand Up @@ -178,6 +233,7 @@ def apply_deconvolution(df):
for peak, hits in df.groupby("npeak"):
hits.loc[:, "NormQ"] = hits.loc[:, 'Q'] / hits.loc[:, 'Q'].sum()
deconvolved_hits = pd.concat([deconvolve_hits(df_z, z) for z, df_z in hits.groupby("Z")], ignore_index=True)
deconvolved_hits = deconvolved_hits.assign(npeak=peak, Xpeak=hits.Xpeak.iloc[0], Ypeak=hits.Ypeak.iloc[0])
distribute_energy(deconvolved_hits, hits, energy_type)

Expand Down Expand Up @@ -310,14 +366,18 @@ def write_deconv(df):

def beersheba( files_in : OneOrManyFiles
, file_out : str
, compression : str
, event_range : EventRangeType
, print_mod : int
, detector_db : str
, run_number : int
, deconv_params : dict
def beersheba( files_in : OneOrManyFiles
, file_out : str
, compression : str
, event_range : EventRangeType
, print_mod : int
, detector_db : str
, run_number : int
, threshold : float
, same_peak : bool
, deconv_params : dict
, corrections_file : str
, apply_temp : bool
The city corrects Penthesilea hits energy and extracts topology information.
Expand All @@ -337,6 +397,10 @@ def beersheba( files_in : OneOrManyFiles
run_number : int
Has to be negative for MC runs
threshold : float
Threshold to be applied to all SiPMs.
same_peak : bool
Whether to reassign NN hits within the same peak.
deconv_params : dict
q_cut : float
Minimum charge (pes) on a hit (SiPM)
Expand Down Expand Up @@ -385,8 +449,14 @@ def beersheba( files_in : OneOrManyFiles
DECO : Deconvolved hits table
MC info : (if run number <=0)
SUMMARY : Table with the summary from Esmeralda.

if corrections_file is None: correct_hits = identity
else : correct_hits = hits_corrector(corrections_file, apply_temp)
correct_hits = correct_hits, item="hits")

threshold_hits =, same_peak), item="hits")
hitc_to_df =, item="hits")

deconv_params['psf_fname' ] = expandvars(deconv_params['psf_fname'])

Expand All @@ -397,45 +467,53 @@ def beersheba( files_in : OneOrManyFiles
raise NotImplementedError(f"{deconv_params['n_dim']}-dimensional PSF not yet implemented")

cut_sensors = (deconv_params.pop("q_cut") , ['E', 'Ec']),
item = 'cdst')
item = 'hits')
drop_sensors ="drop_dist"), ['E', 'Ec']),
item = 'cdst')
item = 'hits')
filter_events_no_hits =,
args = 'cdst',
out = 'cdst_passed_no_hits')
args = 'hits',
out = 'hits_passed_no_hits')
deconvolve_events =, run_number), **deconv_params),
args = 'cdst',
args = 'hits',
out = 'deconv_dst')

add_event_info =, args=("timestamp", "deconv_dst"), out="deconv_dst")

event_count_in = fl.spy_count()
event_count_out = fl.spy_count()
events_passed_no_hits = fl.count_filter(bool, args = "cdst_passed_no_hits")
events_passed_no_hits = fl.count_filter(bool, args = "hits_passed_no_hits")

filter_out_none = fl.filter(lambda x: x is not None, args = "kdst")

evtnum_collect = collect()

with tb.open_file(file_out, "w", filters = tbl.filters(compression)) as h5out:
# Define writers
write_event_info = fl.sink(run_and_event_writer (h5out), args = ("run_number", "event_number", "timestamp"))
write_deconv = fl.sink( deconv_writer(h5out=h5out), args = "deconv_dst")
write_summary = fl.sink( summary_writer(h5out=h5out), args = "summary" )
write_kdst_table = fl.sink( kdst_from_df_writer(h5out), args = "kdst" )
write_event_info = fl.sink(run_and_event_writer(h5out), args = ("run_number", "event_number", "timestamp"))
write_deconv = fl.sink( deconv_writer(h5out), args = "deconv_dst")
write_kdst_table = fl.sink( kdst_from_df_writer(h5out), args = "kdst" )
write_thr_hits = fl.sink( hits_writer(h5out, "CHITS", "lowTh"), args = "hits")
write_nohits_filter = fl.sink( event_filter_writer(h5out, "nohits"), args=("event_number", "hits_passed_no_hits"))

result = push(source = cdst_from_files(files_in),
result = push(source = hits_and_kdst_from_files(files_in, "RECO", "Events"),
pipe = pipe(fl.slice(*event_range, close_all=True) ,
print_every(print_mod) ,
event_count_in.spy ,
correct_hits ,
threshold_hits ,
fl.branch(write_thr_hits) ,
hitc_to_df ,
cut_sensors ,
drop_sensors ,
filter_events_no_hits ,
events_passed_no_hits .filter ,
fl.branch(write_nohits_filter) ,
events_passed_no_hits.filter ,
deconvolve_events ,
add_event_info ,
event_count_out.spy ,
fl.branch("event_number" ,
evtnum_collect.sink) ,
fl.fork(write_deconv ,
write_summary ,
(filter_out_none, write_kdst_table),
write_event_info)) ,
result = dict(events_in = event_count_in .future,
Expand Down

0 comments on commit 706d338

Please sign in to comment.