Source code for opendp.extras.sklearn.decomposition

'''
This module requires extra installs: ``pip install 'opendp[scikit-learn]'``

For convenience, all the functions of this module are also available from :py:mod:`opendp.prelude`.
We suggest importing under the conventional name ``dp``:

.. code:: python

    >>> import opendp.prelude as dp

The methods of this module will then be accessible at ``dp.sklearn.decomposition``.    

See also our :ref:`tutorial on differentially private PCA <dp-pca>`.
'''

from __future__ import annotations
from typing import NamedTuple, Optional, TYPE_CHECKING, Sequence
from opendp.extras.numpy import then_np_clamp
from opendp.extras._utilities import register_measurement, to_then
from opendp.extras.numpy._make_np_mean import make_private_np_mean
from opendp.extras.sklearn._make_eigendecomposition import then_private_np_eigendecomposition
from opendp.mod import Domain, Measurement, Metric
from opendp._lib import import_optional_dependency
from opendp._internal import _make_measurement, _make_transformation

if TYPE_CHECKING: # pragma: no cover
    import numpy # type: ignore[import-not-found]


class PCAEpsilons(NamedTuple):
    '''Tuple used to describe the ε-expenditure per changed record in the input data'''
    eigvals: float
    eigvecs: Sequence[float]
    mean: Optional[float]

def make_private_pca(
    input_domain: Domain,
    input_metric: Metric,
    unit_epsilon: float | PCAEpsilons,
    norm: float | None = None,
    num_components=None,
) -> Measurement:
    """Construct a Measurement that returns the data mean, singular values and right singular vectors.

    :param input_domain: instance of `array2_domain(size=_, num_columns=_)`
    :param input_metric: instance of `symmetric_distance()`
    :param unit_epsilon: ε-expenditure per changed record in the input data
    :param norm: clamp each row to this norm bound
    :param num_components: optional, number of eigenvectors to release. defaults to num_columns from input_domain

    :return: a Measurement that computes a tuple of (mean, S, Vt)
    """
    import opendp.prelude as dp

    np = import_optional_dependency('numpy')
    dp.assert_features("contrib", "floating-point")

    class PCAResult(NamedTuple):
        mean: numpy.ndarray
        S: numpy.ndarray
        Vt: numpy.ndarray

    input_desc = input_domain.descriptor
    if input_desc.size is None:
        raise ValueError("input_domain's size must be known")  # pragma: no cover

    if input_desc.num_columns is None:
        raise ValueError("input_domain's num_columns must be known")  # pragma: no cover

    if input_desc.p not in {None, 2}:
        raise ValueError("input_domain's norm must be an L2 norm")  # pragma: no cover

    if input_desc.num_columns < 1:
        raise ValueError("input_domain's num_columns must be >= 1")  # pragma: no cover

    num_components = (
        input_desc.num_columns if num_components is None else num_components
    )

    if isinstance(unit_epsilon, float):
        num_eigvec_releases = min(num_components, input_desc.num_columns - 1)
        unit_epsilon = _split_pca_epsilon_evenly(
            unit_epsilon, num_eigvec_releases, estimate_mean=input_desc.origin is None
        )

    if not isinstance(unit_epsilon, PCAEpsilons):
        raise ValueError("epsilon must be a float or instance of PCAEpsilons")  # pragma: no cover

    eigvals_epsilon, eigvecs_epsilons, mean_epsilon = unit_epsilon

    def _eig_to_SVt(decomp):
        eigvals, eigvecs = decomp
        return np.sqrt(np.maximum(eigvals, 0))[::-1], eigvecs.T

    def _make_eigdecomp(norm, origin):
        return (
            (input_domain, input_metric)
            >> then_np_clamp(norm, p=2, origin=origin)
            >> then_center()
            >> then_private_np_eigendecomposition(eigvals_epsilon, eigvecs_epsilons)
            >> (lambda out: PCAResult(origin, *_eig_to_SVt(out)))
        )

    if input_desc.norm is not None:
        if mean_epsilon is not None:
            raise ValueError("mean_epsilon should be zero because origin is known")  # pragma: no cover
        norm = input_desc.norm if norm is None else norm
        norm = min(input_desc.norm, norm)
        return _make_eigdecomp(norm, input_desc.origin)
    elif norm is None:
        raise ValueError("must have either bounded `input_domain` or specify `norm`")  # pragma: no cover

    # make releases under the assumption that d_in is 2.
    unit_d_in = 2

    compositor = dp.c.make_sequential_composition(
        input_domain,
        input_metric,
        dp.max_divergence(),
        d_in=unit_d_in,
        d_mids=[mean_epsilon, _make_eigdecomp(norm, 0).map(unit_d_in)],
    )

    def _function(data):
        nonlocal input_domain
        qbl = compositor(data)

        # find origin
        m_mean = dp.binary_search_chain(  # type: ignore[misc]
            lambda s: make_private_np_mean(
                input_domain, input_metric, s, norm=norm, p=1
            ),
            d_in=unit_d_in,
            d_out=mean_epsilon,
            T=float,
        )
        origin = qbl(m_mean)

        # make full release
        return qbl(_make_eigdecomp(norm, origin))

    return _make_measurement(
        input_domain,
        input_metric,
        compositor.output_measure,
        _function,
        compositor.map,
    )

# generate then variant of the constructor
then_private_pca = register_measurement(make_private_pca)

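# Illustrative sketch (not part of the library): one way make_private_pca might be
# called directly. The domain sizes, epsilon, and norm below are made-up example
# values; the caller is assumed to have opted in to the required features.
def _example_private_pca():  # pragma: no cover
    import numpy as np
    import opendp.prelude as dp

    dp.enable_features("contrib", "floating-point")

    # 1000 rows of 4 float columns; size and num_columns must be known up front
    input_domain = dp.numpy.array2_domain(num_columns=4, size=1000, T=float)
    input_metric = dp.symmetric_distance()

    # spend epsilon=1.0 per changed record, clamping each row to an L2 norm of 1.0;
    # since the domain has no known origin, part of the budget estimates the mean
    meas = make_private_pca(input_domain, input_metric, unit_epsilon=1.0, norm=1.0)

    data = np.random.default_rng(0).normal(size=(1000, 4))
    mean, S, Vt = meas(data)  # a PCAResult: (mean, singular values, right singular vectors)
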
class PCA():
    '''
    DP wrapper for `sklearn's PCA <https://scikit-learn.org/stable/modules/generated/sklearn.decomposition.PCA.html>`_.

    Trying to create an instance without sklearn installed will raise an ``ImportError``.
    See the :ref:`tutorial on differentially private PCA <dp-pca>` for details.

    :param whiten: Mirrors the corresponding sklearn parameter:
        When ``True`` (``False`` by default) the ``components_`` vectors are multiplied
        by the square root of n_samples and then divided by the singular values
        to ensure uncorrelated outputs with unit component-wise variances.
    '''
    def __init__(
        self,
        *,
        epsilon: float,
        row_norm: float,
        n_samples: int,
        n_features: int,
        n_components: int | float | str | None = None,
        n_changes: int = 1,
        whiten: bool = False,
    ) -> None:
        # Error if constructor called without dependency:
        import_optional_dependency('sklearn.decomposition')

        # The zero-argument form of super() does not work,
        # (I believe) because the type argument is determined lexically,
        # and it doesn't see our redefinition.
        # Instead, make the type explicit.
        #
        # Also because of the class redefinition, mypy has a lot of complaints,
        # and so there's a lot of "type: ignore" below.
        super(PCA, self).__init__(  # type: ignore[call-arg]
            n_components or n_features,
            whiten=whiten,
        )
        self.epsilon = epsilon
        self.row_norm = row_norm
        self.n_samples = n_samples
        self.n_features_in_ = n_features
        self.n_changes = n_changes

    @property
    def n_features(self):
        '''Number of features'''
        return self.n_features_in_

    # This isn't strictly necessary, since we just call the superclass method,
    # but this lets us document a frequently used method,
    # and avoids a number of mypy warnings.
    def fit(self, X, y=None):
        '''
        Fit the model with X.

        :param X: Training data, where ``n_samples`` is the number of samples
            and ``n_features`` is the number of features.
        :param y: Ignored
        '''
        return super(PCA, self).fit(X)  # type: ignore[misc]
    # this overrides the scikit-learn method to instead use the opendp-core constructor
    def _fit(self, X):
        return self._prepare_fitter()(X)

    def _prepare_fitter(self) -> Measurement:
        """Returns a measurement that computes the mean and eigendecomposition,
        and then applies those releases to self."""
        import opendp.prelude as dp

        if hasattr(self, "components_"):
            raise ValueError("DP-PCA model has already been fitted")  # pragma: no cover

        input_domain = dp.numpy.array2_domain(
            num_columns=self.n_features_in_, size=self.n_samples, T=float
        )
        input_metric = dp.symmetric_distance()

        n_estimated_components = (
            self.n_components  # type: ignore[attr-defined]
            if isinstance(self.n_components, int)  # type: ignore[attr-defined]
            else self.n_features_in_  # type: ignore[attr-defined]
        )

        return make_private_pca(
            input_domain,
            input_metric,
            self.epsilon / self.n_changes * 2,
            norm=self.row_norm,
            num_components=n_estimated_components,
        ) >> self._postprocess

    def _postprocess(self, values):
        """A function that applies a release of the mean and eigendecomposition to self"""
        np = import_optional_dependency('numpy')
        from sklearn.utils.extmath import stable_cumsum, svd_flip  # type: ignore[import]
        from sklearn.decomposition._pca import _infer_dimension  # type: ignore[import]

        self.mean_, S, Vt = values
        U = Vt.T
        n_samples, n_features = self.n_samples, self.n_features_in_
        n_components = self.n_components  # type: ignore[attr-defined]

        # CODE BELOW THIS POINT IS FROM SKLEARN

        # flip eigenvectors' sign to enforce deterministic output
        U, Vt = svd_flip(U, Vt)

        components_ = Vt

        # Get variance explained by singular values
        explained_variance_ = (S**2) / (n_samples - 1)
        total_var = np.sum(explained_variance_)
        explained_variance_ratio_ = explained_variance_ / total_var
        singular_values_ = S  # Store the singular values.

        # Postprocess the number of components required
        if n_components == "mle":
            n_components = _infer_dimension(explained_variance_, n_samples)
        elif 0 < n_components < 1.0:
            # number of components for which the cumulated explained
            # variance percentage is superior to the desired threshold
            # side='right' ensures that number of features selected
            # their variance is always greater than n_components float
            # passed. More discussion in issue: https://github.com/scikit-learn/scikit-learn/pull/15669
            explained_variance_ratio_np = explained_variance_ratio_
            ratio_cumsum = stable_cumsum(explained_variance_ratio_np)
            n_components = np.searchsorted(ratio_cumsum, n_components, side="right") + 1

        # Compute noise covariance using Probabilistic PCA model
        # The sigma2 maximum likelihood (cf. eq. 12.46)
        if n_components < min(n_features, n_samples):
            self.noise_variance_ = np.mean(explained_variance_[n_components:])
        else:
            self.noise_variance_ = 0.0

        self.n_samples_ = n_samples
        self.components_ = components_[:n_components, :]
        self.n_components_ = n_components
        self.explained_variance_ = explained_variance_[:n_components]
        self.explained_variance_ratio_ = explained_variance_ratio_[:n_components]
        self.singular_values_ = singular_values_[:n_components]

        return U, S, Vt
    def measurement(self) -> Measurement:
        """Return a measurement that releases a fitted model."""
        return self._prepare_fitter() >> (lambda _: self)
    # overrides an sklearn method
    def _validate_params(*args, **kwargs):
        pass

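# Illustrative sketch (not part of the library): the wrapper is fit like sklearn's
# PCA. All constructor arguments below are made-up example values; n_samples must
# match the number of rows in the training data.
def _example_fit_pca():  # pragma: no cover
    import numpy as np
    import opendp.prelude as dp

    dp.enable_features("contrib", "floating-point")

    model = PCA(
        epsilon=1.0,      # total privacy budget per n_changes changed records
        row_norm=1.0,     # each row is clamped to this L2 norm before fitting
        n_samples=1000,   # must match the number of rows passed to fit
        n_features=4,
        n_components=2,
    )
    model.fit(np.random.default_rng(0).normal(size=(1000, 4)))

    # after fitting, the usual sklearn attributes are populated from the DP release
    print(model.components_, model.explained_variance_)

    # alternatively, model.measurement() wraps the same fitting as a Measurement
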
_decomposition = import_optional_dependency('sklearn.decomposition', False)
if _decomposition is not None:
    PCA = type('PCA', (_decomposition.PCA,), dict(PCA.__dict__))  # type: ignore[assignment,misc]


def _smaller(v):
    """returns the next non-negative float closer to zero"""
    np = import_optional_dependency('numpy')
    if v < 0:
        raise ValueError("expected non-negative value")  # pragma: no cover
    return v if v == 0 else np.nextafter(v, -1)


def _split_pca_epsilon_evenly(unit_epsilon, num_eigvec_releases, estimate_mean=False):
    num_queries = 3 if estimate_mean else 2
    per_query_epsilon = unit_epsilon / num_queries
    per_evec_epsilon = per_query_epsilon / num_eigvec_releases

    # use conservatively smaller budgets to prevent totals from exceeding total epsilon
    return PCAEpsilons(
        eigvals=per_query_epsilon,
        eigvecs=[_smaller(per_evec_epsilon)] * num_eigvec_releases,
        mean=_smaller(per_query_epsilon) if estimate_mean else None,
    )


def _make_center(input_domain, input_metric):
    import opendp.prelude as dp
    np = import_optional_dependency('numpy')
    dp.assert_features("contrib", "floating-point")

    input_desc = input_domain.descriptor
    kwargs = input_desc._asdict() | {"origin": np.zeros(input_desc.num_columns)}

    return _make_transformation(
        input_domain,
        input_metric,
        dp.numpy.array2_domain(**kwargs),
        input_metric,
        lambda arg: arg - input_desc.origin,
        lambda d_in: d_in,
    )


then_center = to_then(_make_center)
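
# Worked example (illustrative): _split_pca_epsilon_evenly with unit_epsilon=1.0,
# num_eigvec_releases=2, and estimate_mean=True divides the budget across
# num_queries=3 releases: the eigenvalues get 1.0/3 ≈ 0.333, each of the two
# eigenvectors gets (1.0/3)/2 ≈ 0.167, and the mean gets just under 1.0/3
# (the latter two nudged down one float by _smaller), so the per-release
# epsilons never total more than the original 1.0.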