'''
The ``context`` module provides :py:class:`opendp.context.Context` and supporting utilities.
'''
from typing import Any, Callable, List, Optional, Tuple, Union
import importlib
from inspect import signature
from functools import partial
from opendp.combinators import (
make_fix_delta,
make_pureDP_to_fixed_approxDP,
make_pureDP_to_zCDP,
make_sequential_composition,
make_zCDP_to_approxDP,
)
from opendp.domains import atom_domain
from opendp.measurements import make_base_laplace, make_gaussian
from opendp.measures import (
fixed_smoothed_max_divergence,
max_divergence,
zero_concentrated_divergence,
)
from opendp.metrics import (
absolute_distance,
change_one_distance,
hamming_distance,
insert_delete_distance,
l1_distance,
l2_distance,
symmetric_distance,
)
from opendp.mod import (
Domain,
Measurement,
Metric,
Queryable,
Transformation,
Measure,
binary_search,
binary_search_param,
)
from opendp.typing import RuntimeType
__all__ = [
'space_of',
'domain_of',
'metric_of',
'loss_of',
'unit_of',
'Context',
'Query',
'Chain',
'PartialChain'
]
# a dictionary of "constructor name" -> (constructor_function, is_partial)
# "constructor name" is the name of the constructor without the "make_" prefix
# constructor_function is the partial version if is_partial is True
constructors = {}
for module_name in ["transformations", "measurements"]:
module = importlib.import_module(f"opendp.{module_name}")
for name in module.__all__:
if not name.startswith("make_"):
continue
partial_name = "then_" + name[5:]
make_func = getattr(module, name)
is_partial = partial_name in module.__all__
constructor = getattr(module, partial_name if is_partial else name)
constructors[name[5:]] = constructor, is_partial
[docs]
def space_of(T, M=None, infer=False) -> Tuple[Domain, Metric]:
"""A shorthand for building a metric space.
A metric space consists of a domain and a metric.
:example:
>>> import opendp.prelude as dp
>>> from typing import List # in Python 3.9, can just write list[int] below
...
>>> dp.space_of(List[int])
(VectorDomain(AtomDomain(T=i32)), SymmetricDistance())
>>> # the verbose form allows greater control:
>>> (dp.vector_domain(dp.atom_domain(T=dp.i32)), dp.symmetric_distance())
(VectorDomain(AtomDomain(T=i32)), SymmetricDistance())
:param T: carrier type (the type of members in the domain)
:param M: metric type
:param infer: if True, `T` is an example of the sensitive dataset. Passing sensitive data may result in a privacy violation.
"""
import opendp.typing as ty
domain = domain_of(T, infer=infer)
D = domain.type
# choose a metric type if not set
if M is None:
if D.origin == "VectorDomain": # type: ignore[union-attr]
M = ty.SymmetricDistance
elif D.origin == "AtomDomain" and ty.get_atom(D) in ty.NUMERIC_TYPES: # type: ignore[union-attr]
M = ty.AbsoluteDistance
else:
raise TypeError(f"no default metric for domain {D}. Please set `M`")
# choose a distance type if not set
if isinstance(M, ty.RuntimeType) and not M.args:
M = M[ty.get_atom(D)] # type: ignore[index]
return domain, metric_of(M)
[docs]
def domain_of(T, infer=False) -> Domain:
"""Constructs an instance of a domain from carrier type `T`.
:param T: carrier type
:param infer: if True, `T` is an example of the sensitive dataset. Passing sensitive data may result in a privacy violation.
"""
import opendp.typing as ty
from opendp.domains import vector_domain, atom_domain, option_domain, map_domain
# normalize to a type descriptor
if infer:
T = ty.RuntimeType.infer(T)
else:
T = ty.RuntimeType.parse(T)
# construct the domain
if isinstance(T, ty.RuntimeType):
if T.origin == "Vec":
return vector_domain(domain_of(T.args[0]))
if T.origin == "HashMap":
return map_domain(domain_of(T.args[0]), domain_of(T.args[1]))
if T.origin == "Option":
return option_domain(domain_of(T.args[0]))
if T in ty.PRIMITIVE_TYPES:
return atom_domain(T=T)
raise TypeError(f"unrecognized carrier type: {T}")
[docs]
def metric_of(M) -> Metric:
"""Constructs an instance of a metric from metric type `M`."""
import opendp.typing as ty
import opendp.metrics as metrics
if isinstance(M, Metric):
return M
M = ty.RuntimeType.parse(M)
if isinstance(M, ty.RuntimeType):
if M.origin == "AbsoluteDistance":
return metrics.absolute_distance(T=M.args[0])
if M.origin == "L1Distance":
return metrics.l1_distance(T=M.args[0])
if M.origin == "L2Distance": # pragma: no cover
return metrics.l2_distance(T=M.args[0])
if M == ty.HammingDistance:
return metrics.hamming_distance() # pragma: no cover
if M == ty.SymmetricDistance:
return metrics.symmetric_distance()
if M == ty.InsertDeleteDistance:
return metrics.insert_delete_distance() # pragma: no cover
if M == ty.ChangeOneDistance:
return metrics.change_one_distance() # pragma: no cover
if M == ty.DiscreteDistance:
return metrics.discrete_distance()
raise TypeError(f"unrecognized metric: {M}")
[docs]
def loss_of(*, epsilon=None, delta=None, rho=None, U=None) -> Tuple[Measure, float]:
"""Constructs a privacy loss, consisting of a privacy measure and a privacy loss parameter.
:param U: The type of the privacy parameter.
>>> from opendp.context import loss_of
>>> measure, distance = loss_of(epsilon=1.0)
>>> measure, distance = loss_of(epsilon=1.0, delta=1e-9)
>>> measure, distance = loss_of(rho=1.0)
"""
if epsilon is None and rho is None:
raise ValueError("Either epsilon or rho must be specified.")
if rho:
U = RuntimeType.parse_or_infer(U, rho)
return zero_concentrated_divergence(T=U), rho
if delta is None:
U = RuntimeType.parse_or_infer(U, epsilon)
return max_divergence(T=U), epsilon
else:
U = RuntimeType.parse_or_infer(U, epsilon)
return fixed_smoothed_max_divergence(T=U), (epsilon, delta) # type: ignore[return-value]
[docs]
def unit_of(
*,
contributions=None,
changes=None,
absolute=None,
l1=None,
l2=None,
ordered=False,
U=None,
) -> Tuple[Metric, float]:
"""Constructs a unit of privacy, consisting of a metric and a dataset distance.
:param ordered: Set to true to use InsertDeleteDistance instead of SymmetricDistance, or HammingDistance instead of ChangeOneDistance.
:param U: The type of the dataset distance."""
def _is_distance(p, v):
return p not in ["ordered", "U", "_is_distance"] and v is not None
if sum(1 for p, v in locals().items() if _is_distance(p, v)) != 1:
raise ValueError("Must specify exactly one distance.")
if contributions is not None:
metric = insert_delete_distance() if ordered else symmetric_distance()
return metric, contributions
if changes is not None: # pragma: no cover
metric = hamming_distance() if ordered else change_one_distance()
return metric, changes
if absolute is not None: # pragma: no cover
metric = absolute_distance(T=RuntimeType.parse_or_infer(U, absolute))
return metric, absolute
if l1 is not None:
metric = l1_distance(T=RuntimeType.parse_or_infer(U, l1))
return metric, l1
if l2 is not None: # pragma: no cover
metric = l2_distance(T=RuntimeType.parse_or_infer(U, l2))
return metric, l2
raise Exception('No matching metric found')
[docs]
class Context(object):
"""A Context coordinates queries to an instance of a privacy `accountant`."""
accountant: Measurement # union Odometer once merged
"""The accountant is the measurement used to spawn the queryable.
It contains information about the queryable,
such as the input domain, input metric, and output measure expected of measurement queries sent to the queryable."""
queryable: Queryable
"""The queryable executes the queries and tracks the privacy expenditure."""
def __init__(
self,
accountant: Measurement,
queryable: Queryable,
d_in,
d_mids=None,
d_out=None,
):
"""Initializes the context with the given accountant and queryable.
It is recommended to use the `sequential_composition` constructor instead of this one.
:param d_in: An upper bound on the distance between adjacent datasets.
:param d_mids: A sequence of privacy losses for each query to be sent to the queryable. Used for compositors.
:param d_out: An upper bound on the overall privacy loss. Used for filters."""
self.accountant = accountant
self.queryable = queryable
self.d_in = d_in
self.d_mids = d_mids
self.d_out = d_out
[docs]
@staticmethod
def compositor(
data: Any,
privacy_unit: Tuple[Metric, float],
privacy_loss: Tuple[Measure, Any],
split_evenly_over: Optional[int] = None,
split_by_weights: Optional[List[float]] = None,
domain: Optional[Domain] = None,
) -> "Context":
"""Constructs a new context containing a sequential compositor with the given weights.
If the domain is not specified, it will be inferred from the data.
This makes the assumption that the structure of the data is public information.
The weights may be a list of numerics, corresponding to how `privacy_loss` should be distributed to each query.
Alternatively, pass a single integer to distribute the loss evenly.
:param data: The data to be analyzed.
:param privacy_unit: The privacy unit of the compositor.
:param privacy_loss: The privacy loss of the compositor.
:param weights: How to distribute `privacy_loss` among the queries.
:param domain: The domain of the data."""
if domain is None:
domain = domain_of(data, infer=True)
accountant, d_mids = _sequential_composition_by_weights(
domain, privacy_unit, privacy_loss, split_evenly_over, split_by_weights
)
return Context(
accountant=accountant,
queryable=accountant(data),
d_in=privacy_unit[1],
d_mids=d_mids,
)
def __call__(self, query: Union["Query", Measurement]):
"""Executes the given query on the context."""
if isinstance(query, Query):
query = query.resolve() # pragma: no cover
answer = self.queryable(query)
if self.d_mids is not None:
self.d_mids.pop(0)
return answer
[docs]
def query(self, **kwargs) -> "Query":
"""Starts a new Query to be executed in this context.
If the context has been constructed with a sequence of privacy losses,
the next loss will be used. Otherwise, the loss will be computed from the kwargs.
:param kwargs: The privacy loss to use for the query. Passed directly into `loss_of`.
"""
d_query = None
if self.d_mids is not None:
if kwargs:
raise ValueError(f"Expected no privacy arguments but got {kwargs}")
if not self.d_mids:
raise ValueError("Privacy allowance has been exhausted")
d_query = self.d_mids[0]
elif kwargs: # pragma: no cover
measure, d_query = loss_of(**kwargs)
if measure != self.output_measure: # type: ignore[attr-defined]
raise ValueError(
f"Expected output measure {self.output_measure} but got {measure}" # type: ignore[attr-defined]
)
return Query(
chain=(self.accountant.input_domain, self.accountant.input_metric),
output_measure=self.accountant.output_measure,
d_in=self.d_in,
d_out=d_query,
context=self,
)
Chain = Union[Tuple[Domain, Metric], Transformation, Measurement, "PartialChain"]
[docs]
class Query(object):
"""A helper API to build a measurement."""
_chain: Chain
"""The current chain of transformations and measurements."""
_output_measure: Measure
"""The output measure of the query."""
_context: Optional["Context"]
"""The context that the query is part of. `query.release()` submits `_chain` to `_context`."""
_wrap_release: Optional[Callable[[Any], Any]]
"""For internal use. A function that wraps the release of the query.
Used to wrap the response of compositor/odometer queries in another `Analysis`."""
def __init__(
self,
chain: Chain,
output_measure: Measure = None, # type: ignore[assignment]
d_in=None,
d_out=None,
context: "Context" = None, # type: ignore[assignment]
_wrap_release=None,
) -> None:
"""Initializes the query with the given chain and output measure.
It is more convenient to use the `context.query()` constructor than this one.
However, this can be used stand-alone to help build a transformation/measurement that is not part of a context.
:param chain: an initial metric space (tuple of domain and metric) or transformation
:param output_measure: how privacy will be measured on the output of the query
:param d_in: an upper bound on the distance between adjacent datasets
:param d_out: an upper bound on the overall privacy loss
:param context: if specified, then when the query is released, the chain will be submitted to this context
:param _wrap_release: for internal use only
"""
self._chain = chain
self._output_measure = output_measure
self._d_in = d_in
self._d_out = d_out
self._context = context
self._wrap_release = _wrap_release
def __getattr__(self, name: str) -> Callable[[Any], "Query"]:
"""Creates a new query by applying a transformation or measurement to the current chain."""
if name not in constructors:
raise AttributeError(f"Unrecognized constructor: '{name}'")
def make(*args, **kwargs) -> "Query":
"""Wraps the `make_{name}` constructor to allow one optional parameter and chains it to the current query.
This function will be called when the user calls `query.{name}(...)`.
"""
constructor, is_partial = constructors[name]
# determine how many parameters are missing
param_diff = len(args)
for param in signature(constructor).parameters.values():
if param.name in kwargs:
continue
if param.default is not param.empty:
break
param_diff -= 1
if param_diff == -1 and not isinstance(self._chain, PartialChain):
constructor = PartialChain.wrap(constructor)
elif param_diff < 0:
raise ValueError(f"{name} is missing {-param_diff} parameter(s).")
elif param_diff > 0:
raise ValueError(f"{name} has {param_diff} parameter(s) too many.")
new_chain = constructor(*args, **kwargs)
if is_partial or not isinstance(self._chain, tuple):
new_chain = self._chain >> new_chain
return self.new_with(chain=new_chain)
return make
[docs]
def new_with(self, *, chain: Chain, wrap_release=None) -> "Query":
"""Convenience constructor that creates a new query with a different chain."""
return Query(
chain=chain,
output_measure=self._output_measure,
d_in=self._d_in,
d_out=self._d_out,
context=self._context, # type: ignore[arg-type]
_wrap_release=wrap_release or self._wrap_release,
)
def __dir__(self):
"""Returns the list of available constructors. Used by Python's error suggestion mechanism."""
return super().__dir__() + list(constructors.keys()) # type: ignore[operator] # pragma: no cover
[docs]
def resolve(self, allow_transformations=False):
"""Resolve the query into a measurement."
:param allow_transformations: If true, allow the response to be a transformation instead of a measurement.
"""
# resolve a partial chain into a measurement, by fixing the input and output distances
if isinstance(self._chain, PartialChain):
chain = self._chain.fix(self._d_in, self._d_out, self._output_measure)
else:
chain = self._chain
if not allow_transformations and isinstance(chain, Transformation):
raise ValueError("Query is not yet a measurement")
return _cast_measure(chain, self._output_measure, self._d_out)
[docs]
def release(self) -> Any:
"""Release the query. The query must be part of a context."""
# TODO: consider adding an optional `data` parameter for when _context is None
answer = self._context(self.resolve()) # type: ignore[misc]
if self._wrap_release:
answer = self._wrap_release(answer)
return answer
[docs]
def param(self):
"""Returns the discovered parameter, if there is one"""
return getattr(self.resolve(), "param", None) # pragma: no cover
[docs]
def compositor(
self,
split_evenly_over: Optional[int] = None,
split_by_weights: Optional[List[float]] = None,
d_out=None,
output_measure=None,
) -> "Context":
"""Constructs a new context containing a sequential compositor with the given weights.
:param weights: A list of weights corresponding to the privacy budget allocated to a sequence of queries.
"""
if d_out is not None and self._d_out is not None:
raise ValueError("`d_out` has already been specified in query")
if d_out is None and self._d_out is None:
raise ValueError("`d_out` has not yet been specified in the query")
d_out = d_out or self._d_out
if output_measure is not None:
d_out = _translate_measure_distance(
d_out, self._output_measure, output_measure
)
def compositor(chain: Union[Tuple[Domain, Metric], Transformation], d_in):
if isinstance(chain, tuple):
input_domain, input_metric = chain
elif isinstance(chain, Transformation): # pragma: no cover
input_domain, input_metric = chain.output_domain, chain.output_metric
d_in = chain.map(d_in)
privacy_unit = input_metric, d_in
privacy_loss = output_measure or self._output_measure, d_out
accountant, d_mids = _sequential_composition_by_weights(
input_domain,
privacy_unit,
privacy_loss,
split_evenly_over,
split_by_weights,
)
if isinstance(chain, Transformation):
accountant = chain >> accountant # pragma: no cover
def wrap_release(queryable):
return Context(
accountant=accountant,
queryable=queryable,
d_in=d_in,
d_mids=d_mids,
)
return self.new_with(chain=accountant, wrap_release=wrap_release)
return self._compose_context(compositor)
def _compose_context(self, compositor):
"""Helper function for composition in a context."""
if isinstance(self._chain, PartialChain):
return PartialChain(lambda x: compositor(self._chain(x), self._d_in)) # pragma: no cover
else:
return compositor(self._chain, self._d_in)
[docs]
class PartialChain(object):
"""A partial chain is a transformation or measurement that is missing one numeric parameter.
The parameter can be solved for by calling the fix method,
which returns the closest transformation or measurement that satisfies the given stability or privacy constraint.
"""
partial: Callable[[float], Union[Transformation, Measurement]]
"""The partial transformation or measurement."""
def __init__(self, f, *args, **kwargs):
self.partial = partial(f, *args, **kwargs)
def __call__(self, v):
"""Returns the transformation or measurement with the given parameter."""
return self.partial(v) # pragma: no cover
[docs]
def fix(self, d_in, d_out, output_measure=None, T=None):
"""Returns the closest transformation or measurement that satisfies the given stability or privacy constraint.
The discovered parameter is assigned to the param attribute of the returned transformation or measurement.
"""
param = binary_search(
lambda x: _cast_measure(self.partial(x), output_measure, d_out).check(
d_in, d_out
),
T=T,
)
chain = self.partial(param)
chain.param = param
return chain
def __rshift__(self, other):
# partials may be chained with other transformations or measurements to form a new partial
if isinstance(other, (Transformation, Measurement)): # pragma: no cover
return PartialChain(lambda x: self.partial(x) >> other)
raise ValueError("At most one parameter may be missing at a time")
[docs]
@classmethod
def wrap(cls, f):
"""Wraps a constructor for a transformation or measurement to return a partial chain instead."""
def inner(*args, **kwargs):
return cls(f, *args, **kwargs)
return inner
def _sequential_composition_by_weights(
domain: Domain,
privacy_unit: Tuple[Metric, float],
privacy_loss: Tuple[Measure, float],
split_evenly_over: Optional[int] = None,
split_by_weights: Optional[List[float]] = None,
) -> Tuple[Measurement, List[Any]]:
"""constructs a sequential composition measurement
where the d_mids are proportional to the weights
:param domain: the domain of the data
:param privacy_unit: a tuple of the input metric and the data distance (d_in)
:param privacy_loss: a tuple of the output measure and the privacy loss (d_out)
:param weights: either a list of weights for each intermediate privacy loss, or the number of ways to evenly distribute the privacy loss
"""
input_metric, d_in = privacy_unit
output_measure, d_out = privacy_loss
if split_evenly_over is not None and split_by_weights is not None:
raise ValueError(
"Cannot specify both `split_evenly_over` and `split_by_weights`"
)
if split_evenly_over is not None:
weights = [d_out] * split_evenly_over
elif split_by_weights is not None:
weights = split_by_weights
else:
raise ValueError(
"Must specify either `split_evenly_over` or `split_by_weights`"
)
def mul(dist, scale):
if isinstance(dist, tuple):
return dist[0] * scale, dist[1] * scale
else:
return dist * scale
def scale_weights(scale, weights):
return [mul(w, scale) for w in weights]
def scale_sc(scale):
return make_sequential_composition(
input_domain=domain,
input_metric=input_metric,
output_measure=output_measure,
d_in=d_in,
d_mids=scale_weights(scale, weights),
)
scale = binary_search_param(scale_sc, d_in=d_in, d_out=d_out, T=float)
# return the accountant and d_mids
return scale_sc(scale), scale_weights(scale, weights)
def _cast_measure(chain, to_measure=None, d_to=None):
"""Casts the output measure of a given `chain` to `to_measure`.
If provided, `d_to` is the privacy loss wrt the new measure.
"""
if to_measure is None or chain.output_measure == to_measure:
return chain
from_to = chain.output_measure.type.origin, to_measure.type.origin
if from_to == ("MaxDivergence", "FixedSmoothedMaxDivergence"):
return make_pureDP_to_fixed_approxDP(chain)
if from_to == ("MaxDivergence", "ZeroConcentratedDivergence"):
return make_pureDP_to_zCDP(chain)
if from_to == (
"ZeroConcentratedDivergence",
"FixedSmoothedMaxDivergence",
):
return make_fix_delta(make_zCDP_to_approxDP(chain), d_to[1])
raise ValueError(f"Unable to cast measure from {from_to[0]} to {from_to[1]}")
def _translate_measure_distance(d_from, from_measure, to_measure):
"""Translate a privacy loss `d_from` from `from_measure` to `to_measure`.
"""
if from_measure == to_measure:
return d_from # pragma: no cover
from_to = from_measure.type.origin, to_measure.type.origin
T = to_measure.type.args[0]
constant = 1.0 # the choice of constant doesn't matter
if from_to == ("MaxDivergence", "FixedSmoothedMaxDivergence"):
return (d_from, 0.0) # pragma: no cover
if from_to == ("ZeroConcentratedDivergence", "MaxDivergence"): # pragma: no cover
space = atom_domain(T=T), absolute_distance(T=T)
scale = binary_search_param(
lambda eps: make_pureDP_to_zCDP(make_base_laplace(*space, eps)),
d_in=constant,
d_out=d_from,
T=float,
)
return make_base_laplace(scale).map(constant)
if from_to == (
"FixedSmoothedMaxDivergence",
"ZeroConcentratedDivergence",
):
def caster(measurement):
return make_fix_delta(make_zCDP_to_approxDP(measurement), delta=d_from[1])
space = atom_domain(T=int), absolute_distance(T=T)
scale = binary_search_param(
lambda scale: caster(make_gaussian(*space, scale)),
d_in=constant,
d_out=d_from,
T=float,
)
return make_gaussian(*space, scale).map(constant)
raise ValueError(f"Unable to translate distance from {from_to[0]} to {from_to[1]}")