Skip to content

Utilities reference

Core functionality (utils.core)

zfit_installed()

Source code in src/triggercalib/utils/core.py
def zfit_installed():
    # TODO: write docstring
    try:
        # Import zfit and ignore in the linting
        import zfit  # noqa: F401
    except ModuleNotFoundError:
        return False
    return True

Helper functions (utils.helpers)

has_zfit = core.zfit_installed() module-attribute

bin_nums_from_hist(hist)

Source code in src/triggercalib/utils/helpers.py
def bin_nums_from_hist(hist):

    xbins = range(1, hist.GetNbinsX() + 1)
    if isinstance(hist, R.TH2):
        ybins = range(1, hist.GetNbinsY() + 1)
        bin_indices = it.product(xbins, ybins)
    else:
        bin_indices = [(b,) for b in xbins]

    return [hist.GetBin(*b) for b in bin_indices]

bins_from_taxis(axis, as_array=False)

Extract bin edges from a ROOT TAxis object

Parameters:

Name Type Description Default
axis

ROOT TAxis object to extract bins from

required
as_array

Whether to return bins as a ROOT array

False

Returns:

Type Description

list or array: List of bin edges or ROOT array of bin edges

Source code in src/triggercalib/utils/helpers.py
def bins_from_taxis(axis, as_array=False):
    """Extract bin edges from a ROOT TAxis object

    Args:
        axis: ROOT TAxis object to extract bins from
        as_array: Whether to return bins as a ROOT array

    Returns:
        list or array: List of bin edges or ROOT array of bin edges
    """
    bins = [axis.GetBinLowEdge(0)] + [
        axis.GetBinUpEdge(i) for i in range(1, axis.GetNbins() + 1)
    ]

    if as_array:
        return array("d", bins)
    return bins

construct_variable(name, backend, value=None, limits=None, title=None)

Source code in src/triggercalib/utils/helpers.py
def construct_variable(
    name,
    backend,
    value: float = None,
    limits: Annotated[List[float], 2] = None,
    title: str = None,
):
    # TODO: write docstring

    if title is None:
        title = name

    if backend == "roofit":
        if limits:
            if value:
                return R.RooRealVar(name, title, value, *limits)
            return R.RooRealVar(name, title, *limits)
        elif value:
            return R.RooRealVar(name, title, value)
        return R.RooRealVar(name, title, -np.inf, np.inf)
    elif has_zfit and backend == "zfit":
        # TODO: <- Inform user that value is ignored when creating a zfit Space
        if limits is None:
            limits = (-np.inf, np.inf)
        return zfit.Space(name, limits=limits)

    raise ValueError(
        f"Backend '{backend}' not recognised. Variable '{name}' could not be constructed."
    )

create_dataset(data, observable, weight='')

Source code in src/triggercalib/utils/helpers.py
def create_dataset(
    data: Dict[str, np.ndarray],
    observable: types.observable,
    weight="",
):
    # TODO: write docstring

    observables = observable if isinstance(observable, List) else [observable]

    backends = [get_backend(observable=observable) for observable in observables]
    if len(set(backends)) == 1:
        backend = backends[0]
    else:
        raise ValueError(
            f"Unsupported combination of observables. These must be either all ROOT RooAbsReal or all zFit Spaces. Observables: {observables}"
        )

    if weight and not any(get_variable_name(obs) for obs in observables):
        observables.append(construct_variable(weight, backend))

    if backend == "roofit":
        return R.RooDataSet.from_numpy(data, observables, weight_name=weight)

    elif has_zfit and backend == "zfit":
        np_dataset = np.array(
            [
                branch
                for branch_name, branch in data.items()
                if branch_name in [get_variable_name(obs) for obs in observables]
            ]
        ).T
        return zfit.Data.from_numpy(
            obs=zfit.dimension.combine_spaces(*observables),
            array=np_dataset,
            weights=data[weight] if weight else None,
        )

empty_histogram(name, binning)

Create an empty ROOT.TH1D with specified binning scheme

Parameters:

