Skip to content

Core objects reference

objects.fitting

Fit

Initialize a new Fit instance

Parameters:

Name Type Description Default
pdf pdf

PDF model

required
data data

Data to fit

required
fit_kwargs Dict

Keyword arguments for run_fit

{}
lazy bool

Whether to perform the fit lazily (i.e., not perform the fit until fit is called)

False
min_entries float

Minimum number of events required for the fit

-1
max_attempts int

Maximum number of attempts to perform the fit

5
require_convergence bool

Whether to require the fit to converge

True
Source code in src/triggercalib/objects/fitting.py
class Fit:
    """Initialize a new Fit instance

    Args:
        pdf: PDF model
        data: Data to fit
        fit_kwargs: Keyword arguments for run_fit
        lazy: Whether to perform the fit lazily (i.e., not perform the fit
            until `fit` is called)
        min_entries: Minimum number of events required for the fit
        max_attempts: Maximum number of attempts to perform the fit
        require_convergence: Whether to require the fit to converge
    """

    def __init__(
        self,
        pdf: types.pdf,
        data: types.data,
        fit_kwargs: Dict = {},
        lazy: bool = False,
        min_entries: float = -1,
        max_attempts: int = 5,
        require_convergence: bool = True,
    ):
        # TODO: write docstring

        self.pdf = pdf
        self.data = data
        self.backend = helpers.get_backend(data=self.data, pdf=self.pdf)

        self.fit_kwargs = fit_kwargs
        self.max_attempts = max_attempts
        self.require_convergence = require_convergence

        num_events = (
            data.sumEntries()
            if self.backend == "roofit"
            else np.sum(data.weights) if data.weights is not None else len(data)
        )

        if num_events < min_entries:
            raise RuntimeError(
                f"Insufficient events to perform the fit: {num_events} events < '{min_entries}' events"
            )

        if not lazy:
            self.fit()

        return

    def fit(self):
        # TODO: write docstring

        self.result = run_fit(
            self.pdf,
            self.data,
            fit_kwargs=self.fit_kwargs,
            max_attempts=self.max_attempts,
            require_convergence=self.require_convergence,
        )
        self.converged = has_converged(self.result)
        return

objects.plotting

Plot

Class for trigger efficiency plots

This class provides functionality to create and save plots of fits to discriminating variable, supporting both RooFit and zFit backends. It handles the plotting of data and fitted PDFs (with their components if relevant), with appropriate styling for LHCb publications.

