generated from binder-examples/conda
-
Notifications
You must be signed in to change notification settings - Fork 0
/
MemNanoTestProcessor.py
78 lines (64 loc) · 2.44 KB
/
MemNanoTestProcessor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
from coffea import processor
import hist
import awkward as ak
import numpy as np
from coffea.nanoevents.methods import vector
from collections import defaultdict
import time
class MemNanoTestProcessor(processor.ProcessorABC):
    """Toy muon/dimuon processor that deliberately wastes time and memory.

    For each chunk it histograms the muon ``pt`` and the dimuon invariant
    ``mass`` per dataset and keeps simple per-dataset counters.  At the end of
    ``process`` it allocates a large throw-away string and sleeps, so that the
    example can be used to demonstrate resource allocation.
    """

    def __init__(self, columns=None):
        """Create the processor.

        Parameters
        ----------
        columns : list, optional
            Value exposed unchanged through the ``columns`` property.  When
            omitted, each instance gets its own new empty list.
        """
        # A literal ``[]`` default would be one list object shared by every
        # instance created without arguments; build a fresh one instead.
        self._columns = [] if columns is None else columns
        # dataset name -> (metadata key, value expected under that key);
        # consulted by ``process`` only when "checkusermeta" is in the metadata.
        self.expected_usermeta = {
            "ZJets": ("someusermeta", "hello"),
            "Data": ("someusermeta2", "world"),
        }

    @property
    def columns(self):
        """Return the ``columns`` object given at construction time."""
        return self._columns

    @property
    def accumulator(self):
        """Build and return a new, empty accumulator.

        Every access constructs fresh objects.  The returned dict has:

        * ``"mass"``: ``hist.Hist`` over (dataset, mass)
        * ``"pt"``: ``hist.Hist`` over (dataset, pt)
        * ``"cutflow"``: ``defaultdict(int)`` of named counters
        """
        dataset_axis = hist.axis.StrCategory(
            [], growth=True, name="dataset", label="Primary dataset"
        )
        mass_axis = hist.axis.Regular(
            30000, 0.25, 300, name="mass", label=r"$m_{\mu\mu}$ [GeV]"
        )
        pt_axis = hist.axis.Regular(
            30000, 0.24, 300, name="pt", label=r"$p_{T}$ [GeV]"
        )

        accumulator = {
            # replace when py3.6 is dropped
            # "mass": hist.Hist(dataset_axis, mass_axis, name="Counts"),
            # "pt": hist.Hist(dataset_axis, pt_axis, name="Counts"),
            "mass": hist.Hist(dataset_axis, mass_axis),
            "pt": hist.Hist(dataset_axis, pt_axis),
            "cutflow": defaultdict(int),
        }
        return accumulator

    def process(self, df):
        """Fill the histograms and counters for one chunk of events.

        Parameters
        ----------
        df
            Chunk of events.  It must provide ``metadata`` (a mapping with at
            least a ``"dataset"`` entry) and the attributes ``Muon_pt``,
            ``Muon_eta``, ``Muon_phi`` and ``Muon_mass``.

        Returns
        -------
        dict
            A freshly built accumulator (see ``accumulator``) filled from
            this chunk only.

        Raises
        ------
        KeyError
            If ``"checkusermeta"`` is in the metadata and the dataset is not a
            key of ``expected_usermeta``, or the expected metadata key is
            missing from ``df.metadata``.
        AssertionError
            If ``"checkusermeta"`` is in the metadata and the stored value
            differs from the expected one.
        """
        # Registers the coffea vector behaviors in awkward's global behavior
        # dict so the "PtEtaPhiMLorentzVector" record name below is recognized.
        ak.behavior.update(vector.behavior)

        output = self.accumulator
        dataset = df.metadata["dataset"]

        # Optional self-check that user metadata reached the processor.
        if "checkusermeta" in df.metadata:
            metaname, metavalue = self.expected_usermeta[dataset]
            assert metavalue == df.metadata[metaname]

        muon = ak.zip(
            {
                "pt": df.Muon_pt,
                "eta": df.Muon_eta,
                "phi": df.Muon_phi,
                "mass": df.Muon_mass,
            },
            with_name="PtEtaPhiMLorentzVector",
        )

        # All 2-combinations of muons; the two members of each pair are added
        # together and the ``mass`` of the sum is histogrammed below.
        dimuon = ak.combinations(muon, 2)
        dimuon = dimuon["0"] + dimuon["1"]

        output["pt"].fill(dataset=dataset, pt=ak.flatten(muon.pt))
        output["mass"].fill(dataset=dataset, mass=ak.flatten(dimuon.mass))

        output["cutflow"][f"{dataset}_pt"] += np.sum(ak.num(muon))
        output["cutflow"][f"{dataset}_mass"] += np.sum(ak.num(dimuon))

        # artificially increase time and memory of this simple example to show
        # resource allocation capabilities.
        # ``big_mem`` is intentionally never read: the only purpose of this
        # string of repeated "A" characters is to occupy memory while the
        # function sleeps.
        big_mem = ak.count(muon) * 1024 * "A"  # noqa: F841
        time.sleep(5)

        return output

    def postprocess(self, accumulator):
        """Return ``accumulator`` unchanged; no post-processing is done."""
        return accumulator