Name Type Description Default
name

Name for the histogram

required
binning

binning scheme of interest

required

Returns:

Type Description

TH1D or TH2D: Empty histogram with appropriate binning

Source code in src/triggercalib/utils/helpers.py
def empty_histogram(name, binning):
    """Create an empty ROOT.TH1D with specified binning scheme

    Args:
        name: Name for the histogram
        binning: binning scheme of interest

    Returns:
        TH1D or TH2D: Empty histogram with appropriate binning
    """
    bin_vars = list(binning.keys())
    if len(bin_vars) == 1:
        return R.TH1D(
            name,
            name,
            len(binning[bin_vars[0]]) - 1,
            array("d", binning[bin_vars[0]]),
        )
    elif len(bin_vars) == 2:
        return R.TH2D(
            name,
            name,
            len(binning[bin_vars[0]]) - 1,
            array("d", binning[bin_vars[0]]),
            len(binning[bin_vars[1]]) - 1,
            array("d", binning[bin_vars[1]]),
        )
    raise NotImplementedError(
        "Construction of empty histograms above 2D not yet implemented in TriggerCalib"
    )

fit_result_to_string(fit_result)

Source code in src/triggercalib/utils/helpers.py
def fit_result_to_string(fit_result):
    # TODO: write docstring
    _width = 48

    result_string = "=" * _width + "\n"
    if isinstance(fit_result, R.RooFitResult):
        result_string += f"Fit performed with RooFit from ROOT {R.__version__}\n"
        result_string += "\nInitial parameters:\n"
        for var in fit_result.floatParsInit():
            result_string += f"{var.GetName()}: {var.getVal()} +/- {var.getError()}\n"
        result_string += "\nFinal parameters:\n"
        for var in fit_result.floatParsFinal():
            result_string += f"{var.GetName()}: {var.getVal()} +/- {var.getError()}\n"

        if len(fit_result.constPars()) > 0:
            result_string += "\nConstant parameters:\n"
            for var in fit_result.constPars():
                result_string += f"{var.GetName()}: {var.getVal()}\n"
        result_string += f"\nCovariance quality: {fit_result.covQual()}\n"
        result_string += f"Fit status: {fit_result.status()}\n"
        result_string += f"Minimum value: {fit_result.minNll()}\n"
        result_string += "=" * _width + "\n"

        return result_string

    elif has_zfit and isinstance(fit_result, zfit.minimizers.fitresult.FitResult):
        result_string += f"Fit performed with zfit {zfit.__version__}"
        result_string += "\nFinal parameters:\n"
        for param, param_info in fit_result.params.items():
            result_string += f"{param.name}: {param_info['value']} +/- {param_info['hesse']['error']}\n"
        result_string += f"\nValid: {fit_result.valid}\n"
        result_string += f"Converged: {fit_result.converged}\n"
        result_string += f"Fit status: {fit_result.status}\n"
        result_string += f"Minimum value: {fit_result.fmin}\n"
        result_string += "=" * _width + "\n"

        return result_string

    raise TypeError(
        f"Unrecognised type '{type(fit_result)}' for 'fit_result'. 'fit_result' must be of type 'ROOT.RooFitResult' or 'zfit.minimizers.fitresult.FitResult'."
    )

get_backend(data=None, observable=None, pdf=None)