Source code in src/triggercalib/objects/plotting.py
class Plot:
    """Class for trigger efficiency plots

    This class provides functionality to create and save plots of fits to discriminating variable,
    supporting both RooFit and zFit backends. It handles the plotting of data and fitted PDFs (with
    their components if relevant), with appropriate styling for LHCb publications.
    """

    def __init__(
        self,
        name: str,
        observable: types.observable,
        data: types.data,
        pdf: types.pdf,
        plot_kwargs: dict = {},
        extension: str = ".pdf",
    ):
        """Initialize a new Plot instance

        Args:
            name: Name of the plot, used to define the filename
            observable: Variable being plotted (ROOT.RooAbsReal or zfit.Space)
            data: Dataset to plot (ROOT.RooDataSet or zfit.Data)
            pdf: Probability density function (ROOT.RooAbsPdf or zfit.pdf.BasePDF)
            extension: File extension for saved plots (default: .pdf)
            backend: Fitting backend to use ('roofit' or 'zfit')
        """

        self.name = name
        self.observable = observable
        self.data = data
        self.pdf = pdf
        self.extension = extension
        self.plot_kwargs = plot_kwargs

        if isinstance(self.observable, R.RooAbsReal) and isinstance(
            self.pdf, R.RooAbsPdf
        ):
            self._roofit_plot()
        elif (
            has_zfit
            and isinstance(self.observable, zfit.Space)
            and isinstance(self.pdf, zfit.pdf.BasePDF)
        ):
            self._zfit_plot()

        return

    def _roofit_plot(self):
        # TODO: write docstring

        with LHCbStyle():
            # Delete canvas if already exists
            _c = R.gROOT.GetListOfCanvases().FindObject(f"{self.name}_canvas")
            if isinstance(_c, R.TCanvas):
                _c.Destructor()

            self.fig = R.TCanvas(f"{self.name}_canvas")
            self.fig.cd()

            frame = self.observable.frame()
            self.data.plotOn(frame)
            if hasattr(self.pdf, "coefList") and callable(
                getattr(self.pdf, "coefList")
            ):
                colors = ["r", "g", "b", "o"]

                for pdf_i, color in zip(self.pdf.pdfList(), colors):
                    self.pdf.plotOn(
                        frame,
                        Components=[pdf_i],
                        LineStyle="--",
                        LineColor=color,
                    )
            self.pdf.plotOn(frame)

            frame.Draw()

        return self.fig

    def _zfit_plot(self):
        # TODO: write docstring

        default_kwargs = {
            "aspect": (12, 9),
            "xlabel": self.observable.obs[0],
            "ylabel": "Candidates per bin",
            "ylog": False,
            "title": "",
            "units": "",
            "lhcb_label": "Preliminary",
            "lhcb_rlabel": "",
            "pulls": True,  # <- Make this actually do something
        }

        colors = ["#ff2244", "#f1c15b", "#8fbc5c", "#5587a3", "#263a82"]
        styles = ["solid", "dashed", "dotted"]

        plot_kwargs = self.plot_kwargs.copy()
        for kwarg, value in default_kwargs.items():
            if kwarg not in plot_kwargs:
                plot_kwargs[kwarg] = value

        data = self.data.value(self.observable).numpy().ravel()
        weights = (
            self.data.weights.numpy()
            if self.data.weights is not None
            else np.ones(np.shape(data))
        )

        plot_xlims = (
            data.min(),
            data.max(),
        )  # has to be set relative to data to get the normalisation right
        if "xlims" not in plot_kwargs:
            plot_kwargs["xlims"] = plot_xlims
        if "bins" not in plot_kwargs:
            plot_kwargs["bins"] = 100
        bin_edges = np.linspace(*plot_xlims, plot_kwargs["bins"] + 1)
        bin_width = bin_edges[1] - bin_edges[0]

        # +===========+ #
        # Create figure #
        # +===========+ #
        hep.style.use("LHCb2")
        if plot_kwargs["pulls"]:
            self.fig, (plot_ax, pull_ax) = plt.subplots(
                2,
                1,
                figsize=plot_kwargs["aspect"],
                gridspec_kw={"height_ratios": [4, 1], "hspace": 0.035},
                sharex=True,
            )
        else:
            self.fig, plot_ax = plt.subplots(1, 1, figsize=plot_kwargs["aspect"])

        data_yvals, data_edges = np.histogram(data, bins=bin_edges, weights=weights)
        data_xerrs = np.diff(data_edges) / 2
        data_xvals = data_edges[:-1] + data_xerrs

        # +===============+ #
        # Plot data and fit #
        # +===============+ #
        plot_ax.errorbar(
            data_xvals,
            data_yvals,
            xerr=data_xerrs,
            yerr=np.sqrt(data_yvals),
            marker="o",
            markersize=3,
            linestyle="none",
            label="Data",
            color="black",
            capsize=2,
            elinewidth=1,
        )

        pdf_xvals = np.linspace(*plot_xlims, 1000)
        total_yield = self.pdf.get_yield().numpy()
        component_min = total_yield
        for n, component in enumerate(self.pdf.pdfs):
            component_yield = component.get_yield().numpy()
            component_yvals = (
                component.pdf(pdf_xvals)
                * component_yield
                * (bin_edges[1] - bin_edges[0])
            )
            plot_ax.plot(
                pdf_xvals,
                component_yvals,
                label=component.name,
                marker="none",
                color=colors[n % len(colors)],
                lw=2,
                ls=styles[n % len(styles)],
            )
            component_min = min(component_min, min(component_yvals))

        pdf_yvals = (
            self.pdf.pdf(pdf_xvals) * total_yield * (bin_edges[1] - bin_edges[0])
        )

        plot_ax.plot(
            pdf_xvals,
            pdf_yvals,
            label=self.pdf.name,
            marker="none",
            color="#606060",
            lw=2,
        )

        if plot_kwargs["pulls"]:
            # +========+ #
            # Pulls plot #
            # +========+ #

            pulls_yvals = (
                data_yvals - self.pdf.pdf(data_xvals) * total_yield * bin_width
            ) / np.sqrt(data_yvals)

            for n in (-3, -1, 1, 3):
                pull_ax.plot(
                    plot_kwargs["xlims"],
                    [n, n],
                    color="#d4d4d4",
                    lw=2,
                    ls="dashed",
                )
            pull_ax.bar(
                data_xvals,
                pulls_yvals,
                width=bin_width,
                label=self.pdf.name,
                color="#d4d4d4",
            )
            pull_ax.plot(
                plot_kwargs["xlims"],
                [0, 0],
                color="k",
                lw=2,
            )
            pull_ax.set_ylim(-4.5, 4.5)

        plot_ax.legend()
        plot_ax.set_xlim(*plot_kwargs["xlims"])

        if plot_kwargs["ylog"]:
            plot_ax.set_yscale("log")
        else:
            plot_ax.set_ylim(0, 1.1 * max(pdf_yvals))

        # +========+ #
        # Set labels #
        # +========+ #

        plot_ax.set_title(plot_kwargs["title"])

        xlabel = plot_kwargs["xlabel"]
        if plot_kwargs["units"]:
            xlabel = xlabel.replace("UNITS", plot_kwargs["units"])
        xlabel = xlabel.replace("BIN_WIDTH", f"{bin_width:.1f}")

        ylabel = plot_kwargs["ylabel"]
        if plot_kwargs["units"]:
            ylabel = ylabel.replace("UNITS", plot_kwargs["units"])
        ylabel = ylabel.replace("BIN_WIDTH", f"{bin_width:.1f}")

        plot_ax.set_ylabel(ylabel)

        if plot_kwargs["pulls"]:

            pull_ax.set_xlabel(xlabel)
            pull_ax.set_ylabel("Pulls")
        else:
            plot_ax.set_xlabel(xlabel)

        hep.lhcb.label(
            loc=1,
            ax=plot_ax,
            label=plot_kwargs["lhcb_label"],
            data=plot_kwargs["lhcb_label"].lower() != "simulation",
            rlabel=plot_kwargs["lhcb_rlabel"],
        )

        return

    def save(self, plot_path: str):
        """Save the plot to a file

        Args:
            plot_path: Directory path where the plot should be saved. The filename will
                       be constructed from the configured name and extension
        """
        if not os.path.exists(plot_path):
            os.makedirs(plot_path)

        path = os.path.join(plot_path, self.name)

        if isinstance(self.fig, R.TCanvas):
            self.fig.SaveAs(f"{path}{self.extension}")
        elif has_zfit and isinstance(self.fig, plt.Figure):
            self.fig.savefig(f"{path}{self.extension}")
            plt.close(self.fig)

