diff --git a/src/coffea/lumi_tools/lumi_tools.py b/src/coffea/lumi_tools/lumi_tools.py index e55230af0..8aef1bda0 100644 --- a/src/coffea/lumi_tools/lumi_tools.py +++ b/src/coffea/lumi_tools/lumi_tools.py @@ -1,12 +1,29 @@ import json - -import awkward as ak -import dask_awkward as dak +from functools import partial + +import awkward +import dask.delayed +import dask_awkward +import numba +import numpy +from dask.base import tokenize +from dask.highlevelgraph import HighLevelGraph +from dask_awkward.layers import AwkwardTreeReductionLayer +from dask_awkward.lib.core import new_array_object from numba import types from numba.typed import Dict -from ..util import numba -from ..util import numpy as np + +def wrap_get_lumi(runlumis, lumi_index): + runlumis_or_lz = awkward.typetracer.length_zero_if_typetracer(runlumis).to_numpy() + wrap_tot_lumi = numpy.zeros((1,)) + LumiData._get_lumi_kernel( + runlumis_or_lz[:, 0], runlumis_or_lz[:, 1], lumi_index, wrap_tot_lumi + ) + out = awkward.Array(wrap_tot_lumi) + if awkward.backend(runlumis) == "typetracer": + out = awkward.Array(out.layout.to_typetracer(forget_length=True)) + return out class LumiData: @@ -28,11 +45,12 @@ class LumiData: If you are using a LumiData file containing avg. inst. luminosity, make sure to set is_inst_lumi=True in the constructor of this class. """ + # 2^18 orbits / 40 MHz machine clock / 3564 bunch positions seconds_per_lumi_LHC = 2**18 / (40079000 / 3564) def __init__(self, lumi_csv, is_inst_lumi=False): self._is_inst_lumi = is_inst_lumi - self._lumidata = np.loadtxt( + self._lumidata = numpy.loadtxt( lumi_csv, delimiter=",", usecols=(0, 1, 6, 7), @@ -43,6 +61,8 @@ def __init__(self, lumi_csv, is_inst_lumi=False): ], # not sure what lumi:0 means, appears to be always zero (DAQ off before beam dump?) }, ) + self.index = None + self.index_delayed = None def get_lumi(self, runlumis): """Calculate integrated lumi @@ -53,19 +73,40 @@ def get_lumi(self, runlumis): A 2d numpy array of ``[[run,lumi], [run,lumi], ...]`` or `LumiList` object of the lumiSections to integrate over. """ - self.index = Dict.empty( - key_type=types.Tuple([types.uint32, types.uint32]), value_type=types.float64 - ) - runs = self._lumidata[:, 0].astype("u4") - lumis = self._lumidata[:, 1].astype("u4") - # fill self.index - LumiData._build_lumi_table_kernel(runs, lumis, self._lumidata, self.index) + if self.index is None: + self.index = Dict.empty( + key_type=types.Tuple([types.uint32, types.uint32]), + value_type=types.float64, + ) + runs = self._lumidata[:, 0].astype("u4") + lumis = self._lumidata[:, 1].astype("u4") + # fill self.index + LumiData._build_lumi_table_kernel(runs, lumis, self._lumidata, self.index) + # delayed object cache + self.index_delayed = dask.delayed(self.index) if isinstance(runlumis, LumiList): runlumis = runlumis.array - tot_lumi = np.zeros((1,), dtype=np.dtype("float64")) - LumiData._get_lumi_kernel(runlumis[:, 0], runlumis[:, 1], self.index, tot_lumi) - return tot_lumi[0] * (self.seconds_per_lumi_LHC if self._is_inst_lumi else 1.0) + tot_lumi = numpy.zeros((1,), dtype=numpy.dtype("float64")) + if isinstance(runlumis, dask_awkward.Array): + lumi_meta = wrap_get_lumi(runlumis._meta, self.index) + lumi_per_partition = dask_awkward.map_partitions( + wrap_get_lumi, + runlumis, + self.index_delayed, + label="get_lumi", + meta=lumi_meta, + ) + tot_lumi = awkward.sum(lumi_per_partition, keepdims=True) + else: + LumiData._get_lumi_kernel( + runlumis[:, 0], runlumis[:, 1], self.index, tot_lumi + ) + return ( + tot_lumi[0] * self.seconds_per_lumi_LHC + if self._is_inst_lumi + else tot_lumi[0] + ) @staticmethod @numba.njit(parallel=False, fastmath=False) @@ -80,8 +121,8 @@ def _build_lumi_table_kernel(runs, lumis, lumidata, index): def _get_lumi_kernel(runs, lumis, index, tot_lumi): ks_done = set() for iev in range(len(runs)): - run = np.uint32(runs[iev]) - lumi = np.uint32(lumis[iev]) + run = numpy.uint32(runs[iev]) + lumi = numpy.uint32(lumis[iev]) k = (run, lumi) if k not in ks_done: ks_done.add(k) @@ -106,9 +147,9 @@ def __init__(self, jsonfile): self._masks = {} for run, lumilist in goldenjson.items(): - mask = np.array(lumilist, dtype=np.uint32).flatten() + mask = numpy.array(lumilist, dtype=numpy.uint32).flatten() mask[::2] -= 1 - self._masks[np.uint32(run)] = mask + self._masks[numpy.uint32(run)] = mask def __call__(self, runs, lumis): """Check if run and lumi are valid @@ -134,20 +175,26 @@ def apply(runs, lumis): _masks[k] = v runs_orig = runs - if isinstance(runs, ak.highlevel.Array): - runs = ak.to_numpy(ak.typetracer.length_zero_if_typetracer(runs)) - if isinstance(lumis, ak.highlevel.Array): - lumis = ak.to_numpy(ak.typetracer.length_zero_if_typetracer(lumis)) - mask_out = np.zeros(dtype="bool", shape=runs.shape) + if isinstance(runs, awkward.highlevel.Array): + runs = awkward.to_numpy( + awkward.typetracer.length_zero_if_typetracer(runs) + ) + if isinstance(lumis, awkward.highlevel.Array): + lumis = awkward.to_numpy( + awkward.typetracer.length_zero_if_typetracer(lumis) + ) + mask_out = numpy.zeros(dtype="bool", shape=runs.shape) LumiMask._apply_run_lumi_mask_kernel(_masks, runs, lumis, mask_out) - if isinstance(runs_orig, ak.Array): - mask_out = ak.Array(mask_out) - if ak.backend(runs_orig) == "typetracer": - mask_out = ak.Array(mask_out.layout.to_typetracer(forget_length=True)) + if isinstance(runs_orig, awkward.Array): + mask_out = awkward.Array(mask_out) + if awkward.backend(runs_orig) == "typetracer": + mask_out = awkward.Array( + mask_out.layout.to_typetracer(forget_length=True) + ) return mask_out - if isinstance(runs, dak.Array): - return dak.map_partitions(apply, runs, lumis) + if isinstance(runs, dask_awkward.Array): + return dask_awkward.map_partitions(apply, runs, lumis) else: return apply(runs, lumis) @@ -156,16 +203,77 @@ def apply(runs, lumis): @numba.njit(parallel=False, fastmath=True) def _apply_run_lumi_mask_kernel(masks, runs, lumis, mask_out): for iev in numba.prange(len(runs)): - run = np.uint32(runs[iev]) - lumi = np.uint32(lumis[iev]) + run = numpy.uint32(runs[iev]) + lumi = numpy.uint32(lumis[iev]) if run in masks: lumimask = masks[run] - ind = np.searchsorted(lumimask, lumi) - if np.mod(ind, 2) == 1: + ind = numpy.searchsorted(lumimask, lumi) + if numpy.mod(ind, 2) == 1: mask_out[iev] = 1 +def _wrap_unique(array): + out = numpy.unique(awkward.typetracer.length_one_if_typetracer(array)) + + if awkward.backend(array) == "typetracer": + out = awkward.Array( + out.layout.to_typetracer(forget_length=True), + behavior=out.behavior, + attrs=out.attrs, + ) + return out + + +def _lumilist_dak_unique(runs_and_lumis, split_every=8): + concat_fn = partial(awkward.concatenate, axis=0) + + tree_node_fn = _wrap_unique + finalize_fn = _wrap_unique + + label = "lumilist-unique" + + token = tokenize( + runs_and_lumis, + numpy.unique, + label, + numpy.uint64, + split_every, + ) + + name_tree_node = f"{label}-tree-node-{token}" + name_finalize = f"{label}-finalize-{token}" + + chunked = dask_awkward.map_partitions( + _wrap_unique, runs_and_lumis, label="lumilist-unique-chunked" + ) + + trl = AwkwardTreeReductionLayer( + name=name_finalize, + name_input=chunked.name, + npartitions_input=chunked.npartitions, + concat_func=concat_fn, + tree_node_func=tree_node_fn, + finalize_func=finalize_fn, + split_every=split_every, + tree_node_name=name_tree_node, + ) + + graph = HighLevelGraph.from_collections(name_finalize, trl, dependencies=(chunked,)) + + meta = _wrap_unique(runs_and_lumis._meta) + + return new_array_object(graph, name_finalize, meta=meta, npartitions=1) + + +def _packed_unique(runs, lumis): + merged = (awkward.values_astype(runs, numpy.uint64) << 32) | awkward.values_astype( + lumis, numpy.uint64 + ) + uniques = _lumilist_dak_unique(merged) + return (uniques >> 32), (uniques & 0xFFFFFFFF) + + class LumiList: """Mergeable list of unique (run, lumiSection) values @@ -173,25 +281,62 @@ class LumiList: Parameters ---------- - runs : numpy.ndarray + runs : numpy.ndarray, dask_awkward.Array Vectorized list of run numbers - lumis : numpy.ndarray + lumis : numpy.ndarray, dask_awkward.Array Vectorized list of lumiSection values + delayed_default: bool + """ - def __init__(self, runs=None, lumis=None): - self.array = np.zeros(shape=(0, 2)) - if runs is not None: - self.array = np.unique(np.c_[runs, lumis], axis=0) + def __init__(self, runs=None, lumis=None, delayed_default=True): + if (runs is None) != (lumis is None): + raise ValueError( + "Both runs and lumis must be provided when given to the constructor of LumiList." + ) + + if delayed_default and runs is None: + raise ValueError( + "You must supply runs and lumis when using LumiList is delayed mode." + ) + + self.array = None + if not delayed_default: + self.array = numpy.zeros(shape=(0, 2)) + + if isinstance(runs, dask_awkward.Array) and isinstance( + lumis, dask_awkward.Array + ): + unq_run, unq_lumi = _packed_unique(runs, lumis) + self.array = awkward.concatenate( + [unq_run[:, None], unq_lumi[:, None]], axis=1 + ) + else: + if runs is not None: + self.array = numpy.unique(numpy.c_[runs, lumis], axis=0) def __iadd__(self, other): # TODO: re-apply unique? Or wait until end if isinstance(other, LumiList): - self.array = np.r_[self.array, other.array] + if isinstance(self.array, dask_awkward.Array): + carray = awkward.concatenate([self.array, other.array], axis=0) + unq_run, unq_lumi = _packed_unique(carray[:, 0], carray[:, 1]) + self.array = awkward.concatenate( + [unq_run[:, None], unq_lumi[:, None]], axis=1 + ) + else: + self.array = numpy.r_[self.array, other.array] else: raise ValueError("Expected LumiList object, got %r" % other) return self + def __add__(self, other): + temp = LumiList(runs=other.array[:, 0], lumis=other.array[:, 1]) + temp += self + return temp + def clear(self): """Clear current lumi list""" - self.array = np.zeros(shape=(0, 2)) + if isinstance(self.array, dask_awkward.Array): + raise RuntimeError("Delayed-mode LumiList cannot be cleared!") + self.array = numpy.zeros(shape=(0, 2)) diff --git a/tests/test_lumi_tools.py b/tests/test_lumi_tools.py index f74113b94..e37aff105 100644 --- a/tests/test_lumi_tools.py +++ b/tests/test_lumi_tools.py @@ -1,10 +1,12 @@ import awkward as ak import cloudpickle +import dask import dask_awkward as dak +import numpy as np from dask.distributed import Client from coffea.lumi_tools import LumiData, LumiList, LumiMask -from coffea.util import numpy as np +from coffea.nanoevents import NanoEventsFactory def test_lumidata(): @@ -114,7 +116,7 @@ def test_lumilist(): llist1 = LumiList(runs=runslumis1[:, 0], lumis=runslumis1[:, 1]) llist2 = LumiList(runs=runslumis2[:, 0], lumis=runslumis2[:, 1]) - llist3 = LumiList() + llist3 = LumiList(delayed_default=False) llist3 += llist1 llist3 += llist2 @@ -127,3 +129,43 @@ def test_lumilist(): llist1.clear() assert llist1.array.size == 0 + + +def test_lumilist_dask(): + lumidata = LumiData("tests/samples/lumi_small.csv") + + runslumis1 = np.zeros((10, 2), dtype=np.uint32) + runslumis1[:, 0] = lumidata._lumidata[0:10, 0] + runslumis1[:, 1] = lumidata._lumidata[0:10, 1] + + runslumis2 = np.zeros((10, 2), dtype=np.uint32) + runslumis2[:, 0] = lumidata._lumidata[10:20, 0] + runslumis2[:, 1] = lumidata._lumidata[10:20, 1] + + drunslumis1 = dak.from_awkward(ak.Array(runslumis1), 3) + drunslumis2 = dak.from_awkward(ak.Array(runslumis2), 3) + + llist1 = LumiList(runs=drunslumis1[:, 0], lumis=drunslumis1[:, 1]) + llist2 = LumiList(runs=drunslumis2[:, 0], lumis=drunslumis2[:, 1]) + llist3 = llist1 + llist2 + + lumi1 = lumidata.get_lumi(llist1) + lumi2 = lumidata.get_lumi(llist2) + lumi3 = lumidata.get_lumi(llist3) + + lumi1, lumi2, lumi3 = dask.compute(lumi1, lumi2, lumi3) + + assert abs(lumi3 - (lumi1 + lumi2)) < 1e-4 + + +def test_lumilist_client_fromfile(): + with Client() as _: + events = NanoEventsFactory.from_root( + {"tests/samples/nano_dy.root": "Events"}, + ).events() + + lumilist = LumiList(runs=events.run, lumis=events.luminosityBlock) + + (result,) = dask.compute(lumilist.array) + + assert result.to_list() == [[1, 13889]]