Source code in src/triggercalib/utils/helpers.py
def get_backend(data=None, observable=None, pdf=None):
    # TODO: write docstring

    roofit_objects = []
    zfit_objects = []

    if data is None and observable is None and pdf is None:
        return None

    if data is not None:
        if isinstance(data, R.RooAbsData):
            roofit_objects.append(data)
        elif has_zfit and isinstance(data, zfit.Data):
            zfit_objects.append(data)

    if observable is not None:
        if isinstance(observable, R.RooAbsReal):
            roofit_objects.append(observable)
        elif has_zfit and isinstance(observable, zfit.Space):
            zfit_objects.append(observable)

    if pdf is not None:
        if isinstance(pdf, R.RooAbsPdf):
            roofit_objects.append(pdf)
        elif has_zfit and isinstance(pdf, zfit.pdf.BasePDF):
            zfit_objects.append(pdf)

    if len(roofit_objects) > 0 and len(zfit_objects) > 0:
        raise ValueError(
            f"Unsupported combination of fitting objects. These must be either both RooFit objects or both zFit objects. RooFit objects: {roofit_objects}, zFit objects: {zfit_objects}"
        )

    if len(roofit_objects) > 0:
        return "roofit"
    elif len(zfit_objects) > 0:
        return "zfit"

    raise ValueError(
        f"Unsupported combination of fitting objects. These must be either both RooFit objects or both zFit objects. RooFit objects: {roofit_objects}, zFit objects: {zfit_objects}"
    )

get_variable_name(observable)

Source code in src/triggercalib/utils/helpers.py
def get_variable_name(observable: types.observable):
    # TODO: write docstring

    if isinstance(observable, R.RooAbsReal):
        return observable.GetName()
    elif has_zfit and isinstance(observable, zfit.Space):
        return observable.obs[0]
    raise NotImplementedError(
        f"Could not determine name for observable of type '{type(observable)}'"
    )

parse_selection(particle, selection, category, branches)

Parse trigger selection strings into ROOT cut expressions

Parameters:

Name Type Description Default
selection Union[str, List[str]]

Trigger line(s) to create selection for

required
category Literal['', 'TIS', 'TOS']

Optional trigger outcome category (TIS/TOS) to append

required

Returns:

Name Type Description
str

ROOT cut expression combining the selections

Source code in src/triggercalib/utils/helpers.py
def parse_selection(
    particle: str,
    selection: Union[str, List[str]],
    category: Literal["", "TIS", "TOS"],
    branches: List[str],
):
    """Parse trigger selection strings into ROOT cut expressions

    Args:
        selection: Trigger line(s) to create selection for
        category: Optional trigger outcome category (TIS/TOS) to append

    Returns:
        str: ROOT cut expression combining the selections
    """

    if isinstance(selection, str):
        # Inspired by equivalent in HltEfficiencyChecker: https://gitlab.cern.ch/lhcb/DaVinci/-/blob/edc0bc4860643dc82c7c9b84828a4208426d9f15/HltEfficiencyChecker/python/HltEfficiencyChecker/config.py#L71-L72

        search = re.compile(f"{selection}Decision")
        selection = [
            branch.replace("Decision", "")
            for branch in branches
            if search.fullmatch(branch)
        ]

        if category:
            for line in selection:
                if f"{particle}_{line}Decision_{category}" not in branches:
                    raise KeyError(
                        f"Branch '{particle}_{line}Decision_{category}' could not be found"
                    )

    selection.sort()

    cuts = []
    for level in ("Hlt1", "Hlt2"):
        cut = " || ".join(
            (f"{particle}_{line}Decision_{category}" if category else f"{line}Decision")
            for line in selection
            if line.startswith(level)
        )
        if cut:
            cuts += [f"({cut})"]

    return " && ".join(cuts)

sum_bins(hist)

Source code in src/triggercalib/utils/helpers.py
def sum_bins(hist):
    contents = []
    errors = []

    bin_nums = bin_nums_from_hist(hist)
    for n_bin in bin_nums:
        contents.append(hist.GetBinContent(n_bin))
        errors.append(hist.GetBinError(n_bin))

    sum_content = np.sum(contents)
    sum_error = np.sqrt(np.sum(np.array(errors) ** 2))

    return sum_content, sum_error

tgraph_to_np(tgraph, xscale=1, yscale=1, zscale=1, symmetric_uncertainties=False)