__init__(name, observable, data, pdf, plot_kwargs={}, extension='.pdf')

Initialize a new Plot instance

Parameters:

Name Type Description Default
name str

Name of the plot, used to define the filename

required
observable observable

Variable being plotted (ROOT.RooAbsReal or zfit.Space)

required
data data

Dataset to plot (ROOT.RooDataSet or zfit.Data)

required
pdf pdf

Probability density function (ROOT.RooAbsPdf or zfit.pdf.BasePDF)

required
extension str

File extension for saved plots (default: .pdf)

'.pdf'
backend

Fitting backend to use ('roofit' or 'zfit')

required
Source code in src/triggercalib/objects/plotting.py
def __init__(
    self,
    name: str,
    observable: types.observable,
    data: types.data,
    pdf: types.pdf,
    plot_kwargs: dict = {},
    extension: str = ".pdf",
):
    """Initialize a new Plot instance

    Args:
        name: Name of the plot, used to define the filename
        observable: Variable being plotted (ROOT.RooAbsReal or zfit.Space)
        data: Dataset to plot (ROOT.RooDataSet or zfit.Data)
        pdf: Probability density function (ROOT.RooAbsPdf or zfit.pdf.BasePDF)
        extension: File extension for saved plots (default: .pdf)
        backend: Fitting backend to use ('roofit' or 'zfit')
    """

    self.name = name
    self.observable = observable
    self.data = data
    self.pdf = pdf
    self.extension = extension
    self.plot_kwargs = plot_kwargs

    if isinstance(self.observable, R.RooAbsReal) and isinstance(
        self.pdf, R.RooAbsPdf
    ):
        self._roofit_plot()
    elif (
        has_zfit
        and isinstance(self.observable, zfit.Space)
        and isinstance(self.pdf, zfit.pdf.BasePDF)
    ):
        self._zfit_plot()

    return

