Source code for opendp.extras.mbi._table

"""High-level mechanism for applying mbi mechanisms to dataframes with mixed types."""

from math import sqrt
from dataclasses import dataclass, field
from typing import (
    Any,
    Iterator,
    Literal,
    Mapping,
    Optional,
    Sequence,
    TypeAlias,
    Union,
    cast,
    TYPE_CHECKING,
)
from opendp._internal import _new_pure_function
from opendp._lib import import_optional_dependency
from opendp.combinators import (
    make_adaptive_composition,
    make_approximate,
    make_composition,
)
from opendp.extras.mbi._aim import AIM
from opendp.extras.mbi._utilities import (
    prior,
    get_std,
    row_major_order,
    weight_marginals,
    Algorithm,
    ONEWAY_UNKEYED,
)
from opendp.measurements import make_private_lazyframe
from opendp.mod import (
    ApproximateDivergence,
    FrameDistance,
    LazyFrameDomain,
    Measure,
    Measurement,
    OpenDPException,
    Queryable,
    binary_search,
    binary_search_param,
)
from opendp.domains import _lazyframe_from_domain
from opendp.transformations import make_stable_lazyframe
from opendp.typing import RuntimeType


if TYPE_CHECKING:  # pragma: no cover
    from opendp.extras.polars import Bound


@dataclass
class ContingencyTable:
    """Approximate a k-way contingency table via low-order marginals."""

    keys: dict[str, Any]  # dict[str, Series]
    """Unique keys for each column"""
    cuts: dict[str, Any]  # dict[str, Series]
    """Cut points (left-open) for each numeric column"""
    marginals: dict[tuple[str, ...], Any]  # dict[tuple[str, ...], LinearMeasurement]
    """Mapping from clique name to LinearMeasurement used to fit the model"""
    model: Any  # MarkovRandomField
    """MarkovRandomField spanning the same columns as keys"""
    thresholds: dict[str, int] = field(default_factory=dict)
    """Cut-off point for discovered stable keys.
    Any category appearing fewer than threshold times is attributed to the null category."""

    def __post_init__(self):
        from mbi import MarkovRandomField, Domain  # type: ignore[import-untyped,import-not-found]
        import polars as pl  # type: ignore[import-not-found]

        if not isinstance(self.model, MarkovRandomField):  # pragma: no cover
            raise ValueError("model must be a MarkovRandomField")

        for col in self.keys:
            self.keys[col] = pl.Series(col, self.keys[col])
        for col in self.cuts:
            self.cuts[col] = pl.Series(col, self.cuts[col])

        key_shape = set((c, len(k)) for c, k in self.keys.items())
        if set(self.model.domain.config.items()) != key_shape:
            raise ValueError(
                "Model domain must match key attrs and sizes.\n"
                f" Model shape: {self.model.domain}\n"
                f" Key shape: {Domain.fromdict(dict(key_shape))}"
            )

        for col, cutset in self.cuts.items():
            if col not in self.keys:
                raise ValueError(f'"{col}" in cuts is not present in keys')
            if len(self.keys[col]) - 1 != len(cutset):
                msg = f'"{col}" keyset length ({len(self.keys[col])}) must be one greater than the number of cuts ({len(cutset)})'
                raise ValueError(msg)
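
    # Illustrative sketch (not part of the library): for a numeric column with
    # cuts [18, 35, 65], `make_contingency_table` builds four left-open bin
    # labels, so the corresponding keyset is one longer than the cut set,
    # exactly as validated in __post_init__ above:
    #
    #     cuts = {"age": [18, 35, 65]}
    #     keys = {"age": ["(-inf, 18]", "(18, 35]", "(35, 65]", "(65, inf]"]}
    #     assert len(keys["age"]) == len(cuts["age"]) + 1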

    def synthesize(
        self, rows: Optional[int] = None, method: Literal["round", "sample"] = "round"
    ):
        """Generate synthetic data that is consistent with the contingency table.

        :param rows: if not set, approximately matches number of rows in original data
        :param method: "round" rounds to projection, "sample" samples from densities
        """
        import polars as pl  # type: ignore[import-not-found]
        from mbi import MarkovRandomField  # type: ignore[import-untyped,import-not-found]

        model = cast(MarkovRandomField, self.model)
        indices = model.synthetic_data(rows, method).df
        data = {
            c: _deindex(indices[c].to_numpy(), self.keys[c], cuts=self.cuts.get(c))
            for c in self.keys
        }
        return pl.DataFrame(data)

    def project(self, attrs: str | Sequence[str]):
        """Return counts corresponding to each combination of self.keys in `attrs` in an ndarray.

        :param attrs: attributes to preserve. All other attributes are marginalized.
        """
        from mbi import MarkovRandomField  # type: ignore[import-untyped,import-not-found]

        model = cast(MarkovRandomField, self.model)
        return model.project(attrs).values

    def project_melted(self, attrs: str | Sequence[str]):
        """Return counts corresponding to each combination of self.keys in `attrs` in a dataframe.

        :param attrs: attributes to preserve. All other attributes are marginalized.
        """
        import polars as pl
        import numpy as np

        attrs = [attrs] if isinstance(attrs, str) else list(attrs)
        lengths_arr = np.asarray(self.project(attrs).ravel())
        lengths = pl.Series("len", lengths_arr).to_frame()
        return row_major_order(self.keys[attr] for attr in attrs).hstack(lengths)

    def std(self, attrs: Sequence[str]) -> float:
        """Estimate the standard deviation of the count estimate.

        The estimate does not take cuts into account.

        When marginals are estimated with gaussian noise,
        the consistent (variance-weighted) marginals are also gaussian-distributed.
        Therefore the standard deviation matches the noise scale parameter.

        When marginals are estimated with laplace noise,
        the consistent marginals are a sum of laplace random variables, which is not laplace-distributed.
        Therefore the standard deviation doesn't correspond to the laplace distribution.
        Under the central limit theorem, the distribution will tend towards gaussian.

        If the estimate is gaussian-distributed,
        use :py:func:`opendp.accuracy.gaussian_scale_to_accuracy` to construct a confidence interval.

        :param attrs: attributes to preserve in uncertainty estimate
        """
        from mbi import MarkovRandomField  # type: ignore[import-untyped,import-not-found]

        model = cast(MarkovRandomField, self.model)

        if isinstance(attrs, str):  # pragma: no cover
            raise ValueError(f"attrs ({attrs}) must be a sequence of strings")

        attrs_clique = set(attrs)
        size = model.domain.size
        inv_var_sum = sum(
            1 / (lm.stddev**2 * size(clique) / size(attrs_clique))
            for clique, lm in self.marginals.items()
            if attrs_clique.issuperset(clique)
        )
        if inv_var_sum == 0:
            raise ValueError(f"attrs ({attrs}) are not covered by the query set")
        return sqrt(1 / inv_var_sum)
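
    # Illustrative sketch (hypothetical numbers, not part of the library): when
    # the released marginals cover exactly the requested attrs (so that
    # size(clique) == size(attrs)), `std` reduces to inverse-variance pooling:
    #
    #     from math import sqrt
    #     stddevs = [2.0, 3.0]                               # per-marginal noise scales
    #     pooled = sqrt(1 / sum(1 / s**2 for s in stddevs))  # ~1.664, smaller than either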

    @property
    def schema(self) -> dict[str, Any]:
        """Returns the data schema."""
        return {keyset.name: keyset.dtype for keyset in self.keys.values()}
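

# Illustrative sketch (hypothetical `table`, not part of the library): a
# released ContingencyTable is pure postprocessing, so it can be queried
# repeatedly without further privacy cost:
#
#     synth = table.synthesize(rows=1_000)              # synthetic DataFrame
#     counts = table.project_melted(["age", "income"])  # noisy two-way counts
#     sigma = table.std(["age", "income"])              # std of those counts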


def make_contingency_table(
    input_domain: LazyFrameDomain,
    input_metric: FrameDistance,
    output_measure: Measure,
    d_in: list["Bound"],
    d_out: Union[float, tuple[float, float]],
    keys: Optional[Mapping[str, Sequence]] = None,
    cuts: Optional[Mapping[str, Sequence[float]]] = None,
    table: Optional[ContingencyTable] = None,
    algorithm: Algorithm = AIM(),
) -> tuple[Measurement, Optional[float], Optional[int]]:
    """Return a measurement that releases a :class:`.ContingencyTable`,
    as well as the noise scale and optional threshold for one-way marginals.

    :param input_domain: domain of input data
    :param input_metric: how to compute distance between datasets
    :param output_measure: how to measure privacy of release
    :param d_in: bounds on distances between adjacent input datasets
    :param d_out: upper bound on the privacy loss
    :param keys: dictionary of column names and unique categories
    :param cuts: dictionary of column names and left-open bin edges
    :param table: ContingencyTable from prior release
    :param algorithm: configuration for internal estimation algorithm
    """
    import_optional_dependency("mbi")
    import polars as pl
    import mbi

    plan = _lazyframe_from_domain(input_domain)
    schema = plan.collect_schema()

    keys_pl: dict[str, pl.Series] = {
        col: _unique(_with_null(pl.Series(col, keyset, dtype=schema.get(col))), "keys")
        for col, keyset in (keys or dict()).items()
    }
    cuts_pl: dict[str, pl.Series] = {
        col: _unique(_increasing(pl.Series(col, cutset, dtype=schema.get(col))), "cuts")
        for col, cutset in (cuts or dict()).items()
    }

    # add cut bin labels to keys
    def get_categories(cutset):
        labels = [f"({lb}, {rb}]" for lb, rb in zip(["-inf", *cutset], [*cutset, "inf"])]
        return pl.Series(cutset.name, labels)

    keys_pl |= {col: get_categories(cutset) for col, cutset in cuts_pl.items()}

    if table:
        keys_pl |= table.keys
        cuts_pl |= table.cuts
        model = cast(mbi.MarkovRandomField, table.model)
        marginals = table.marginals.copy()
        thresholds = table.thresholds.copy()
    else:
        model = None
        marginals = {}
        thresholds = {}

    if cuts_pl:
        plan = plan.with_columns(
            pl.col(c).cut(cutset, labels=get_categories(cutset))  # type: ignore[arg-type]
            for c, cutset in cuts_pl.items()
            if c in schema
        )

    if (QO := RuntimeType.infer(d_out)) != output_measure.distance_type:
        raise ValueError(f"d_out type ({QO}) must be {output_measure.distance_type}")

    delta = None
    if isinstance(d_out, tuple):
        d_out, delta = d_out

    all_keys_known = set(input_domain.columns) <= set(keys_pl)
    if delta and all_keys_known:
        message = f"delta ({delta}) must be zero because keys and cuts span all columns"
        raise ValueError(message)
    if not delta and not all_keys_known:
        message = f"delta ({delta}) must be nonzero because keys and cuts don't span all columns"
        raise ValueError(message)

    if algorithm.oneway == ONEWAY_UNKEYED and all_keys_known:
        d_multiway: Union[float | tuple[float, float]] = d_out
        d_mids = [d_multiway if delta is None else (d_multiway, 0.0)]
        m_oneway = None
        scale, threshold = None, None
    else:
        if algorithm.oneway_split is None:
            oneway_split = 0.5
            if algorithm.oneway == ONEWAY_UNKEYED:
                columns = set(input_domain.columns)
                oneway_split *= len(columns - set(keys_pl)) / len(columns)
        else:
            oneway_split = algorithm.oneway_split

        d_oneway = d_out * oneway_split
        d_multiway = float(prior(prior(d_out - d_oneway)))

        if delta is not None:
            d_oneway = d_oneway, delta  # type: ignore[assignment]

        d_mids = [d_oneway, d_multiway if delta is None else (d_multiway, 0.0)]  # type: ignore[has-type]

        m_oneway, scale, threshold = _make_oneway_marginals(
            input_domain,
            input_metric,
            output_measure,
            d_in=d_in,
            d_out=d_oneway,
            keys=keys_pl,
            plan=plan,
            unknown_only=algorithm.oneway == ONEWAY_UNKEYED,
        )
        if threshold is not None:
            thresholds |= {
                c: threshold for c in input_domain.columns if c not in keys_pl
            }

    m_comp = make_adaptive_composition(
        input_domain, input_metric, output_measure, d_in, d_mids
    )

    if isinstance(output_measure, ApproximateDivergence):
        inner_measure = output_measure.inner_measure
    else:
        inner_measure = output_measure

    if "len" in input_domain.columns:
        raise ValueError('input_domain must not contain a column named "len"')

    def postprocess(qbl: Queryable) -> ContingencyTable:
        stable_keys = keys_pl.copy()
        current_marginals = marginals.copy()

        if m_oneway:
            new_keys, oneway_releases = qbl(m_oneway)
            stable_keys |= new_keys
            current_marginals = weight_marginals(current_marginals, *oneway_releases)

        mbi_domain = mbi.Domain.fromdict({c: len(k) for c, k in stable_keys.items()})

        potentials = None
        if isinstance(model, mbi.MarkovRandomField):
            import attr  # type: ignore[import-not-found]

            potential_factor = attr.evolve(model.potentials, domain=mbi_domain)
            potentials = potential_factor.expand(list(current_marginals.keys()))

        current_model = algorithm.estimator(
            mbi_domain, list(current_marginals.values()), potentials=potentials
        )

        t_index = make_stable_lazyframe(
            input_domain,
            input_metric,
            plan.with_columns(
                pl.col(c)
                .replace_strict(
                    stable_keys[c],
                    pl.Series(range(len(stable_keys[c])), dtype=pl.UInt32),
                    default=_get_null_index(stable_keys[c], len(stable_keys[c]) - 1),
                )
                .clip(0, len(stable_keys[c]) - 1)
                for c in input_domain.columns
            ),
        )

        m_marginals = algorithm.make_marginals(
            input_domain=t_index.output_domain,
            input_metric=t_index.output_metric,
            output_measure=inner_measure,
            d_in=t_index.map(d_in),
            d_out=d_multiway,  # type: ignore[arg-type]
            marginals=current_marginals,
            model=current_model,
        )

        m_index_marginals = t_index >> m_marginals
        if delta is not None:
            m_index_marginals = make_approximate(m_index_marginals)

        T: TypeAlias = tuple[dict[tuple[str, ...], Any], mbi.MarkovRandomField]
        current_marginals, current_model = cast(T, qbl(m_index_marginals))

        ordered_keys = {c: stable_keys[c] for c in input_domain.columns}
        ordered_keys |= stable_keys

        return ContingencyTable(
            keys=ordered_keys,
            cuts=cuts_pl,
            marginals=current_marginals,
            model=current_model,
            thresholds=thresholds,
        )

    return m_comp >> _new_pure_function(postprocess), scale, threshold
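

# Illustrative sketch (hypothetical inputs, not part of the library): the
# measurement returned by `make_contingency_table` releases a ContingencyTable
# directly; the accompanying scale and threshold describe the one-way key
# discovery step. `lf_domain`, `lf_metric`, `measure`, `bounds`, and `lf` are
# placeholders for caller-supplied values:
#
#     meas, scale, threshold = make_contingency_table(
#         lf_domain, lf_metric, measure,
#         d_in=bounds, d_out=(1.0, 1e-7),
#         cuts={"age": [18, 35, 65]},
#     )
#     table = meas(lf)  # lf: a polars LazyFrame that is a member of lf_domain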


def _unique(series, arg: str):
    """raise error if elements are not unique"""
    if series.n_unique() != series.len():  # pragma: no cover
        # this is actually tested, but coverage doesn't see it
        raise ValueError(f'{arg} must be unique: "{series.name}" has duplicates')
    return series


def _increasing(series):
    """raise error if elements are not increasing"""
    import polars as pl

    diff = series.cast(pl.Float64).diff(null_behavior="drop")
    if diff.le(0).any():  # pragma: no cover
        # this is actually tested, but coverage doesn't see it
        message = f'cuts must be strictly increasing: "{series.name}" is not strictly increasing'
        raise ValueError(message)
    return series


def _with_null(series):
    """create a new series that includes null if null is not present"""
    return (
        series if series.is_null().any() else series.clone().append(series.clear(n=1))
    )


def _get_null_index(series, default=None) -> Optional[int]:
    """returns the index of null in a polars `series`, else `default`"""
    null_indices = series.is_null().arg_true()
    return null_indices.item() if null_indices.len() else default


def _make_oneway_marginals(
    input_domain: LazyFrameDomain,
    input_metric: FrameDistance,
    output_measure: Measure,
    d_in,
    d_out,
    keys: dict[str, Any],
    plan: Any,
    unknown_only: bool,
) -> tuple[Measurement, float, Optional[int]]:
    """Returns a measurement that releases keys and counts for one-way marginals,
    as well as the discovered scale and threshold."""
    from opendp.extras.polars import dp_len
    import polars as pl  # type: ignore[import-not-found]
    from mbi import LinearMeasurement  # type: ignore[import-not-found]
    from mbi.estimation import minimum_variance_unbiased_total  # type: ignore[import-untyped,import-not-found]
    import numpy as np  # type: ignore[import-not-found]

    def group_by_agg(plan: pl.LazyFrame, name: str) -> pl.LazyFrame:
        """Aggregate the compute plan"""
        keyset = keys.get(name)
        if keyset is None:
            return plan.group_by(name).agg(dp_len())

        # fold all unknown values into null
        if isinstance(keyset.dtype, pl.Categorical):  # pragma: no cover
            keyset = keyset.cast(pl.Enum(keyset.cast(pl.String)))
        replace = pl.col(name).replace_strict(keys[name], keyset, default=None)
        return pl.LazyFrame([keyset]).join(
            plan.select(replace).group_by(name).agg(dp_len()),
            on=[name],
            how="left",
            nulls_equal=True,
        )

    def _make(scale: float, threshold: Optional[int] = None) -> Measurement:
        std = get_std(output_measure, scale)

        # all columns that should be estimated
        names = [
            name
            for name in input_domain.columns
            if name not in keys or not unknown_only
        ]
        # estimating the total is necessary if there are no keyed columns
        should_total = all(name not in keys for name in input_domain.columns)

        one_way_measurements = [
            make_private_lazyframe(
                input_domain,
                input_metric,
                output_measure,
                group_by_agg(plan, name),
                global_scale=scale,
                threshold=threshold,
            )
            >> _new_pure_function(lambda x: x.collect())
            for name in names
        ]

        if should_total:
            m_total = make_private_lazyframe(
                input_domain,
                input_metric,
                output_measure,
                plan.select(dp_len()),
                global_scale=scale,
            ) >> _new_pure_function(lambda x: x.collect())
            one_way_measurements.append(m_total)

        def postprocess(counts: list[pl.DataFrame]):
            new_keys = dict()
            lm_counts = []

            if should_total:
                total = counts.pop().item()
            else:
                total = minimum_variance_unbiased_total(
                    LinearMeasurement(count["len"].to_numpy(), [name], stddev=std)
                    for name, count in zip(names, counts)
                    if name in keys
                )

            for name, count in zip(names, counts):
                count = count.sort(name)
                lookup = dict(zip(count[name], count["len"]))

                if name in keys:
                    values_iter: Iterator[int] = (lookup[k] for k in keys[name])
                else:
                    lookup.setdefault(None, 0)
                    lookup[None] += max(total - count["len"].sum(), 0)

                    dtype = count[name].dtype
                    new_keys[name] = pl.Series(name, lookup.keys(), dtype=dtype)
                    values_iter = lookup.values()  # type: ignore[assignment]

                values = np.fromiter(values_iter, dtype=np.uint32)
                lm_counts.append(LinearMeasurement(values, clique=[name], stddev=std))
            return new_keys, lm_counts

        return make_composition(one_way_measurements) >> _new_pure_function(postprocess)

    if isinstance(output_measure, ApproximateDivergence):
        compare_s = lambda s: _make(s, threshold=2**32 - 1).map(d_in)[0] < d_out[0]  # type: ignore[index]
        scale = binary_search(compare_s, T=float)
        try:
            _make(scale, threshold=None)
            threshold: Optional[int] = None  # pragma: no cover
        except OpenDPException:
            compare_t = lambda t: _make(scale, t).map(d_in)[1] < d_out[1]  # type: ignore[index,arg-type]
            threshold = binary_search(compare_t, T=int)  # type: ignore[assignment]
    else:
        scale = binary_search_param(_make, d_in, d_out, T=float)
        threshold = None

    return _make(scale, threshold), scale, threshold


def _deindex(indices, keys, cuts=None):
    """Translate keyset indices into categorical data."""
    import numpy as np

    if cuts is None:
        return keys[indices]

    if cuts.dtype.is_integer():  # cuts are left-open
        sampler = lambda lower, upper: np.random.randint(lower + 1, upper + 1)
    elif cuts.dtype.is_float():
        sampler = np.random.uniform  # type: ignore[assignment]

    edges = np.array([cuts[0] - 1, *cuts, cuts[-1] + 1])
    return sampler(edges[indices], edges[indices + 1])
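

# Illustrative sketch (not part of the library): `_deindex` inverts the binning
# applied during release. For integer cuts [10, 20], bin index 1 maps back to a
# uniform integer draw from the left-open interval (10, 20], while the outer
# bins are clamped to one unit beyond the extreme cuts:
#
#     import numpy as np
#     cuts = np.array([10, 20])
#     edges = np.array([cuts[0] - 1, *cuts, cuts[-1] + 1])  # [9, 10, 20, 21]
#     np.random.randint(edges[1] + 1, edges[2] + 1)         # integer in 11..20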