Source code in src/triggercalib/utils/helpers.py
def tgraph_to_np(tgraph, xscale=1, yscale=1, zscale=1, symmetric_uncertainties=False):
    # TODO: write docstring
    if isinstance(tgraph, R.TGraph2D) or symmetric_uncertainties:
        th = tgraph_to_th(tgraph)
        return th_to_np(th, xscale, yscale, zscale=zscale)

    graph = up.from_pyroot(tgraph)
    xvals, yvals = graph.values()
    xlow_errs, ylow_errs = graph.errors("low")
    xhigh_errs, yhigh_errs = graph.errors("high")

    return (
        xvals * xscale,
        yvals * yscale,
        (xlow_errs * xscale, xhigh_errs * xscale),
        (ylow_errs * yscale, yhigh_errs * yscale),
    )

tgraph_to_th(graph, name='', title='')

Convert a ROOT TGraph(2D)AsymmErrors to a TH1(2)D

Parameters:

Name Type Description Default
graph

ROOT TGraph(2D)AsymmErrors to convert

required
name

Optional name for output histogram

''
title

Optional title for output histogram

''

Returns:

Type Description

TH1D or TH2D: Histogram containing graph points with symmetric errors

Source code in src/triggercalib/utils/helpers.py
def tgraph_to_th(graph, name="", title=""):
    """Convert a ROOT TGraph(2D)AsymmErrors to a TH1(2)D

    Args:
        graph: ROOT TGraph(2D)AsymmErrors to convert
        name: Optional name for output histogram
        title: Optional title for output histogram

    Returns:
        TH1D or TH2D: Histogram containing graph points with symmetric errors
    """

    x = c_double(0)
    y = c_double(0)
    z = c_double(0)

    x_edges = []
    y_edges = []

    x_vals = []
    y_vals = []
    values = []
    errors = []

    for n in range(graph.GetN()):
        if isinstance(graph, R.TGraph2DAsymmErrors):
            graph.GetPoint(n, x, y, z)
        else:
            graph.GetPoint(n, x, y)

        x_edges.append(x.value - graph.GetErrorXlow(n))
        x_edges.append(x.value + graph.GetErrorXhigh(n))

        x_vals.append(x.value)
        if isinstance(graph, R.TGraph2DAsymmErrors):
            y_edges.append(y.value - graph.GetErrorYlow(n))
            y_edges.append(y.value + graph.GetErrorYhigh(n))

            y_vals.append(y.value)
            values.append(z.value)
            errors.append(graph.GetErrorZ(n))
        else:
            values.append(y.value)
            errors.append(graph.GetErrorY(n))

    x_edges = np.array(sorted(list(set(x_edges))))
    y_edges = np.array(sorted(list(set(y_edges))))

    if isinstance(graph, R.TGraph2DAsymmErrors):
        hist = R.TH2D(
            name if name else f"{graph.GetName()}_TH2D",
            title if title else f"{graph.GetTitle()}_TH2D",
            len(x_edges) - 1,
            x_edges,
            len(y_edges) - 1,
            y_edges,
        )
        for x_val, y_val, value, error in zip(x_vals, y_vals, values, errors):
            n = hist.FindBin(x_val, y_val)
            hist.SetBinContent(n, value)
            hist.SetBinError(n, error)
    else:
        hist = R.TH1D(
            name if name else f"{graph.GetName()}_TH1D",
            title if title else f"{graph.GetTitle()}_TH1D",
            len(x_edges) - 1,
            x_edges,
        )
        for x_val, value, error in zip(x_vals, values, errors):
            n = hist.FindBin(x_val)
            hist.SetBinContent(n, value)
            hist.SetBinError(n, error)

    return hist

th_to_np(th, xscale=1, yscale=1, zscale=1)

Source code in src/triggercalib/utils/helpers.py
def th_to_np(th, xscale=1, yscale=1, zscale=1):
    # TODO: write docstring

    histogram = up.from_pyroot(th)
    if isinstance(th, R.TH2):
        zvals, xedges, yedges = histogram.to_numpy()

        xerrs = np.diff(xedges) / 2
        xvals = xedges[:-1] + xerrs

        yerrs = np.diff(yedges) / 2
        yvals = yedges[:-1] + yerrs

        zerrs = histogram.errors()

        return (
            xvals * xscale,
            yvals * yscale,
            zvals * zscale,
            xerrs * xscale,
            yerrs * yscale,
            zerrs * zscale,
        )

    yvals, xedges = histogram.to_numpy()

    xerrs = np.diff(xedges) / 2
    xvals = xedges[:-1] + xerrs

    yerrs = histogram.errors()

    return xvals * xscale, yvals * yscale, xerrs * xscale, yerrs * yscale