save(plot_path)

Save the plot to a file

Parameters:

Name Type Description Default
plot_path str

Directory path where the plot should be saved. The filename will be constructed from the configured name and extension

required
Source code in src/triggercalib/objects/plotting.py
def save(self, plot_path: str):
    """Save the plot to a file

    Args:
        plot_path: Directory path where the plot should be saved. The filename will
                   be constructed from the configured name and extension
    """
    if not os.path.exists(plot_path):
        os.makedirs(plot_path)

    path = os.path.join(plot_path, self.name)

    if isinstance(self.fig, R.TCanvas):
        self.fig.SaveAs(f"{path}{self.extension}")
    elif has_zfit and isinstance(self.fig, plt.Figure):
        self.fig.savefig(f"{path}{self.extension}")
        plt.close(self.fig)

objects.sideband

Sideband

Handler for sidebands in the sideband subtraction mitigation method

This class manages the definition and manipulation of signal and sideband regions for background subtraction in trigger efficiency calculations. It provides methods to define cuts and the appropriate scale factor to estimate the number of background events in a given width.

Source code in src/triggercalib/objects/sideband.py
class Sideband:
    """Handler for sidebands in the sideband subtraction mitigation method

    This class manages the definition and manipulation of signal and sideband regions
    for background subtraction in trigger efficiency calculations. It provides methods
    to define cuts and the appropriate scale factor to estimate the number of background
    events in a given width.
    """

    def __init__(
        self,
        variable: str,
        signal: Annotated[List[float], 2],
        sidebands: List[Annotated[List[float], 2]],
    ):
        """Initialize a new Sideband instance

        Args:
            variable: Name of the variable used for sideband subtraction
            signal: Range in which to subtract background
            sidebands: Range defining the sideband regions [lower, upper]
        """

        self.variable = variable
        self.signal = signal
        self.sidebands = sidebands

        # Find minimum and maximum bounds for the variable, and check for non-overlapping windows
        self.lower = None
        self.upper = None
        for lower, upper in self.sidebands + [self.signal]:
            for _sideband in self.sidebands:
                if lower > _sideband[0] and upper < _sideband[1]:
                    raise ValueError(
                        "Sideband and signal windows should be defined such that they do not overlap"
                    )
            if self.lower is None or lower < self.lower:
                self.lower = lower
            if self.upper is None or upper > self.upper:
                self.upper = upper

        return

    def scale(self, width=None):
        """Calculate the scaling factor for sideband subtraction

        Args:
            width: Optional width to use instead of signal region width

        Returns:
            float: Scaling factor for sideband subtraction, calculated as
                  signal_width / (lower_sideband_width + upper_sideband_width)
        """
        if not (width):
            width = np.diff(self.signal)
        return width / sum(np.diff(self.sidebands))

    def range_cut(self):
        """Generate a cut string for the full variable range

        Returns:
            str: Cut string in ROOT format selecting events in the full range
        """

        return f"({self.variable} > {self.lower}) && ({self.variable} < {self.upper})"

    def sideband_cut(self):
        """Generate a cut string for the sideband regions

        Returns:
            str: Cut string in ROOT format selecting events in both lower and upper sidebands
        """

        return " || ".join(
            f"(({self.variable} > {lower}) && ({self.variable} < {upper}))"
            for (lower, upper) in self.sidebands
        )

    def signal_cut(self):
        """Generate a cut string for the signal region

        Returns:
            str: Cut string in ROOT format selecting events in the signal region
        """

        return f"({self.variable} > {self.signal[0]}) && ({self.variable} < {self.signal[1]})"