write_fit_result(fit_result, path, verbose=False)

Source code in src/triggercalib/utils/helpers.py
def write_fit_result(fit_result, path, verbose=False):
    # TODO: write docstring

    result_string = fit_result_to_string(fit_result)
    if verbose:
        print(result_string)

    with open(path, "w") as result_file:
        result_file.write(result_string)

    return

I/O functionality (utils.io)

load_config(path)

Load a configuration file from a JSON or YAML file

Parameters:

Name Type Description Default
path str

Path to the configuration file

required

Returns:

Name Type Description
dict

Configuration as a dictionary

Raises:

Type Description
ValueError

If the file is not a '.json' or '.yaml' file

Source code in src/triggercalib/utils/io.py
def load_config(path: str):
    """
    Load a configuration file from a JSON or YAML file

    Args:
        path: Path to the configuration file

    Returns:
        dict: Configuration as a dictionary

    Raises:
        ValueError: If the file is not a '.json' or '.yaml' file
    """

    if path.endswith(".json"):
        with open(path, "r") as config_file:
            config = json.load(config_file)
    elif path.endswith(".yaml"):
        with open(path, "r") as config_file:
            config = yaml.safe_load(config_file)
    else:
        raise ValueError(f"Config '{path}' must be a '.json' or '.yaml' file")
    return config

split_paths(paths, require_same_tree=True)

Split ROOT file paths into tree names and file paths

Parameters:

Name Type Description Default
paths Union[List[str], str]

Path(s) to ROOT file(s) of the form :

required
require_same_tree bool

Whether all paths must reference the same tree

True

Returns:

Name Type Description
tuple

Tree name(s) and file path(s)

Raises:

Type Description
ValueError

If require_same_tree is True and different trees are provided

Source code in src/triggercalib/utils/io.py
def split_paths(paths: Union[List[str], str], require_same_tree: bool = True):
    """Split ROOT file paths into tree names and file paths

    Args:
        paths: Path(s) to ROOT file(s) of the form <path>:<tree>
        require_same_tree: Whether all paths must reference the same tree

    Returns:
        tuple: Tree name(s) and file path(s)

    Raises:
        ValueError: If require_same_tree is True and different trees are provided
    """
    if isinstance(paths, str):
        paths = [paths]

    split_trees = []
    split_paths = []

    for p in paths:
        split_path, split_tree = p.rsplit(":", 1)
        split_trees.append(split_tree)
        split_paths.append(split_path)

    if len(set(split_trees)) == 1 and require_same_tree:
        return split_trees[0], split_paths
    elif not require_same_tree:
        return split_trees, split_paths

    raise ValueError(
        f"Same tree must be provided for all paths. Trees '{split_trees}' were provided."
    )

write_config(config, path)

Source code in src/triggercalib/utils/io.py
def write_config(config: dict, path: str):

    if "/" in path:
        os.makedirs(path.rsplit("/", 1)[0], exist_ok=True)

    if path.endswith(".json"):
        with open(path, "w") as config_file:
            json.dump(config, config_file, indent=4)
    elif path.endswith(".yaml"):
        with open(path, "w") as config_file:
            yaml.dump(config, config_file, default_flow_style=False)
    else:
        raise ValueError(f"Config '{path}' must be a '.json' or '.yaml' file")

Statistics tools (utils.stats)

poisson(passed, total, passed_error=None, total_error=None)

Calculate the efficiency and symmetric uncertainty based on propagation of Poisson-like errors

Parameters:

Name Type Description Default
passed Union[float, List[float], ndarray[float]]

int or array of int Number of events passing a criteria (numerator of the efficiency).