lower = None instance-attribute

sidebands = sidebands instance-attribute

signal = signal instance-attribute

upper = None instance-attribute

variable = variable instance-attribute

__init__(variable, signal, sidebands)

Initialize a new Sideband instance

Parameters:

Name Type Description Default
variable str

Name of the variable used for sideband subtraction

required
signal Annotated[List[float], 2]

Range in which to subtract background

required
sidebands List[Annotated[List[float], 2]]

Range defining the sideband regions [lower, upper]

required
Source code in src/triggercalib/objects/sideband.py
def __init__(
    self,
    variable: str,
    signal: Annotated[List[float], 2],
    sidebands: List[Annotated[List[float], 2]],
):
    """Initialize a new Sideband instance

    Args:
        variable: Name of the variable used for sideband subtraction
        signal: Range in which to subtract background
        sidebands: Range defining the sideband regions [lower, upper]
    """

    self.variable = variable
    self.signal = signal
    self.sidebands = sidebands

    # Find minimum and maximum bounds for the variable, and check for non-overlapping windows
    self.lower = None
    self.upper = None
    for lower, upper in self.sidebands + [self.signal]:
        for _sideband in self.sidebands:
            if lower > _sideband[0] and upper < _sideband[1]:
                raise ValueError(
                    "Sideband and signal windows should be defined such that they do not overlap"
                )
        if self.lower is None or lower < self.lower:
            self.lower = lower
        if self.upper is None or upper > self.upper:
            self.upper = upper

    return

range_cut()

Generate a cut string for the full variable range

Returns:

Name Type Description
str

Cut string in ROOT format selecting events in the full range

Source code in src/triggercalib/objects/sideband.py
def range_cut(self):
    """Generate a cut string for the full variable range

    Returns:
        str: Cut string in ROOT format selecting events in the full range
    """

    return f"({self.variable} > {self.lower}) && ({self.variable} < {self.upper})"

scale(width=None)

Calculate the scaling factor for sideband subtraction

Parameters:

Name Type Description Default
width

Optional width to use instead of signal region width

None

Returns:

Name Type Description
float

Scaling factor for sideband subtraction, calculated as signal_width / (lower_sideband_width + upper_sideband_width)

Source code in src/triggercalib/objects/sideband.py
def scale(self, width=None):
    """Calculate the scaling factor for sideband subtraction

    Args:
        width: Optional width to use instead of signal region width

    Returns:
        float: Scaling factor for sideband subtraction, calculated as
              signal_width / (lower_sideband_width + upper_sideband_width)
    """
    if not (width):
        width = np.diff(self.signal)
    return width / sum(np.diff(self.sidebands))

sideband_cut()

Generate a cut string for the sideband regions

Returns:

Name Type Description
str

Cut string in ROOT format selecting events in both lower and upper sidebands

Source code in src/triggercalib/objects/sideband.py
def sideband_cut(self):
    """Generate a cut string for the sideband regions

    Returns:
        str: Cut string in ROOT format selecting events in both lower and upper sidebands
    """

    return " || ".join(
        f"(({self.variable} > {lower}) && ({self.variable} < {upper}))"
        for (lower, upper) in self.sidebands
    )

signal_cut()

Generate a cut string for the signal region

Returns:

Name Type Description
str

Cut string in ROOT format selecting events in the signal region

Source code in src/triggercalib/objects/sideband.py
def signal_cut(self):
    """Generate a cut string for the signal region

    Returns:
        str: Cut string in ROOT format selecting events in the signal region
    """

    return f"({self.variable} > {self.signal[0]}) && ({self.variable} < {self.signal[1]})"