required
total Union[float, List[float], ndarray[float]]

int or array of int Total number of events (denominator of the efficiency).

required
passed_error Union[float, List[float], ndarray[float]]

int or array of int, optional Uncertainty on the number of events that pass some criteria if not strictly Poissonian; defaults to Poissonian uncertainty, sqrt(passed), if not specified.

None
total_error Union[float, List[float], ndarray[float]]

int or array of int, optional Uncertainty on the total number of events if not strictly Poissonian; defaults to Poissonian uncertainty, sqrt(total), if not specified.

None

Returns:

Name Type Description
value

float or array of float The raw efficiency (passed / total).

lower_error

float or array of float The symmetric error on the efficiency

upper_error

float or array of float The upper error on the efficiency (same as lower error, since uncertainty is symmetric)

Notes

This function implements the propagation of Poisson-like uncertainties as detailed in Eq. 14 of LHCb-PUB-2014-039 (https://cds.cern.ch/record/1701134/).

Source code in src/triggercalib/utils/stats.py
def poisson(
    passed: Union[float, List[float], np.ndarray[float]],
    total: Union[float, List[float], np.ndarray[float]],
    passed_error: Union[float, List[float], np.ndarray[float]] = None,
    total_error: Union[float, List[float], np.ndarray[float]] = None,
):
    """
    Calculate the efficiency and symmetric uncertainty based on propagation of Poisson-like errors

    Args:
        passed: int or array of int
            Number of events passing a criteria (numerator of the efficiency).
        total: int or array of int
            Total number of events (denominator of the efficiency).
        passed_error: int or array of int, optional
            Uncertainty on the number of events that pass some criteria if not strictly Poissonian;
            defaults to Poissonian uncertainty, `sqrt(passed)`, if not specified.
        total_error: int or array of int, optional
            Uncertainty on the total number of events if not strictly Poissonian;
            defaults to Poissonian uncertainty, `sqrt(total)`, if not specified.

    Returns:
        value: float or array of float
            The raw efficiency (passed / total).
        lower_error: float or array of float
            The symmetric error on the efficiency
        upper_error: float or array of float
            The upper error on the efficiency (same as lower error, since uncertainty is symmetric)

    Notes:
        This function implements the propagation of Poisson-like uncertainties as detailed in Eq. 14 of LHCb-PUB-2014-039 (https://cds.cern.ch/record/1701134/).
    """

    if passed_error is None:
        passed_error = np.sqrt(passed)
    if total_error is None:
        total_error = np.sqrt(total)

    failed = total - passed
    failed_error = np.sqrt(total_error**2 - passed_error**2)

    value = passed / total
    error = np.sqrt(
        (failed / total**2) ** 2 * passed_error**2
        + (passed / total**2) ** 2 * failed_error**2
    )

    return value, error, error

wilson(passed, total, confidence=0.68, passed_error=None, total_error=None)

Calculate the efficiency and lower/upper uncertainties based on a generalised Wilson interval.

Parameters:

Name Type Description Default
passed Union[float, List[float], ndarray[float]]

float or array of float Number of events passing a critera (numerator of the efficiency)

required
total Union[float, List[float], ndarray[float]]

float or array of float Total number of events (denominator of the efficiency)

required
confidence float

float, optional Confidence level for the interval; defaults to 0.68 (1 sigma), per the recommendation of the LHCb Statistics guidelines

0.68
passed_error Union[float, List[float], ndarray[float]]

int or array of int, optional Uncertainty on the number of events that pass some criteria if not strictly Poissonian; defaults to Poissonian uncertainty, sqrt(passed), if not specified

None
total_error Union[float, List[float], ndarray[float]]

int or array of int, optional Uncertainty on the total number of events if not strictly Poissonian; defaults to Poissonian uncertainty, sqrt(total), if not specified

None

Returns:

Name Type Description
efficiency

float or array of float The raw efficiency (passed / total)

lower_error

float or array of float The lower error on the efficiency from the Wilson interval

upper_error

float or array of float The upper error on the efficiency from the Wilson interval

Notes

This function implements the generalised Wilson interval of H. Dembinski and M. Schmelling, 2022 (https://arxiv.org/pdf/2110.00294).

Source code in src/triggercalib/utils/stats.py
def wilson(
    passed: Union[float, List[float], np.ndarray[float]],
    total: Union[float, List[float], np.ndarray[float]],
    confidence: float = 0.68,
    passed_error: Union[float, List[float], np.ndarray[float]] = None,
    total_error: Union[float, List[float], np.ndarray[float]] = None,
):
    """
    Calculate the efficiency and lower/upper uncertainties based on a generalised Wilson interval.

    Args:
        passed: float or array of float
            Number of events passing a critera (numerator of the efficiency)
        total: float or array of float
            Total number of events (denominator of the efficiency)
        confidence: float, optional
            Confidence level for the interval; defaults to 0.68 (1 sigma), per the recommendation of the LHCb Statistics guidelines
        passed_error: int or array of int, optional
            Uncertainty on the number of events that pass some criteria if not strictly Poissonian; defaults to Poissonian uncertainty, `sqrt(passed)`, if not specified
        total_error: int or array of int, optional
            Uncertainty on the total number of events if not strictly Poissonian; defaults to Poissonian uncertainty, `sqrt(total)`, if not specified

    Returns:
        efficiency: float or array of float
            The raw efficiency (passed / total)
        lower_error: float or array of float
            The lower error on the efficiency from the Wilson interval
        upper_error: float or array of float
            The upper error on the efficiency from the Wilson interval

    Notes:
        This function implements the generalised Wilson interval of H. Dembinski and M. Schmelling, 2022 (https://arxiv.org/pdf/2110.00294).
    """

    if passed_error is None:
        passed_error = np.sqrt(passed)
    if total_error is None:
        total_error = np.sqrt(total)

    passed_nonpoisson_variance = np.nan_to_num(
        passed_error**2 - passed
    )  # non-Poisson term in variance on n(passed)
    if isinstance(passed_nonpoisson_variance, (List, np.ndarray)):
        passed_nonpoisson_variance[passed_nonpoisson_variance < 1e-12] = 0
    elif passed_nonpoisson_variance < 1e-12:
        passed_nonpoisson_variance = 0

    failed_nonpoisson_variance = np.nan_to_num(
        total_error**2 - passed_error**2 - total + passed
    )
    if isinstance(failed_nonpoisson_variance, (List, np.ndarray)):
        failed_nonpoisson_variance[failed_nonpoisson_variance < 1e-12] = 0
    elif failed_nonpoisson_variance < 1e-12:
        failed_nonpoisson_variance = 0

    # Define terms in line with the notation of the paper where possible
    n = total
    p = passed / total if total > 0 else 0
    z = NormalDist().inv_cdf((1 + confidence) / 2)

    # Calculate the lower and upper limits of the interval
    prefactor = (
        1
        / (
            1
            + (z**2 / n)
            * (1 - (passed_nonpoisson_variance + failed_nonpoisson_variance) / n)
        )
        if n > 0
        else 0
    )
    positive_term = (
        p + (z**2 / (2 * n)) * (1 - 2 * passed_nonpoisson_variance / n) if n > 0 else 0
    )
    plusminus_term = (
        (
            z
            / n
            * np.sqrt(
                p**2 * (passed_nonpoisson_variance + failed_nonpoisson_variance - n)
                + p * (n - 2 * passed_nonpoisson_variance)
                + passed_nonpoisson_variance
                + z**2
                / 4
                * (
                    1
                    - 4 * passed_nonpoisson_variance * failed_nonpoisson_variance / n**2
                )
            )
        )
        if n > 0
        else 0
    )

    lower_limit = np.maximum(
        np.nan_to_num(prefactor * (positive_term - plusminus_term)), 0
    )
    upper_limit = np.minimum(
        np.nan_to_num(prefactor * (positive_term + plusminus_term)), 1
    )

    return p, p - lower_limit, upper_limit - p

Test utilities (utils.tests)

distributions = {'breitwigner': 'BreitWigner', 'doublecrystalball': 'CrystalBall', 'exponential': 'Exponential', 'gauss': 'Gaussian'} module-attribute

build_component(ws, name, observable, component)

Build a RooFit PDF component

Parameters:

Name Type Description Default
ws

RooWorkspace to build component in

required
name

Name for the component

required
observable

Observable to build PDF for

required
component

Component configuration dictionary

required

Returns:

Name Type Description
RooAbsPdf

Built PDF component

Source code in src/triggercalib/utils/tests.py
def build_component(ws, name, observable, component):
    """Build a RooFit PDF component

    Args:
        ws: RooWorkspace to build component in
        name: Name for the component
        observable: Observable to build PDF for
        component: Component configuration dictionary

    Returns:
        RooAbsPdf: Built PDF component
    """
    distribution = distributions[component["model"]]
    expanded_vars = ", ".join(
        f"{name}_{variable}[{', '.join(str(value) for value in values)}]"
        for variable, values in component["variables"].items()
    )
    expanded_vars = f"{observable}, {expanded_vars}"

    ws.factory(f"{distribution}::{name}_pdf({expanded_vars})")

    return ws

check_result(value, error, sample='example_file.root:Hlt2Test/DecayTree', cut='', line='Hlt1DummyOne', threshold=5.0)

Check if a test result matches expectations

Parameters:

Name Type Description Default
value float

Measured value to check

required
error Annotated[List[float], 2]

List of lower and upper uncertainties

required
sample str

Path to input ROOT file and tree

'example_file.root:Hlt2Test/DecayTree'
cut str

Additional selection criteria

''
line str

Trigger line to check

'Hlt1DummyOne'
threshold float

Maximum allowed deviation in percent

5.0

Returns:

Name Type Description
bool

Whether result matches expectations within threshold

Source code in src/triggercalib/utils/tests.py
def check_result(
    value: float,
    error: Annotated[List[float], 2],
    sample: str = "example_file.root:Hlt2Test/DecayTree",
    cut: str = "",
    line: str = "Hlt1DummyOne",
    threshold: float = 5.0,
):
    """Check if a test result matches expectations

    Args:
        value: Measured value to check
        error: List of lower and upper uncertainties
        sample: Path to input ROOT file and tree
        cut: Additional selection criteria
        line: Trigger line to check
        threshold: Maximum allowed deviation in percent

    Returns:
        bool: Whether result matches expectations within threshold
    """
    sample_rdf = R.RDataFrame(*reversed(sample.rsplit(":", 1)))
    sample_rdf = sample_rdf.Filter(cut)
    sample_rdf = sample_rdf.Filter("isSignal")
    denominator = sample_rdf.Count()
    numerator = sample_rdf.Filter(f"{line}Decision").Count()
    R.RDF.RunGraphs((numerator, denominator))

    true_efficiency = numerator.GetValue() / denominator.GetValue()

    result_okay = (
        true_efficiency > value - threshold * error[0]
        and true_efficiency < value + threshold * error[1]
    )
    if not result_okay:
        print(
            f"True efficiency '{true_efficiency:.4f}' does not lie within window [{value - threshold * error[0]:.4f} - {value + threshold * error[1]:.4f}] ({threshold:.1f} sigma threshold)"
        )

    return result_okay, (
        true_efficiency,
        value - threshold * error[0],
        value + threshold * error[1],
    )

factorisation_summary(factorisation_test, verbose=True)

Source code in src/triggercalib/utils/tests.py
def factorisation_summary(factorisation_test, verbose=True):
    # TODO: write docstring

    output_string = f"Split NLL: {factorisation_test.split_nll}"
    output_string += f"Simultaneous NLL: {factorisation_test.simultaneous_nll}"
    output_string += f"q-statistic: {factorisation_test.q_statistic}"
    output_string += f"p-value: {factorisation_test.p_value}"

    if verbose:
        print(output_string)
    return output_string