diff --git a/dpsynth/bin/main.py b/dpsynth/bin/main.py index 67bd1ec..b3c578c 100644 --- a/dpsynth/bin/main.py +++ b/dpsynth/bin/main.py @@ -29,6 +29,7 @@ import dpsynth from dpsynth.bin import _read_csv_args import fancyflags as ff +import numpy as np import pandas as pd @@ -87,23 +88,20 @@ def main(_): + np.random.seed(_SEED.value) read_csv_kwargs = _READ_CSV_ARGS.value().to_read_csv_kwargs() df = pd.read_csv(_DATASET_PATH.value, **read_csv_kwargs) attribute_domains = dpsynth.domain.from_yaml_file(_DOMAIN_PATH.value) match _MECHANISM.value: case 'mst': - mechanism_config = dpsynth.discrete_mechanisms.MSTConfig(seed=_SEED.value) + mechanism_config = dpsynth.discrete_mechanisms.MSTConfig() case 'aim': - mechanism_config = dpsynth.discrete_mechanisms.AIMConfig(seed=_SEED.value) + mechanism_config = dpsynth.discrete_mechanisms.AIMConfig() case 'independent': - mechanism_config = dpsynth.discrete_mechanisms.IndependentConfig( - seed=_SEED.value - ) + mechanism_config = dpsynth.discrete_mechanisms.IndependentConfig() case 'aim_gdp': - mechanism_config = dpsynth.discrete_mechanisms.AIMGDPConfig( - seed=_SEED.value - ) + mechanism_config = dpsynth.discrete_mechanisms.AIMGDPConfig() case _: raise ValueError(f'Unknown mechanism: {_MECHANISM.value}') diff --git a/dpsynth/data_generation_v2.py b/dpsynth/data_generation_v2.py index 496088c..7d83c9e 100644 --- a/dpsynth/data_generation_v2.py +++ b/dpsynth/data_generation_v2.py @@ -65,7 +65,7 @@ def _compute_privacy_parameters( epsilon: float, delta: float, one_way_marginal_budget_fraction: float, - discrete_config: discrete_mechanisms.DiscreteMechanismConfig, + discrete_config: discrete_mechanisms.DiscreteMechanism, ) -> tuple[float, float]: """Compute privacy parameters for one-way marginals and discrete mechanism.""" @@ -77,10 +77,12 @@ def _compute_privacy_parameters( def make_event_from_param(zcdp_rho): event1 = dp_accounting.GaussianDpEvent(one_way_marginal_sigma) - event2 = discrete_config.dp_event(zcdp_rho) + event2 = 
discrete_config.calibrate(zcdp_rho=zcdp_rho).dp_event return dp_accounting.ComposedDpEvent([event1, event2]) - if isinstance(discrete_config.dp_event(1.0), dp_accounting.ZCDpEvent): + if isinstance( + discrete_config.calibrate(zcdp_rho=1.0).dp_event, dp_accounting.ZCDpEvent + ): make_fresh_accountant = dp_accounting.rdp.RdpAccountant else: make_fresh_accountant = dp_accounting.pld.PLDAccountant @@ -102,7 +104,7 @@ def generate( epsilon: float, delta: float, *, - discrete_config: discrete_mechanisms.DiscreteMechanismConfig = discrete_mechanisms.MSTConfig(), + discrete_config: discrete_mechanisms.DiscreteMechanism = discrete_mechanisms.MSTMechanism(), numerical_bins: int = 32, one_way_marginal_budget_fraction: float = 0.1, cross_attribute_constraints: Sequence[constraints.Constraint] = (), @@ -245,8 +247,9 @@ def generate( ####################################################################### one_way_marginal_queries = [(col,) for col in discrete.domain] gdp_sigma = accounting.gdp_gaussian_sigma(one_way_marginal_gdp_mu) + rng = np.random.default_rng() one_way_measurements = common.measure_marginals_with_noise( - discrete, one_way_marginal_queries, gdp_sigma + rng, discrete, one_way_marginal_queries, gdp_sigma ) logging.info('[SynthKit Tabular]: Measured one-way marginals.') @@ -262,10 +265,9 @@ def generate( cross_attribute_constraints, discrete.domain ) - model = discrete_mechanisms.run_mechanism( + model = discrete_config.calibrate(zcdp_rho=discrete_zcdp_rho)( + rng, data=discrete, - zcdp_rho=discrete_zcdp_rho, - config=discrete_config, initial_measurements=one_way_measurements, initial_potentials=initial_potentials, ) diff --git a/dpsynth/discrete_mechanisms/__init__.py b/dpsynth/discrete_mechanisms/__init__.py index 0c47d36..f29ef74 100644 --- a/dpsynth/discrete_mechanisms/__init__.py +++ b/dpsynth/discrete_mechanisms/__init__.py @@ -16,10 +16,18 @@ # pylint: disable=g-importing-member -from dpsynth.discrete_mechanisms.aim import AIMConfig -from 
dpsynth.discrete_mechanisms.aim_gdp import AIMGDPConfig -from dpsynth.discrete_mechanisms.api import DiscreteMechanismConfig -from dpsynth.discrete_mechanisms.api import run_mechanism -from dpsynth.discrete_mechanisms.direct import DirectConfig -from dpsynth.discrete_mechanisms.independent import IndependentConfig -from dpsynth.discrete_mechanisms.mst import MSTConfig +from dpsynth.discrete_mechanisms.aim import AIMMechanism +from dpsynth.discrete_mechanisms.aim_gdp import AIMGDPMechanism +from dpsynth.discrete_mechanisms.direct import DirectMechanism +from dpsynth.discrete_mechanisms.independent import IndependentMechanism +from dpsynth.discrete_mechanisms.mst import MSTMechanism +from dpsynth.discrete_mechanisms.swift import SWIFTMechanism +from dpsynth.local_mode.primitives import DPMechanism as DiscreteMechanism + +# Backwards-compatible aliases. +AIMConfig = AIMMechanism +AIMGDPConfig = AIMGDPMechanism +DirectConfig = DirectMechanism +IndependentConfig = IndependentMechanism +MSTConfig = MSTMechanism +SWIFTConfig = SWIFTMechanism diff --git a/dpsynth/discrete_mechanisms/aim.py b/dpsynth/discrete_mechanisms/aim.py index ac61a3d..d4ea143 100644 --- a/dpsynth/discrete_mechanisms/aim.py +++ b/dpsynth/discrete_mechanisms/aim.py @@ -23,6 +23,7 @@ import dp_accounting from dpsynth.discrete_mechanisms import accounting from dpsynth.discrete_mechanisms import common +from dpsynth.local_mode import primitives import jax.numpy as jnp import mbi import mbi.estimation @@ -63,6 +64,7 @@ def _filter_candidates( def _worst_approximated( + rng: np.random.Generator, candidates: Mapping[MarginalQuery, float], answers: mbi.CliqueVector, estimates: mbi.CliqueVector, @@ -83,13 +85,13 @@ def _worst_approximated( ) # if all weights are 0, could be a problem keys, values = list(errors.keys()), np.array(list(errors.values())) idx = common.exponential_mechanism( - values, eps, max_sensitivity, monotonic=True + values, eps, max_sensitivity, rng, monotonic=True ) return keys[idx] 
@dataclasses.dataclass -class AIMConfig: +class AIMMechanism(primitives.DPMechanism): """Configuration for the AIM mechanism. Details are described in the paper: @@ -109,7 +111,6 @@ class AIMConfig: workload: A collection of marginal queries (and weights) the synthetic data should be tailored to. max_rounds: The maximum number of rounds to run the mechanism. - seed: The seed for the random number generator. pgm_iters: The number of iterations for the mirror descent algorithm. max_model_size: The maximum size of the graphical model in megabytes. Controls the utility/runtime trade-off. @@ -127,7 +128,6 @@ class AIMConfig: None ) max_rounds: int | None = None - seed: int = 0 pgm_iters: int = 1000 max_model_size: int = 80 max_marginal_size: float = 1e6 @@ -135,136 +135,157 @@ class AIMConfig: anneal_factor: float = 4.0 one_way_budget_fraction: float = 0.1 select_budget_fraction: float = 0.1 + zcdp_rho: float | None = None - def dp_event(self, zcdp_rho: float) -> dp_accounting.DpEvent: - """Returns the DP event for the AIM mechanism.""" - return dp_accounting.ZCDpEvent(zcdp_rho) - - -def run_mechanism( - data: mbi.Projectable, - config: AIMConfig, - zcdp_rho: float, - *, - initial_measurements: list[mbi.LinearMeasurement] | None = None, - initial_potentials: mbi.CliqueVector | None = None, -) -> mbi.MarkovRandomField: - """Generate synthetic data via the AIM mechanism.""" - logging.info('[AIM]: Starting Mechanism.') - constraints = initial_potentials is not None - marginal_oracle = common.default_oracle(config.marginal_oracle, constraints) - - np.random.seed(config.seed) - - ######################################################################### - # Compile workload into candidate measurements, and precompute answers. 
# - ######################################################################### - candidates = common.compiled_workload( - data.domain, config.workload, config.max_marginal_size - ) - answers = mbi.CliqueVector.from_projectable(data, list(candidates)) - logging.info('[AIM]: Calculated workload-query answers.') - terminate = False - rho_remaining = zcdp_rho - max_rounds = config.max_rounds or 16 * len(data.domain) - rho_per_round = zcdp_rho / max_rounds - - if initial_measurements is None: - rho_remaining -= config.one_way_budget_fraction * zcdp_rho - marginal_queries = [cl for cl in candidates.keys() if len(cl) == 1] - measurements = common.measure_marginals_with_noise( - data, - marginal_queries=marginal_queries, - gdp_sigma=zcdp_rho * config.one_way_budget_fraction, - ) - else: - measurements = list(initial_measurements) - - potentials = initial_potentials - if potentials is not None: - potentials = potentials.expand([m.clique for m in measurements]) - model = mbi.estimation.mirror_descent( - data.domain, - measurements, - iters=config.pgm_iters, - potentials=potentials, - marginal_oracle=marginal_oracle, - ) + def calibrate(self, *, zcdp_rho: float) -> 'AIMMechanism': + """Returns a new instance calibrated to the given zCDP budget.""" + return dataclasses.replace(self, zcdp_rho=zcdp_rho) - t = 0 - while not terminate: - t += 1 - if rho_remaining < 2 * rho_per_round: - logging.info('[AIM] Final round, Using all remaining privacy budget.') - rho_per_round = rho_remaining - terminate = True - - ############################################################################ - # Select a marginal query that is worst approximated by the current model. 
# - ############################################################################ - t0 = time.time() - rho_remaining -= rho_per_round - fraction = config.select_budget_fraction - sigma = accounting.zcdp_gaussian_sigma((1 - fraction) * rho_per_round) - epsilon = accounting.zcdp_exponential_eps(fraction * rho_per_round) - size_limit = config.max_model_size * (zcdp_rho - rho_remaining) / zcdp_rho - small_candidates = _filter_candidates(candidates, model, size_limit) - - estimates = mbi.marginal_oracles.bulk_variable_elimination( - model.potentials, list(small_candidates), total=model.total - ) - marginal_query = _worst_approximated( - small_candidates, answers, estimates, epsilon, sigma, data.domain - ) - - t1 = time.time() - logging.info('[AIM] Found worst-approximated candidate in %.2fs', t1 - t0) - logging.info( - '[AIM] Round %d, Budget used: %.4f, Measuring: %s, Candidates: %d', - t, - (zcdp_rho - rho_remaining) / zcdp_rho, - marginal_query, - len(small_candidates), + @property + def dp_event(self) -> dp_accounting.DpEvent: + """Returns the DP event for the AIM mechanism.""" + if self.zcdp_rho is None: + raise ValueError('Must call calibrate() before using the mechanism.') + return dp_accounting.ZCDpEvent(self.zcdp_rho) + + def __call__( + self, + rng: np.random.Generator, + data: mbi.Projectable, + *, + initial_measurements: list[mbi.LinearMeasurement] | None = None, + initial_potentials: mbi.CliqueVector | None = None, + ) -> mbi.MarkovRandomField: + """Runs the AIM mechanism on the given data. + + Args: + rng: A numpy random number generator. + data: The input data to the mechanism. + initial_measurements: Optional initial measurements to start from. + initial_potentials: Optional initial potentials (constraints). + + Returns: + A MarkovRandomField representing the estimated data distribution. 
+ """ + if self.zcdp_rho is None: + raise ValueError('Must call calibrate() before using the mechanism.') + + logging.info('[AIM]: Starting Mechanism.') + constraints = initial_potentials is not None + marginal_oracle = common.default_oracle(self.marginal_oracle, constraints) + + zcdp_rho = self.zcdp_rho + + ######################################################################### + # Compile workload into candidate measurements, and precompute answers. # + ######################################################################### + candidates = common.compiled_workload( + data.domain, self.workload, self.max_marginal_size ) - - ###################################################################### - # Measure the marginal query privately using the Gaussian mechanism. # - ###################################################################### - measurement = common.measure_marginals_with_noise( - data, [marginal_query], sigma - )[0] - measurements.append(measurement) - old_estimate = model.project(marginal_query).datavector() - - ##################################################### - # Estimate the data distribution using Private-PGM. 
# - ##################################################### - t2 = time.time() - callback_fn = mbi.callbacks.default(measurements) - measured_cliques = list(set(m.clique for m in measurements)) - warm_start = model.potentials.expand(measured_cliques) + answers = mbi.CliqueVector.from_projectable(data, list(candidates)) + logging.info('[AIM]: Calculated workload-query answers.') + terminate = False + rho_remaining = zcdp_rho + max_rounds = self.max_rounds or 16 * len(data.domain) + rho_per_round = zcdp_rho / max_rounds + + if initial_measurements is None: + rho_remaining -= self.one_way_budget_fraction * zcdp_rho + marginal_queries = [cl for cl in candidates.keys() if len(cl) == 1] + measurements = common.measure_marginals_with_noise( + rng, + data, + marginal_queries=marginal_queries, + gdp_sigma=zcdp_rho * self.one_way_budget_fraction, + ) + else: + measurements = list(initial_measurements) + + potentials = initial_potentials + if potentials is not None: + potentials = potentials.expand([m.clique for m in measurements]) model = mbi.estimation.mirror_descent( data.domain, measurements, - potentials=warm_start, - iters=config.pgm_iters, - callback_fn=callback_fn, + iters=self.pgm_iters, + potentials=potentials, marginal_oracle=marginal_oracle, ) - t3 = time.time() - logging.info('[AIM] Mirror descent took %.2fs', t3 - t2) - - new_estimate = model.project(marginal_query).datavector() - - ########################################## - # Anneal epsilon and sigma if necessary. # - ########################################## - threshold = sigma * np.sqrt(2 / np.pi) * data.domain.size(marginal_query) - if np.linalg.norm(new_estimate - old_estimate, ord=1) <= threshold: - # No useful information at this noise level, so increase budget per round. 
- rho_per_round *= config.anneal_factor - fraction = config.select_budget_fraction - sigma = accounting.zcdp_gaussian_sigma((1 - fraction) * rho_per_round) - logging.info('[AIM] Reducing sigma: %.1f', sigma) - return model + t = 0 + while not terminate: + t += 1 + if rho_remaining < 2 * rho_per_round: + logging.info('[AIM] Final round, Using all remaining privacy budget.') + rho_per_round = rho_remaining + terminate = True + + ######################################################################## + # Select a marginal query worst approximated by the current model. # + ######################################################################## + t0 = time.time() + rho_remaining -= rho_per_round + fraction = self.select_budget_fraction + sigma = accounting.zcdp_gaussian_sigma((1 - fraction) * rho_per_round) + epsilon = accounting.zcdp_exponential_eps(fraction * rho_per_round) + size_limit = self.max_model_size * (zcdp_rho - rho_remaining) / zcdp_rho + small_candidates = _filter_candidates(candidates, model, size_limit) + + estimates = mbi.marginal_oracles.bulk_variable_elimination( + model.potentials, list(small_candidates), total=model.total + ) + marginal_query = _worst_approximated( + rng, small_candidates, answers, estimates, epsilon, sigma, data.domain + ) + + t1 = time.time() + logging.info('[AIM] Found worst-approximated candidate in %.2fs', t1 - t0) + logging.info( + '[AIM] Round %d, Budget used: %.4f, Measuring: %s, Candidates: %d', + t, + (zcdp_rho - rho_remaining) / zcdp_rho, + marginal_query, + len(small_candidates), + ) + + ###################################################################### + # Measure the marginal query privately using the Gaussian mechanism. 
# + ###################################################################### + measurement = common.measure_marginals_with_noise( + rng, data, [marginal_query], sigma + )[0] + measurements.append(measurement) + old_estimate = model.project(marginal_query).datavector() + + ##################################################### + # Estimate the data distribution using Private-PGM. # + ##################################################### + t2 = time.time() + callback_fn = mbi.callbacks.default(measurements) + measured_cliques = list(set(m.clique for m in measurements)) + warm_start = model.potentials.expand(measured_cliques) + model = mbi.estimation.mirror_descent( + data.domain, + measurements, + potentials=warm_start, + iters=self.pgm_iters, + callback_fn=callback_fn, + marginal_oracle=marginal_oracle, + ) + t3 = time.time() + logging.info('[AIM] Mirror descent took %.2fs', t3 - t2) + + new_estimate = model.project(marginal_query).datavector() + + ########################################## + # Anneal epsilon and sigma if necessary. # + ########################################## + threshold = sigma * np.sqrt(2 / np.pi) * data.domain.size(marginal_query) + if np.linalg.norm(new_estimate - old_estimate, ord=1) <= threshold: + # No useful information at this noise level, increase budget per round. 
+ rho_per_round *= self.anneal_factor + fraction = self.select_budget_fraction + sigma = accounting.zcdp_gaussian_sigma((1 - fraction) * rho_per_round) + logging.info('[AIM] Reducing sigma: %.1f', sigma) + + return model diff --git a/dpsynth/discrete_mechanisms/aim_gdp.py b/dpsynth/discrete_mechanisms/aim_gdp.py index 414d717..624d5b0 100644 --- a/dpsynth/discrete_mechanisms/aim_gdp.py +++ b/dpsynth/discrete_mechanisms/aim_gdp.py @@ -23,6 +23,7 @@ import dp_accounting from dpsynth.discrete_mechanisms import accounting from dpsynth.discrete_mechanisms import common +from dpsynth.local_mode import primitives import jax.numpy as jnp import mbi import mbi.estimation @@ -63,6 +64,7 @@ def expected_size(cl): def _compute_dp_errors( + rng: np.random.Generator, answers: mbi.CliqueVector, estimates: mbi.CliqueVector, gdp_budget: float, @@ -80,12 +82,13 @@ def _compute_dp_errors( actual = answers[cl].datavector(flatten=True) estimate = estimates[cl].datavector(flatten=True) error = jnp.linalg.norm(actual - estimate, ord=1) - noise = np.random.normal(loc=0, scale=per_candidate_sigma, size=None) + noise = rng.normal(loc=0, scale=per_candidate_sigma) result[cl] = error + noise return result def _worst_approximated( + rng: np.random.Generator, candidates: Mapping[MarginalQuery, float], errors: dict[MarginalQuery, float], # will be updated in-place. answers: mbi.CliqueVector, # derived from sensitive data. @@ -98,7 +101,7 @@ def _worst_approximated( current_score_estimates = {} for cl in candidates: weight = candidates[cl] - bias = np.sqrt(2 / jnp.pi) * measure_sigma * model.domain.size(cl) + bias = (2 / np.pi) ** 0.5 * measure_sigma * model.domain.size(cl) current_score_estimates[cl] = weight * (errors[cl] - bias) subset = sorted(current_score_estimates, key=current_score_estimates.get) @@ -108,20 +111,22 @@ def _worst_approximated( model.potentials, subset, model.total ) # Only step that uses "answers", satisfies DP. 
- current_errors = _compute_dp_errors(answers, estimates, select_budget, subset) + current_errors = _compute_dp_errors( + rng, answers, estimates, select_budget, subset + ) errors.update(current_errors) current_scores = {} for cl in subset: weight = candidates[cl] - bias = np.sqrt(2 / jnp.pi) * measure_sigma * model.domain.size(cl) + bias = (2 / np.pi) ** 0.5 * measure_sigma * model.domain.size(cl) current_scores[cl] = weight * (errors[cl] - bias) return max(current_scores, key=current_scores.get) @dataclasses.dataclass -class AIMGDPConfig: +class AIMGDPMechanism(primitives.DPMechanism): """Configuration for the AIM mechanism with Gaussian DP. Details are described in the paper: @@ -143,7 +148,6 @@ class AIMGDPConfig: each marginal query. A default value of 1.0 will be assigned if the workload is provided as a list. max_rounds: The maximum number of rounds to run the mechanism. - seed: The seed for the random number generator. pgm_iters: The number of iterations for the mirror descent algorithm. max_model_size: The maximum size of the graphical model in megabytes. Controls the utility/runtime trade-off. @@ -159,13 +163,14 @@ class AIMGDPConfig: one-way marginals. select_budget_fraction: The fraction of the privacy budget to use for the "Select" step. + gdp_sigma: The GDP sigma of the end-to-end mechanism. Privacy budget is + split across rounds internally. 
""" workload: Mapping[MarginalQuery, float] | Iterable[MarginalQuery] | None = ( None ) max_rounds: int | None = None - seed: int = 0 pgm_iters: int = 1000 max_model_size: int = 80 max_marginal_size: float = 1e6 @@ -174,155 +179,183 @@ class AIMGDPConfig: anneal_factor: float = 4.0 one_way_budget_fraction: float = 0.1 select_budget_fraction: float = 0.1 + gdp_sigma: float | None = None - def dp_event(self, zcdp_rho: float) -> dp_accounting.DpEvent: - """Returns the DP event for the AIM mechanism.""" - sigma = accounting.zcdp_gaussian_sigma(zcdp_rho) - return dp_accounting.GaussianDpEvent(noise_multiplier=sigma) - - -def run_mechanism( - data: mbi.Projectable, - config: AIMGDPConfig, - zcdp_rho: float, - *, - initial_measurements: list[mbi.LinearMeasurement] | None = None, - initial_potentials: mbi.CliqueVector | None = None, -) -> mbi.MarkovRandomField: - """Generate synthetic data via the AIM mechanism.""" - logging.info('[AIM] Starting Mechanism.') - constraints = initial_potentials is not None - marginal_oracle = common.default_oracle(config.marginal_oracle, constraints) - gdp_budget = accounting.zcdp_to_gdp(zcdp_rho) - - np.random.seed(config.seed) - - ######################################################################### - # Compile workload into candidate measurements, and precompute answers. 
# - ######################################################################### - candidates = common.compiled_workload( - data.domain, config.workload, config.max_marginal_size - ) - answers = mbi.CliqueVector.from_projectable(data, candidates) - - logging.info('[AIM] Calculated workload-query answers.') - terminate = False - budget_remaining = gdp_budget - domain = data.domain - max_rounds = config.max_rounds or 16 * len(domain) - budget_per_round = budget_remaining / max_rounds - - if initial_measurements is None: - one_way_budget = config.one_way_budget_fraction * gdp_budget - one_way_gdp_sigma = accounting.gdp_gaussian_sigma(one_way_budget) - budget_remaining -= one_way_budget - marginal_queries = [cl for cl in candidates.keys() if len(cl) == 1] - measurements = common.measure_marginals_with_noise( - data, - marginal_queries=marginal_queries, - gdp_sigma=one_way_gdp_sigma, + def calibrate(self, *, zcdp_rho: float) -> 'AIMGDPMechanism': + """Returns a new instance calibrated to the given zCDP budget.""" + return dataclasses.replace( + self, gdp_sigma=accounting.zcdp_gaussian_sigma(zcdp_rho) ) - else: - measurements = list(initial_measurements) - - potentials = initial_potentials - if potentials is not None: - potentials = potentials.expand([m.clique for m in measurements]) - - model = mbi.estimation.mirror_descent( - domain, - measurements, - iters=config.pgm_iters, - potentials=potentials, - marginal_oracle=marginal_oracle, - ) - logging.info('[AIM] Estimated initial model.') - budget_remaining -= 0.5 * budget_per_round - estimates = mbi.marginal_oracles.bulk_variable_elimination( - model.potentials, list(candidates), model.total - ) - errors = _compute_dp_errors(answers, estimates, 0.5 * budget_per_round) - logging.info('[AIM] Computed initial errors.') - - t = 0 - while not terminate: - t += 1 - if budget_remaining < 2 * budget_per_round: - logging.info('[AIM] Final round, Using all remaining privacy budget.') - budget_per_round = budget_remaining - terminate 
= True - - ############################################################################ - # Select a marginal query that is worst approximated by the current model. # - ############################################################################ - t0 = time.time() - budget_remaining -= budget_per_round - measure_budget = budget_per_round * (1 - config.select_budget_fraction) - select_budget = budget_per_round * config.select_budget_fraction - measure_sigma = accounting.gdp_gaussian_sigma(measure_budget) - percent_used = (gdp_budget - budget_remaining) / gdp_budget - size_limit = config.max_model_size * percent_used - small_candidates = _filter_candidates(candidates, model, size_limit) - - marginal_query = _worst_approximated( - candidates=small_candidates, - errors=errors, - answers=answers, - model=model, - select_budget=select_budget, - measure_sigma=measure_sigma, - max_new_evals=config.max_candidates_per_round, + @property + def dp_event(self) -> dp_accounting.DpEvent: + """Returns the DP event for the AIM-GDP mechanism.""" + if self.gdp_sigma is None: + raise ValueError('Must call calibrate() before using the mechanism.') + return dp_accounting.GaussianDpEvent(noise_multiplier=self.gdp_sigma) + + def __call__( + self, + rng: np.random.Generator, + data: mbi.Projectable, + *, + initial_measurements: list[mbi.LinearMeasurement] | None = None, + initial_potentials: mbi.CliqueVector | None = None, + ) -> mbi.MarkovRandomField: + """Runs the AIM-GDP mechanism on the given data. + + Args: + rng: A numpy random number generator. + data: The input data to the mechanism. + initial_measurements: Optional initial measurements to start from. + initial_potentials: Optional initial potentials (constraints). + + Returns: + A MarkovRandomField representing the estimated data distribution. 
+ """ + if self.gdp_sigma is None: + raise ValueError('Must call calibrate() before using the mechanism.') + + logging.info('[AIM] Starting Mechanism.') + constraints = initial_potentials is not None + marginal_oracle = common.default_oracle(self.marginal_oracle, constraints) + + # Convert end-to-end GDP sigma to budget for internal allocation. + gdp_budget = 1.0 / self.gdp_sigma**2 + + ######################################################################### + # Compile workload into candidate measurements, and precompute answers. # + ######################################################################### + candidates = common.compiled_workload( + data.domain, self.workload, self.max_marginal_size ) + answers = mbi.CliqueVector.from_projectable(data, candidates) + + logging.info('[AIM] Calculated workload-query answers.') + terminate = False + budget_remaining = gdp_budget + domain = data.domain + max_rounds = self.max_rounds or 16 * len(domain) + budget_per_round = budget_remaining / max_rounds + + if initial_measurements is None: + one_way_budget = self.one_way_budget_fraction * gdp_budget + one_way_gdp_sigma = accounting.gdp_gaussian_sigma(one_way_budget) + budget_remaining -= one_way_budget + marginal_queries = [cl for cl in candidates.keys() if len(cl) == 1] + # measure_marginals_with_noise splits one_way_gdp_sigma across queries. 
+ measurements = common.measure_marginals_with_noise( + rng, + data, + marginal_queries=marginal_queries, + gdp_sigma=one_way_gdp_sigma, + ) + else: + measurements = list(initial_measurements) + + potentials = initial_potentials + if potentials is not None: + potentials = potentials.expand([m.clique for m in measurements]) - t1 = time.time() - logging.info('[AIM] Found worst-approximated candidate in %.2fs', t1 - t0) - logging.info( - '[AIM] Round %d, Budget used: %.4f, Measuring: %s, Candidates: %d', - t, - percent_used, - marginal_query, - len(small_candidates), - ) - - ###################################################################### - # Measure the marginal query privately using the Gaussian mechanism. # - ###################################################################### - measurement = common.measure_marginals_with_noise( - data, [marginal_query], measure_sigma - )[0] - measurements.append(measurement) - old_estimate = model.project(marginal_query).datavector() - - ##################################################### - # Estimate the data distribution using Private-PGM. # - ##################################################### - t2 = time.time() - callback_fn = mbi.callbacks.default(measurements) - measured_cliques = list(set(m.clique for m in measurements)) - warm_start = model.potentials.expand(measured_cliques) model = mbi.estimation.mirror_descent( domain, measurements, - potentials=warm_start, - iters=config.pgm_iters, - callback_fn=callback_fn, + iters=self.pgm_iters, + potentials=potentials, marginal_oracle=marginal_oracle, ) - t3 = time.time() - logging.info('[AIM] Mirror descent took %.2fs', t3 - t2) - - new_estimate = model.project(marginal_query).datavector() - - ########################################## - # Anneal epsilon and sigma if necessary. # - ########################################## - # See Alg 4 of https://arxiv.org/pdf/2201.12677. - # of just the largest error candidate), we can maybe simplify this logic. 
- threshold = measure_sigma * np.sqrt(2 / np.pi) * domain.size(marginal_query) - if np.linalg.norm(new_estimate - old_estimate, ord=1) <= threshold: - # No useful information at this noise level, so increase budget per round. - budget_per_round *= config.anneal_factor - logging.info('[AIM] Increasing budget per round: %.5f', budget_per_round) - - return model + logging.info('[AIM] Estimated initial model.') + + budget_remaining -= 0.5 * budget_per_round + estimates = mbi.marginal_oracles.bulk_variable_elimination( + model.potentials, list(candidates), model.total + ) + errors = _compute_dp_errors(rng, answers, estimates, 0.5 * budget_per_round) + logging.info('[AIM] Computed initial errors.') + + t = 0 + while not terminate: + t += 1 + if budget_remaining < 2 * budget_per_round: + logging.info('[AIM] Final round, Using all remaining privacy budget.') + budget_per_round = budget_remaining + terminate = True + + ######################################################################## + # Select a marginal query worst approximated by the current model. 
# + ######################################################################## + t0 = time.time() + budget_remaining -= budget_per_round + measure_budget = budget_per_round * (1 - self.select_budget_fraction) + select_budget = budget_per_round * self.select_budget_fraction + measure_sigma = accounting.gdp_gaussian_sigma(measure_budget) + percent_used = (gdp_budget - budget_remaining) / gdp_budget + size_limit = self.max_model_size * percent_used + small_candidates = _filter_candidates(candidates, model, size_limit) + + marginal_query = _worst_approximated( + rng, + candidates=small_candidates, + errors=errors, + answers=answers, + model=model, + select_budget=select_budget, + measure_sigma=measure_sigma, + max_new_evals=self.max_candidates_per_round, + ) + + t1 = time.time() + logging.info('[AIM] Found worst candidate in %.2fs', t1 - t0) + logging.info( + '[AIM] Round %d, Budget used: %.4f, Measuring: %s, Candidates: %d', + t, + percent_used, + marginal_query, + len(small_candidates), + ) + + ###################################################################### + # Measure the marginal query privately using the Gaussian mechanism. # + ###################################################################### + measurement = common.measure_marginals_with_noise( + rng, data, [marginal_query], measure_sigma + )[0] + measurements.append(measurement) + old_estimate = model.project(marginal_query).datavector() + + ##################################################### + # Estimate the data distribution using Private-PGM. 
# + ##################################################### + t2 = time.time() + callback_fn = mbi.callbacks.default(measurements) + measured_cliques = list(set(m.clique for m in measurements)) + warm_start = model.potentials.expand(measured_cliques) + model = mbi.estimation.mirror_descent( + domain, + measurements, + potentials=warm_start, + iters=self.pgm_iters, + callback_fn=callback_fn, + marginal_oracle=marginal_oracle, + ) + t3 = time.time() + logging.info('[AIM] Mirror descent took %.2fs', t3 - t2) + + new_estimate = model.project(marginal_query).datavector() + + ########################################## + # Anneal epsilon and sigma if necessary. # + ########################################## + # See Alg 4 of https://arxiv.org/pdf/2201.12677. + # of just the largest error candidate), we can maybe simplify this logic. + threshold = ( + measure_sigma * (2 / np.pi) ** 0.5 * domain.size(marginal_query) + ) + if np.linalg.norm(new_estimate - old_estimate, ord=1) <= threshold: + # No useful information at this noise level, increase budget per round. + budget_per_round *= self.anneal_factor + logging.info( + '[AIM] Increasing budget per round: %.5f', budget_per_round + ) + + return model diff --git a/dpsynth/discrete_mechanisms/api.py b/dpsynth/discrete_mechanisms/api.py deleted file mode 100644 index 15a05bf..0000000 --- a/dpsynth/discrete_mechanisms/api.py +++ /dev/null @@ -1,77 +0,0 @@ -# Copyright 2026 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -"""A generic mechanism interface and broadly useful helper functions.""" - -from typing import Protocol - -import dp_accounting -import mbi - -from . import aim -from . import aim_gdp -from . import direct -from . import independent -from . import mst - - -class DiscreteMechanismConfig(Protocol): - """A generic mechanism configuration that operates on discrete data. - - Note: For consistency across the library, discrete mechanisms can be run - with a given zCDP budget (rho). However, a more precise characterization - of the privacy properties of the mechanism is given by the ``dp_event`` - method defined on this class. Given a ``zcdp_rho`` value passed into - ``run_mechanism``, this returns a DpEvent that characterizes the privacy - properties of the mechanism. For instance, if the mechanism satisfies mu-GDP, - then the returned DpEvent will be - ``dp_accounting.GaussianDpEvent(sigma=math.sqrt(0.5 / zcdp_rho))``. By using - PLD accounting, one can use this ``dp_event`` to compute tighter - (epsilon, delta) guarantees, or calibrate the ``zcdp_rho`` value for a - desired (epsilon, delta) guarantee. 
- """ - - def dp_event(self, zcdp_rho: float) -> dp_accounting.DpEvent: - """Returns the DP event for the mechanism.""" - - -def run_mechanism( - data: mbi.Projectable, - config: DiscreteMechanismConfig, - zcdp_rho: float, - *, - initial_measurements: list[mbi.LinearMeasurement] | None = None, - initial_potentials: mbi.CliqueVector | None = None, -) -> mbi.MarkovRandomField: - """Runs a discrete mechanism with the given configuration and privacy parameter.""" - if isinstance(config, aim.AIMConfig): - run_mechanism_fn = aim.run_mechanism - elif isinstance(config, aim_gdp.AIMGDPConfig): - run_mechanism_fn = aim_gdp.run_mechanism - elif isinstance(config, mst.MSTConfig): - run_mechanism_fn = mst.run_mechanism - elif isinstance(config, direct.DirectConfig): - run_mechanism_fn = direct.run_mechanism - elif isinstance(config, independent.IndependentConfig): - run_mechanism_fn = independent.run_mechanism - else: - raise ValueError(f'Unknown mechanism: {type(config)}') - - return run_mechanism_fn( - data, - config, - zcdp_rho=zcdp_rho, - initial_measurements=initial_measurements, - initial_potentials=initial_potentials, - ) diff --git a/dpsynth/discrete_mechanisms/common.py b/dpsynth/discrete_mechanisms/common.py index e8c2209..e81a298 100644 --- a/dpsynth/discrete_mechanisms/common.py +++ b/dpsynth/discrete_mechanisms/common.py @@ -31,17 +31,18 @@ def exponential_mechanism( quality_scores: np.ndarray, epsilon: float, sensitivity: float, - prng: np.random.RandomState = np.random, + rng: np.random.Generator, monotonic: bool = False, ) -> int: """Returns an index chosen by the exponential mechanism.""" coef = 1.0 if monotonic else 0.5 scores = coef * epsilon / sensitivity * quality_scores probas = scipy.special.softmax(scores) - return prng.choice(quality_scores.size, p=probas) + return rng.choice(quality_scores.size, p=probas) def measure_marginals_with_noise( + rng: np.random.Generator, data: mbi.Projectable, marginal_queries: list[tuple[str, ...]], gdp_sigma: float, @@ 
-55,6 +56,7 @@ def measure_marginals_with_noise( sigma is divided proportionally to the weights. Args: + rng: A numpy random number generator. data: The sensitive dataset whose marginals are to be measured. marginal_queries: The list of marginal queries to measure, represented as a list of tuples of column names. @@ -75,7 +77,7 @@ def measure_marginals_with_noise( measurements = [] for proj, wgt in zip(marginal_queries, weights): x = data.project(proj).datavector() - y = x + np.random.normal(loc=0, scale=gdp_sigma / wgt, size=x.size) + y = x + rng.normal(loc=0, scale=gdp_sigma / wgt, size=x.size) measurements.append(mbi.LinearMeasurement(y, proj, gdp_sigma / wgt)) return measurements diff --git a/dpsynth/discrete_mechanisms/direct.py b/dpsynth/discrete_mechanisms/direct.py index 23ff807..6ba9d56 100644 --- a/dpsynth/discrete_mechanisms/direct.py +++ b/dpsynth/discrete_mechanisms/direct.py @@ -19,66 +19,73 @@ import dp_accounting from dpsynth.discrete_mechanisms import accounting from dpsynth.discrete_mechanisms import common +from dpsynth.local_mode import primitives import mbi import numpy as np @dataclasses.dataclass -class DirectConfig: +class DirectMechanism(primitives.DPMechanism): """Configuration for the direct mechanism. Attributes: prespecified_marginal_queries: A list of k-way marginals that a user has specified, ONLY these will be used outside of the initial measurements. - seed: The seed for the random number generator. pgm_iters: The number of iterations for the mirror descent algorithm. marginal_oracle: The marginal oracle to use for the mirror descent algorithm. + gdp_sigma: The GDP sigma of the end-to-end mechanism. Privacy budget is + split across the prespecified marginal queries internally. 
""" prespecified_marginal_queries: list[tuple[str, ...]] - seed: int = 0 pgm_iters: int = 5000 marginal_oracle: mbi.MarginalOracle | None = None + gdp_sigma: float | None = None - def dp_event(self, zcdp_rho: float) -> dp_accounting.DpEvent: - """Returns the DP event for the direct mechanism.""" - sigma = accounting.zcdp_gaussian_sigma(zcdp_rho) - return dp_accounting.GaussianDpEvent(noise_multiplier=sigma) - + def calibrate(self, *, zcdp_rho: float) -> 'DirectMechanism': + """Returns a copy calibrated to the given zCDP budget.""" + return dataclasses.replace( + self, gdp_sigma=accounting.zcdp_gaussian_sigma(zcdp_rho) + ) -def run_mechanism( - data: mbi.Projectable, - config: DirectConfig, - zcdp_rho: float, - *, - initial_measurements: list[mbi.LinearMeasurement] | None = None, - initial_potentials: mbi.CliqueVector | None = None, -) -> mbi.MarkovRandomField: - """Generate synthetic data using user specified two way marginals.""" - constraints = initial_potentials is not None - marginal_oracle = common.default_oracle(config.marginal_oracle, constraints) + @property + def dp_event(self) -> dp_accounting.DpEvent: + """Returns the DP event for the direct mechanism.""" + if self.gdp_sigma is None: + raise ValueError('Must call calibrate() before using the mechanism.') + return dp_accounting.GaussianDpEvent(noise_multiplier=self.gdp_sigma) - np.random.seed(config.seed) - # the entire remaining budget rho can be used for measuring the - # provided marginals with the gauss mechanism - no - # budget spent on selection - gdp_sigma = accounting.zcdp_gaussian_sigma(zcdp_rho) + def __call__( + self, + rng: np.random.Generator, + data: mbi.Projectable, + *, + initial_measurements: list[mbi.LinearMeasurement] | None = None, + initial_potentials: mbi.CliqueVector | None = None, + ) -> mbi.MarkovRandomField: + """Generate synthetic data using user specified two way marginals.""" + if self.gdp_sigma is None: + raise ValueError('Must call calibrate() before using the mechanism.') + 
constraints = initial_potentials is not None + marginal_oracle = common.default_oracle(self.marginal_oracle, constraints) - new_measurements = common.measure_marginals_with_noise( - data, config.prespecified_marginal_queries, gdp_sigma - ) - if initial_measurements: - all_measurements = initial_measurements + new_measurements - else: - all_measurements = new_measurements + # measure_marginals_with_noise splits gdp_sigma across the queries + # internally via weight normalization. + new_measurements = common.measure_marginals_with_noise( + rng, data, self.prespecified_marginal_queries, self.gdp_sigma + ) + if initial_measurements: + all_measurements = initial_measurements + new_measurements + else: + all_measurements = new_measurements - # fit a distribution to the noisy measurements - model = mbi.estimation.mirror_descent( - data.domain, - all_measurements, - iters=config.pgm_iters, - potentials=initial_potentials, - marginal_oracle=marginal_oracle, - ) - return model + # fit a distribution to the noisy measurements + model = mbi.estimation.mirror_descent( + data.domain, + all_measurements, + iters=self.pgm_iters, + potentials=initial_potentials, + marginal_oracle=marginal_oracle, + ) + return model diff --git a/dpsynth/discrete_mechanisms/independent.py b/dpsynth/discrete_mechanisms/independent.py index 046c411..6485669 100644 --- a/dpsynth/discrete_mechanisms/independent.py +++ b/dpsynth/discrete_mechanisms/independent.py @@ -19,64 +19,76 @@ import dp_accounting from dpsynth.discrete_mechanisms import accounting from dpsynth.discrete_mechanisms import common -import jax +from dpsynth.local_mode import primitives import mbi +import numpy as np @dataclasses.dataclass -class IndependentConfig: +class IndependentMechanism(primitives.DPMechanism): """Configuration for the independent mechanism. Attributes: pgm_iters: The number of iterations for the mirror descent algorithm. - seed: The seed for the random number generator. 
marginal_oracle: The marginal oracle to use for the mirror descent algorithm. + gdp_sigma: The GDP sigma of the end-to-end mechanism. Privacy budget is + split across the one-way marginals internally. """ pgm_iters: int = 5000 - seed: int = 0 marginal_oracle: mbi.MarginalOracle | None = None + gdp_sigma: float | None = None - def dp_event(self, zcdp_rho: float) -> dp_accounting.DpEvent: - """Returns the DP event for the independent mechanism.""" - sigma = accounting.zcdp_gaussian_sigma(zcdp_rho) - return dp_accounting.GaussianDpEvent(noise_multiplier=sigma) - + def calibrate(self, *, zcdp_rho: float) -> 'IndependentMechanism': + """Returns a copy calibrated to the given zCDP budget.""" + return dataclasses.replace( + self, gdp_sigma=accounting.zcdp_gaussian_sigma(zcdp_rho) + ) -def run_mechanism( - data: mbi.Projectable, - config: IndependentConfig, - zcdp_rho: float, - *, - initial_measurements: list[mbi.LinearMeasurement] | None = None, - initial_potentials: mbi.CliqueVector | None = None, -) -> mbi.MarkovRandomField: - """Generate synthetic data via the independent mechanism.""" - constraints = initial_potentials is not None - marginal_oracle = common.default_oracle(config.marginal_oracle, constraints) + @property + def dp_event(self) -> dp_accounting.DpEvent: + """Returns the DP event for the independent mechanism.""" + if self.gdp_sigma is None: + raise ValueError('Must call calibrate() before using the mechanism.') + return dp_accounting.GaussianDpEvent(noise_multiplier=self.gdp_sigma) - gdp_budget = accounting.zcdp_to_gdp(zcdp_rho) + def __call__( + self, + rng: np.random.Generator, + data: mbi.Projectable, + *, + initial_measurements: list[mbi.LinearMeasurement] | None = None, + initial_potentials: mbi.CliqueVector | None = None, + ) -> mbi.MarkovRandomField: + """Generate synthetic data via the independent mechanism.""" + if self.gdp_sigma is None: + raise ValueError('Must call calibrate() before using the mechanism.') + constraints = initial_potentials is 
not None + marginal_oracle = common.default_oracle(self.marginal_oracle, constraints) - attributes = len(data.domain) - sigma = accounting.gdp_gaussian_sigma(gdp_budget / attributes) - measurements = initial_measurements or [] - keys = jax.random.split(jax.random.key(config.seed), attributes) - for attr, key in zip(data.domain, keys): - clique = (attr,) - marginal = data.project(clique).datavector() - noisy_marginal = marginal + jax.random.normal(key, marginal.shape) * sigma - measurements.append(mbi.LinearMeasurement(noisy_marginal, clique)) + # Split end-to-end gdp_sigma across the d one-way marginals: + # per-query sigma = gdp_sigma * sqrt(d). + attributes = len(data.domain) + per_query_sigma = self.gdp_sigma * attributes**0.5 + measurements = list(initial_measurements or []) + for attr in data.domain: + clique = (attr,) + marginal = data.project(clique).datavector() + noisy_marginal = ( + marginal + rng.normal(size=marginal.shape) * per_query_sigma + ) + measurements.append(mbi.LinearMeasurement(noisy_marginal, clique, per_query_sigma)) - potentials = initial_potentials - if potentials is not None: - potentials = potentials.expand([m.clique for m in measurements]) + potentials = initial_potentials + if potentials is not None: + potentials = potentials.expand([m.clique for m in measurements]) - model = mbi.estimation.mirror_descent( - data.domain, - measurements, - iters=config.pgm_iters, - potentials=potentials, - marginal_oracle=marginal_oracle, - ) - return model + model = mbi.estimation.mirror_descent( + data.domain, + measurements, + iters=self.pgm_iters, + potentials=potentials, + marginal_oracle=marginal_oracle, + ) + return model diff --git a/dpsynth/discrete_mechanisms/mst.py b/dpsynth/discrete_mechanisms/mst.py index 1344d0b..a8a3b3a 100644 --- a/dpsynth/discrete_mechanisms/mst.py +++ b/dpsynth/discrete_mechanisms/mst.py @@ -14,6 +14,8 @@ """Implementation of the Maximum Spanning Tree mechanism.""" +from __future__ import annotations + from collections.abc import Sequence
import dataclasses import itertools @@ -22,6 +24,7 @@ import dp_accounting from dpsynth.discrete_mechanisms import accounting from dpsynth.discrete_mechanisms import common +from dpsynth.local_mode import primitives import mbi import networkx as nx import numpy as np @@ -30,6 +33,7 @@ def dp_maximum_spanning_tree( + rng: np.random.Generator, weights: dict[tuple[str, str], float], zcdp_rho: float | None = None, exponential_mechanism_epsilon: float | None = None, @@ -49,6 +53,7 @@ def dp_maximum_spanning_tree( the data (i.e., L1 norm between true and estimated marginal). Args: + rng: A numpy random number generator. weights: A dictionary mapping pairs of attributes to the sensitivity 1 measure of correlation between them. zcdp_rho: the zCDP budget to use for this mechanism. @@ -85,7 +90,7 @@ def dp_maximum_spanning_tree( candidates = [e for e in candidates if not ds.connected(*e)] wgts = np.array([weights[e] for e in candidates]) idx = common.exponential_mechanism( - wgts, exponential_mechanism_epsilon, sensitivity=1.0 + wgts, exponential_mechanism_epsilon, sensitivity=1.0, rng=rng ) e = candidates[idx] tree.add_edge(*e) @@ -95,6 +100,7 @@ def dp_maximum_spanning_tree( def _select_two_way_marginal_queries( + rng: np.random.Generator, data: mbi.Projectable, zcdp_rho: float, one_way_measurements: list[mbi.LinearMeasurement], @@ -106,6 +112,7 @@ def _select_two_way_marginal_queries( This mechanism satisfies rho-zCDP. Args: + rng: A numpy random number generator. data: The sensitive dataset to use to determine the quality scores of each two-way marginal query. zcdp_rho: The zCDP privacy parameter. 
@@ -142,6 +149,7 @@ def _select_two_way_marginal_queries( weights[a, b] = np.linalg.norm(x - xhat, 1) return dp_maximum_spanning_tree( + rng, weights, zcdp_rho=zcdp_rho, initial_marginal_queries=initial_marginal_queries, @@ -149,7 +157,7 @@ def _select_two_way_marginal_queries( @dataclasses.dataclass -class MSTConfig: +class MSTMechanism(primitives.DPMechanism): """Configuration for the maximum spanning tree mechanism. Details are described in the paper: @@ -158,7 +166,6 @@ class MSTConfig: Attributes: pgm_iters: The number of iterations for the mirror descent algorithm. - seed: The seed for the random number generator. maximum_marginal_size: The maximum size of a marginal query. marginal_oracle: The marginal oracle to use for the mirror descent algorithm. @@ -169,77 +176,99 @@ class MSTConfig: """ pgm_iters: int = 5000 - seed: int = 0 maximum_marginal_size: int = 10_000_000 marginal_oracle: mbi.MarginalOracle | None = None one_way_budget_fraction: float = 1 / 3 select_budget_fraction: float = 1 / 3 + zcdp_rho: float | None = None + + def calibrate(self, *, zcdp_rho: float) -> MSTMechanism: + """Returns a copy calibrated to the given zCDP budget.""" + return dataclasses.replace(self, zcdp_rho=zcdp_rho) - def dp_event(self, zcdp_rho: float) -> dp_accounting.DpEvent: + @property + def dp_event(self) -> dp_accounting.DpEvent: """Returns the DP event for the MST mechanism.""" + if self.zcdp_rho is None: + raise ValueError('Must call calibrate() before using the mechanism.') # exponential mechanisms and (d-1) Gaussian mechanisms. 
- return dp_accounting.ZCDpEvent(zcdp_rho) - - -def run_mechanism( - data: mbi.Projectable, - config: MSTConfig, - zcdp_rho: float, - *, - initial_measurements: list[mbi.LinearMeasurement] | None = None, - initial_potentials: mbi.CliqueVector | None = None, -) -> mbi.MarkovRandomField: - """Generate synthetic data via the MST mechanism.""" - logging.info('[MST]: Starting MST mechanism.') - constraints = initial_potentials is not None - marginal_oracle = common.default_oracle(config.marginal_oracle, constraints) - budget_remaining = zcdp_rho - - logging.info('[MST]: Starting MST mechanism.') - np.random.seed(config.seed) - if initial_measurements is None: - budget_remaining -= config.one_way_budget_fraction * zcdp_rho - one_way_measurements = common.measure_marginals_with_noise( + return dp_accounting.ZCDpEvent(self.zcdp_rho) + + def __call__( + self, + rng: np.random.Generator, + data: mbi.Projectable, + *, + initial_measurements: list[mbi.LinearMeasurement] | None = None, + initial_potentials: mbi.CliqueVector | None = None, + ) -> mbi.MarkovRandomField: + """Runs the MST mechanism on the given data. + + Args: + rng: A numpy random number generator. + data: The sensitive dataset. + initial_measurements: Optional pre-existing one-way measurements. + initial_potentials: Optional initial potentials for constrained + estimation. + + Returns: + A fitted MarkovRandomField model. + + Raises: + ValueError: If calibrate() has not been called. 
+ """ + if self.zcdp_rho is None: + raise ValueError('Must call calibrate() before using the mechanism.') + logging.info('[MST]: Starting MST mechanism.') + constraints = initial_potentials is not None + marginal_oracle = common.default_oracle(self.marginal_oracle, constraints) + budget_remaining = self.zcdp_rho + + if initial_measurements is None: + budget_remaining -= self.one_way_budget_fraction * self.zcdp_rho + one_way_measurements = common.measure_marginals_with_noise( + rng, + data, + marginal_queries=[(a,) for a in data.domain], + gdp_sigma=accounting.zcdp_gaussian_sigma(self.zcdp_rho * self.one_way_budget_fraction), + ) + else: + one_way_measurements = initial_measurements + + exponential_rho = self.select_budget_fraction * self.zcdp_rho + budget_remaining -= exponential_rho + # Select and measure 2-way marginals using rho/3 budget for each step. + two_way_marginal_queries = _select_two_way_marginal_queries( + rng, data, - marginal_queries=[(a,) for a in data.domain], - gdp_sigma=zcdp_rho * config.one_way_budget_fraction, + exponential_rho, + one_way_measurements, + maximum_marginal_size=self.maximum_marginal_size, ) - else: - one_way_measurements = initial_measurements - - exponential_rho = config.select_budget_fraction * zcdp_rho - budget_remaining -= exponential_rho - # Select and measure 2-way marginals using rho/3 budget for each step. - two_way_marginal_queries = _select_two_way_marginal_queries( - data, - exponential_rho, - one_way_measurements, - maximum_marginal_size=config.maximum_marginal_size, - ) - logging.info('[MST]: Selected two-way marginal queries.') - gaussian_rho = budget_remaining - sigma = accounting.zcdp_gaussian_sigma(gaussian_rho) - two_way_measurements = common.measure_marginals_with_noise( - data, two_way_marginal_queries, sigma - ) - logging.info('[MST]: Measured two-way marginals.') - all_measurements = one_way_measurements + two_way_measurements - # Fit a distribution to the noisy measurements using Private-PGM.
- potentials = initial_potentials - if potentials is not None: - potentials = potentials.expand([m.clique for m in all_measurements]) - - model_size = mbi.junction_tree.hypothetical_model_size( - data.domain, [m.clique for m in all_measurements] - ) - logging.info('[MST]: Model size: %d MB', model_size) - model = mbi.estimation.mirror_descent( - data.domain, - all_measurements, - iters=config.pgm_iters, - potentials=potentials, - callback_fn=mbi.callbacks.default(all_measurements), - marginal_oracle=marginal_oracle, - ) - logging.info('[MST]: Fit distribution to the noisy measurements.') - return model + logging.info('[MST]: Selected two-way marginal queries.') + gaussian_rho = budget_remaining + sigma = accounting.zcdp_gaussian_sigma(gaussian_rho) + two_way_measurements = common.measure_marginals_with_noise( + rng, data, two_way_marginal_queries, sigma + ) + logging.info('[MST]: Measured two-way marginals.') + all_measurements = one_way_measurements + two_way_measurements + # Fit a distribution to the noisy measurements using Private-PGM. + potentials = initial_potentials + if potentials is not None: + potentials = potentials.expand([m.clique for m in all_measurements]) + + model_size = mbi.junction_tree.hypothetical_model_size( + data.domain, [m.clique for m in all_measurements] + ) + logging.info('[MST]: Model size: %d MB', model_size) + model = mbi.estimation.mirror_descent( + data.domain, + all_measurements, + iters=self.pgm_iters, + potentials=potentials, + callback_fn=mbi.callbacks.default(all_measurements), + marginal_oracle=marginal_oracle, + ) + logging.info('[MST]: Fit distribution to the noisy measurements.') + return model diff --git a/dpsynth/discrete_mechanisms/swift.py b/dpsynth/discrete_mechanisms/swift.py index 18bf4db..2a698ef 100644 --- a/dpsynth/discrete_mechanisms/swift.py +++ b/dpsynth/discrete_mechanisms/swift.py @@ -23,6 +23,8 @@ that maximizes the likelihood of the noisy marginals measured. 
""" +from __future__ import annotations + from collections.abc import Iterable, Mapping, Sequence import dataclasses import functools @@ -34,6 +36,7 @@ from dpsynth.discrete_mechanisms import clique_tree from dpsynth.discrete_mechanisms import common from dpsynth.discrete_mechanisms import swift_utils +from dpsynth.local_mode import primitives import jax import mbi import networkx as nx @@ -41,8 +44,8 @@ import tqdm -@dataclasses.dataclass(frozen=True) -class SWIFTConfig: +@dataclasses.dataclass +class SWIFTMechanism(primitives.DPMechanism): """Configuration for the SWIFT mechanism. Attributes: @@ -58,7 +61,8 @@ class SWIFTConfig: marginals to measure. one_way_budget_frac: Fraction of the total budget used for measuring one-way marginals initially. - seed: Random seed for reproducibility. + gdp_sigma: The GDP sigma of the end-to-end mechanism. Privacy budget is + split across measurement steps internally. """ workload: Mapping[mbi.Clique, float] | Iterable[mbi.Clique] | None = None @@ -68,12 +72,132 @@ class SWIFTConfig: pgm_iters: int = 25_000 select_budget_frac: float = 0.1 one_way_budget_frac: float = 0.1 - seed: int | None = None + gdp_sigma: float | None = None + + def calibrate(self, *, zcdp_rho: float) -> SWIFTMechanism: + """Returns a copy calibrated to the given zCDP budget.""" + return dataclasses.replace( + self, gdp_sigma=accounting.zcdp_gaussian_sigma(zcdp_rho) + ) - def dp_event(self, zcdp_rho: float) -> dp_accounting.DpEvent: + @property + def dp_event(self) -> dp_accounting.DpEvent: """Returns the DP event for the SWIFT mechanism.""" - sigma = accounting.zcdp_gaussian_sigma(zcdp_rho) - return dp_accounting.GaussianDpEvent(noise_multiplier=sigma) + if self.gdp_sigma is None: + raise ValueError('Must call calibrate() before using the mechanism.') + return dp_accounting.GaussianDpEvent(noise_multiplier=self.gdp_sigma) + + def __call__( + self, + rng: np.random.Generator, + data: mbi.Projectable, + *, + initial_measurements: 
Sequence[mbi.LinearMeasurement] | None = None, + initial_potentials: mbi.CliqueVector | None = None, + ) -> mbi.MarkovRandomField: + """Runs the SWIFT mechanism on the given data. + + Args: + rng: A numpy random number generator. + data: The sensitive dataset. + initial_measurements: Optional pre-existing one-way measurements. + initial_potentials: Optional initial potentials for constrained + estimation. + + Returns: + A fitted MarkovRandomField model. + + Raises: + ValueError: If calibrate() has not been called. + """ + if self.gdp_sigma is None: + raise ValueError('Must call calibrate() before using the mechanism.') + + logging.info('[SWIFT] Starting Mechanism.') + constraints = initial_potentials is not None + marginal_oracle = common.default_oracle(self.marginal_oracle, constraints) + + ######################################################################### + # Compile workload into candidate measurements, and precompute answers. # + ######################################################################### + candidates = common.compiled_workload( + data.domain, self.workload, self.max_marginal_size + ) + answers = mbi.CliqueVector.from_projectable(data, candidates) + logging.info('[SWIFT] Calculated workload-query answers.') + + # Convert end-to-end GDP sigma to budget for internal allocation. + gdp_budget = 1.0 / self.gdp_sigma**2 + budget_remaining = gdp_budget + domain = data.domain + if initial_measurements is None: + budget_oneway = self.one_way_budget_frac * gdp_budget + sigma_oneway = accounting.gdp_gaussian_sigma(budget_oneway) + budget_remaining -= budget_oneway + # measure_marginals_with_noise splits sigma_oneway across queries. 
+ measurements = common.measure_marginals_with_noise( + rng, data, [(a,) for a in domain], gdp_sigma=sigma_oneway + ) + else: + measurements = list(initial_measurements) + + potentials = initial_potentials + if potentials is not None: + potentials = potentials.expand([m.clique for m in measurements]) + + model = mbi.estimation.mirror_descent( + domain, + measurements, + iters=self.pgm_iters, + potentials=potentials, + marginal_oracle=marginal_oracle, + ) + logging.info('[SWIFT] Estimated initial model.') + + ########################################### + # Select subset of candidates to measure. # + ########################################### + assert 0 < self.select_budget_frac < 1 + l1_error_budget = self.select_budget_frac * gdp_budget + budget_remaining -= l1_error_budget + + errors = _compute_initial_errors( + rng, answers, model, list(candidates), l1_error_budget + ) + logging.info('[SWIFT] Computed initial errors.') + + selected, jtree = select_queries( + errors, candidates, domain, self.max_clique_size, budget_remaining + ) + + ########################################## + # Measure the selected marginal queries. 
# + ########################################## + new_measurements, _ = _measure_selected_marginals( + rng, answers, selected, budget_remaining + ) + measurements.extend(new_measurements) + + ######################################################## + # Estimate the model using all measurements # + ######################################################## + + closed_oracle = functools.partial( + mbi.marginal_oracles.message_passing_stable, jtree=jtree + ) + + callback_fn = mbi.callbacks.default(measurements) + model = mbi.estimation.mirror_descent( + domain, + measurements, + iters=self.pgm_iters, + potentials=potentials, + marginal_oracle=closed_oracle, + callback_fn=callback_fn, + ) + logging.info('[SWIFT] Estimated final model.') + + return model def _is_supported(clique: mbi.Clique, tree: nx.Graph) -> bool: @@ -160,6 +284,7 @@ def build_best_clique_tree( def _compute_initial_errors( + rng: np.random.Generator, data: mbi.Projectable, model: mbi.MarkovRandomField, cliques: Sequence[mbi.Clique], @@ -177,7 +302,7 @@ def _compute_initial_errors( actual = data.project(cl) diff = (total * estimate - actual).datavector() error = np.abs(diff).sum() - errors[cl] = error + np.random.normal(loc=0.0, scale=sigma_per_clique) + errors[cl] = error + rng.normal(loc=0.0, scale=sigma_per_clique) return errors @@ -231,6 +356,7 @@ def select_queries( def _measure_selected_marginals( + rng: np.random.Generator, data: mbi.Projectable, selected: dict[mbi.Clique, float], budget_remaining: float, @@ -241,7 +367,7 @@ def _measure_selected_marginals( budget_remaining -= selected[cl] sigma = accounting.gdp_gaussian_sigma(selected[cl]) x = data.project(cl).datavector() - y = x + np.random.normal(loc=0.0, scale=sigma, size=x.size) + y = x + rng.normal(loc=0.0, scale=sigma, size=x.size) measurements.append(mbi.LinearMeasurement(y, cl, sigma)) logging.info('[SWIFT] Measured %s with sigma %f', cl, sigma) @@ -250,99 +376,3 @@ def _measure_selected_marginals( logging.info('[SWIFT] Selected %d 
marginals.', len(selected)) return measurements, budget_remaining - - -def run_mechanism( - data: mbi.Projectable, - config: SWIFTConfig, - zcdp_rho: float, - *, - initial_measurements: Sequence[mbi.LinearMeasurement] | None = None, - initial_potentials: mbi.CliqueVector | None = None, -) -> mbi.MarkovRandomField: - """Runs the SWIFT mechanism on the given data.""" - logging.info('[SWIFT] Starting Mechanism.') - constraints = initial_potentials is not None - marginal_oracle = common.default_oracle(config.marginal_oracle, constraints) - gdp_budget = accounting.zcdp_to_gdp(zcdp_rho) - - np.random.seed(config.seed) - - ######################################################################### - # Compile workload into candidate measurements, and precompute answers. # - ######################################################################### - candidates = common.compiled_workload( - data.domain, config.workload, config.max_marginal_size - ) - answers = mbi.CliqueVector.from_projectable(data, candidates) - logging.info('[SWIFT] Calculated workload-query answers.') - - budget_remaining = gdp_budget - domain = data.domain - if initial_measurements is None: - budget_oneway = config.one_way_budget_frac * gdp_budget - sigma_oneway = accounting.gdp_gaussian_sigma(budget_oneway) - budget_remaining -= budget_oneway - measurements = common.measure_marginals_with_noise( - data, [(a,) for a in domain], gdp_sigma=sigma_oneway - ) - else: - measurements = list(initial_measurements) - - potentials = initial_potentials - if potentials is not None: - potentials = potentials.expand([m.clique for m in measurements]) - - model = mbi.estimation.mirror_descent( - domain, - measurements, - iters=config.pgm_iters, - potentials=potentials, - marginal_oracle=marginal_oracle, - ) - logging.info('[SWIFT] Estimated initial model.') - - ########################################### - # Select subset of candidates to measure. 
# - ########################################### - assert 0 < config.select_budget_frac < 1 - l1_error_budget = config.select_budget_frac * gdp_budget - budget_remaining -= l1_error_budget - - errors = _compute_initial_errors( - answers, model, list(candidates), l1_error_budget - ) - logging.info('[SWIFT] Computed initial errors.') - - selected, jtree = select_queries( - errors, candidates, domain, config.max_clique_size, budget_remaining - ) - - ########################################## - # Measure the selected marginal queries. # - ########################################## - new_measurements, _ = _measure_selected_marginals( - answers, selected, budget_remaining - ) - measurements.extend(new_measurements) - - ######################################################## - # Estimate the model using all measurements # - ######################################################## - - closed_oracle = functools.partial( - mbi.marginal_oracles.message_passing_stable, jtree=jtree - ) - - callback_fn = mbi.callbacks.default(measurements) - model = mbi.estimation.mirror_descent( - domain, - measurements, - iters=config.pgm_iters, - potentials=potentials, - marginal_oracle=closed_oracle, - callback_fn=callback_fn, - ) - logging.info('[SWIFT] Estimated final model.') - - return model diff --git a/dpsynth/local_mode/primitives.py b/dpsynth/local_mode/primitives.py index 49b23c4..9dec10f 100644 --- a/dpsynth/local_mode/primitives.py +++ b/dpsynth/local_mode/primitives.py @@ -85,12 +85,15 @@ def dp_event(self) -> dp_accounting.DpEvent: """The DpEvent characterizing the privacy cost of this mechanism.""" @abc.abstractmethod - def __call__(self, rng: Any, data: Any) -> Any: + def __call__(self, *args: Any, **kwargs: Any) -> Any: """Runs the mechanism on the given data. + Subclass signatures vary, but typically accept at least the data to operate + on and a source of randomness. + Args: - rng: A source of randomness (e.g., ``np.random.Generator``). 
- data: The input data to the mechanism. + *args: Positional arguments (subclass-specific). + **kwargs: Keyword arguments (subclass-specific). """ diff --git a/dpsynth/pipeline_transformations/aim.py b/dpsynth/pipeline_transformations/aim.py index 5f03fb4..eb949f7 100644 --- a/dpsynth/pipeline_transformations/aim.py +++ b/dpsynth/pipeline_transformations/aim.py @@ -223,7 +223,9 @@ def filter_fn(m: tuple[Clique, np.ndarray], model, selected_marginals): worst_approximated = backend.map( errors_singleton, - lambda x: _select_worst_approximated(x, exponential_spec), + lambda x: _select_worst_approximated( + np.random.default_rng(), x, exponential_spec + ), 'Get worst approximated', ) # singleton (Clique,) @@ -291,6 +293,7 @@ def _compute_error( def _select_worst_approximated( + rng: np.random.Generator, clique_errors: list[tuple[Clique, float]], exponential_spec: pipeline_dp.budget_accounting.MechanismSpec, ) -> Clique: @@ -298,7 +301,7 @@ def _select_worst_approximated( errors = np.array([x[1] for x in clique_errors]) exponential_eps = np.sqrt(2) / exponential_spec.noise_standard_deviation idx = common.exponential_mechanism( - errors, exponential_eps, sensitivity=1.0, monotonic=True + errors, exponential_eps, sensitivity=1.0, rng=rng, monotonic=True ) return clique_errors[idx][0] diff --git a/dpsynth/pipeline_transformations/mst.py b/dpsynth/pipeline_transformations/mst.py index de4a7d5..3518748 100644 --- a/dpsynth/pipeline_transformations/mst.py +++ b/dpsynth/pipeline_transformations/mst.py @@ -157,6 +157,7 @@ def get_mst_fn(weights: dict[Edge, float]) -> list[Edge]: epsilon = _get_eps_from_laplace_noise_std(budget.noise_standard_deviation) spanning_tree = mst_mechanism.dp_maximum_spanning_tree( + np.random.default_rng(), weights_str, exponential_mechanism_epsilon=epsilon, ) diff --git a/dpsynth/pydantic_api.py b/dpsynth/pydantic_api.py index d4d7eb4..fbef5bf 100644 --- a/dpsynth/pydantic_api.py +++ b/dpsynth/pydantic_api.py @@ -146,7 +146,7 @@ def 
dp_synthetic_data_generation( epsilon: float, delta: float, *, - mechanism_config: discrete_mechanisms.DiscreteMechanismConfig = discrete_mechanisms.MSTConfig(), + mechanism_config: discrete_mechanisms.DiscreteMechanism = discrete_mechanisms.MSTMechanism(), ) -> list[RecordT]: """Generate synthetic data for a collection of pydantic Models. diff --git a/tests/discrete_mechanisms/aim_test.py b/tests/discrete_mechanisms/aim_test.py index 93fbf2c..4924a9a 100644 --- a/tests/discrete_mechanisms/aim_test.py +++ b/tests/discrete_mechanisms/aim_test.py @@ -24,9 +24,10 @@ class AIMTest(absltest.TestCase): def test_fits_one_way_marginals_with_aim(self): data = mbi.Dataset.synthetic(mbi.Domain(["a", "b", "c"], [3, 4, 5]), N=1000) workload = [("a",), ("b",), ("c",)] - config = aim.AIMConfig(workload=workload, max_rounds=4, pgm_iters=500) + config = aim.AIMMechanism(workload=workload, max_rounds=4, pgm_iters=500) - synthetic = aim.run_mechanism(data, config, zcdp_rho=10000) + calibrated = config.calibrate(zcdp_rho=10000) + synthetic = calibrated(np.random.default_rng(0), data) for col in data.domain: expected = data.project([col]).datavector() @@ -37,16 +38,33 @@ def test_fits_one_way_marginals_with_aim_gdp(self): data = mbi.Dataset.synthetic(mbi.Domain(["a", "b", "c"], [3, 4, 5]), N=1000) workload = [("a",), ("b",), ("c",)] - config = aim_gdp.AIMGDPConfig( + config = aim_gdp.AIMGDPMechanism( workload=workload, max_rounds=4, pgm_iters=500 ) - synthetic = aim_gdp.run_mechanism(data, config, zcdp_rho=10000) + calibrated = config.calibrate(zcdp_rho=10000) + synthetic = calibrated(np.random.default_rng(0), data) for col in data.domain: expected = data.project([col]).datavector() actual = synthetic.project([col]).datavector() np.testing.assert_allclose(actual, expected, atol=1) + def test_uncalibrated_aim_raises(self): + config = aim.AIMMechanism() + with self.assertRaisesRegex(ValueError, "calibrate"): + _ = config.dp_event + data = mbi.Dataset.synthetic(mbi.Domain(["a"], [3]), 
N=10) + with self.assertRaisesRegex(ValueError, "calibrate"): + config(np.random.default_rng(0), data) + + def test_uncalibrated_aim_gdp_raises(self): + config = aim_gdp.AIMGDPMechanism() + with self.assertRaisesRegex(ValueError, "calibrate"): + _ = config.dp_event + data = mbi.Dataset.synthetic(mbi.Domain(["a"], [3]), N=10) + with self.assertRaisesRegex(ValueError, "calibrate"): + config(np.random.default_rng(0), data) + if __name__ == "__main__": absltest.main() diff --git a/tests/discrete_mechanisms/common_test.py b/tests/discrete_mechanisms/common_test.py index be90f08..64f765a 100644 --- a/tests/discrete_mechanisms/common_test.py +++ b/tests/discrete_mechanisms/common_test.py @@ -28,19 +28,26 @@ def assert_serializable(obj): class CommonTest(absltest.TestCase): def test_exponential_mechanism(self): + rng = np.random.default_rng(0) scores = np.array([5, 20, -10, 3]) - idx = common.exponential_mechanism(scores, epsilon=1.0, sensitivity=1.0) + idx = common.exponential_mechanism( + scores, epsilon=1.0, sensitivity=1.0, rng=rng + ) self.assertIn(idx, [0, 1, 2, 3]) - idx = common.exponential_mechanism(scores, epsilon=1.0, sensitivity=1e-8) + idx = common.exponential_mechanism( + scores, epsilon=1.0, sensitivity=1e-8, rng=rng + ) self.assertEqual(idx, 1) - idx = common.exponential_mechanism(scores, epsilon=1e8, sensitivity=1.0) + idx = common.exponential_mechanism( + scores, epsilon=1e8, sensitivity=1.0, rng=rng + ) self.assertEqual(idx, 1) def test_measure_marginals_with_noise(self): data = mbi.Dataset.synthetic(mbi.Domain(["a", "b", "c"], [3, 4, 5]), N=1000) marginal_queries = [("a",), ("b",), ("c",)] measurements = common.measure_marginals_with_noise( - data, marginal_queries, gdp_sigma=1.0 + np.random.default_rng(0), data, marginal_queries, gdp_sigma=1.0 ) self.assertLen(measurements, 3) for m in measurements: diff --git a/tests/discrete_mechanisms/direct_test.py b/tests/discrete_mechanisms/direct_test.py index 6b43b0d..c3a65e6 100644 --- 
a/tests/discrete_mechanisms/direct_test.py +++ b/tests/discrete_mechanisms/direct_test.py @@ -23,7 +23,7 @@ class DirectTest(absltest.TestCase): def test_fits_one_way_marginals(self): data = mbi.Dataset.synthetic(mbi.Domain(['a', 'b', 'c'], [3, 4, 5]), N=1000) - config = direct.DirectConfig( + config = direct.DirectMechanism( prespecified_marginal_queries=[ ('a', 'b'), ('a', 'c'), @@ -31,7 +31,7 @@ def test_fits_one_way_marginals(self): ], pgm_iters=500, ) - synthetic = direct.run_mechanism(data, config, zcdp_rho=10000) + synthetic = config.calibrate(zcdp_rho=10000)(np.random.default_rng(0), data) for col in data.domain: expected = data.project([col]).datavector() diff --git a/tests/discrete_mechanisms/independent_test.py b/tests/discrete_mechanisms/independent_test.py index 88c58da..e3668b7 100644 --- a/tests/discrete_mechanisms/independent_test.py +++ b/tests/discrete_mechanisms/independent_test.py @@ -23,8 +23,8 @@ class IndependentTest(absltest.TestCase): def test_fits_one_way_marginals(self): data = mbi.Dataset.synthetic(mbi.Domain(["a", "b", "c"], [3, 4, 5]), N=1000) - config = independent.IndependentConfig(pgm_iters=500) - synthetic = independent.run_mechanism(data, config, zcdp_rho=10000) + config = independent.IndependentMechanism(pgm_iters=500) + synthetic = config.calibrate(zcdp_rho=10000)(np.random.default_rng(0), data) for col in data.domain: expected = data.project([col]).datavector() diff --git a/tests/discrete_mechanisms/mst_test.py b/tests/discrete_mechanisms/mst_test.py index 1679826..7f607f2 100644 --- a/tests/discrete_mechanisms/mst_test.py +++ b/tests/discrete_mechanisms/mst_test.py @@ -13,6 +13,7 @@ # limitations under the License. 
from absl.testing import absltest +import dp_accounting from dpsynth.discrete_mechanisms import mst import mbi import numpy as np @@ -36,7 +37,9 @@ def test_dp_maximum_spanning_tree_infinite_rho(self): frozenset({'A', 'D'}), } - actual_edges_list = mst.dp_maximum_spanning_tree(weights, zcdp_rho=100) + actual_edges_list = mst.dp_maximum_spanning_tree( + np.random.default_rng(0), weights, zcdp_rho=100 + ) actual_mst_edges = {frozenset(edge) for edge in actual_edges_list} @@ -59,7 +62,7 @@ def test_dp_maximum_spanning_tree_infinite_eps(self): } actual_edges_list = mst.dp_maximum_spanning_tree( - weights, exponential_mechanism_epsilon=100 + np.random.default_rng(0), weights, exponential_mechanism_epsilon=100 ) actual_mst_edges = {frozenset(edge) for edge in actual_edges_list} @@ -69,15 +72,31 @@ def test_dp_maximum_spanning_tree_infinite_eps(self): def test_fits_one_way_marginals(self): data = mbi.Dataset.synthetic(mbi.Domain(['a', 'b', 'c'], [3, 4, 5]), N=1000) - config = mst.MSTConfig(pgm_iters=500) + config = mst.MSTMechanism(pgm_iters=500).calibrate(zcdp_rho=10000) - synthetic = mst.run_mechanism(data, config, zcdp_rho=10000) + synthetic = config(np.random.default_rng(0), data) for col in data.domain: expected = data.project([col]).datavector() actual = synthetic.project([col]).datavector() np.testing.assert_allclose(actual, expected, atol=1) + def test_calibrate_required(self): + config = mst.MSTMechanism() + data = mbi.Dataset.synthetic(mbi.Domain(['a', 'b'], [3, 4]), N=100) + with self.assertRaises(ValueError): + config(np.random.default_rng(0), data) + + def test_dp_event_requires_calibration(self): + config = mst.MSTMechanism() + with self.assertRaises(ValueError): + _ = config.dp_event + + def test_dp_event_returns_zcdp(self): + config = mst.MSTMechanism().calibrate(zcdp_rho=1.0) + event = config.dp_event + self.assertIsInstance(event, dp_accounting.ZCDpEvent) + if __name__ == '__main__': absltest.main() diff --git a/tests/discrete_mechanisms/swift_test.py 
b/tests/discrete_mechanisms/swift_test.py index 1a2f12b..2dd67f9 100644 --- a/tests/discrete_mechanisms/swift_test.py +++ b/tests/discrete_mechanisms/swift_test.py @@ -15,6 +15,7 @@ import itertools from absl.testing import absltest +import dp_accounting from dpsynth.discrete_mechanisms import clique_tree from dpsynth.discrete_mechanisms import swift from dpsynth.discrete_mechanisms import swift_utils @@ -121,15 +122,31 @@ def test_build_clique_tree(self): def test_fits_one_way_marginals(self): data = mbi.Dataset.synthetic(mbi.Domain(['a', 'b', 'c'], [3, 4, 5]), N=1000) - config = swift.SWIFTConfig(pgm_iters=500) + config = swift.SWIFTMechanism(pgm_iters=500).calibrate(zcdp_rho=10000) - synthetic = swift.run_mechanism(data, config, zcdp_rho=10000) + synthetic = config(np.random.default_rng(0), data) for col in data.domain: expected = data.project([col]).datavector() actual = synthetic.project([col]).datavector() np.testing.assert_allclose(actual, expected, atol=1) + def test_calibrate_required(self): + config = swift.SWIFTMechanism() + data = mbi.Dataset.synthetic(mbi.Domain(['a', 'b'], [3, 4]), N=100) + with self.assertRaises(ValueError): + config(np.random.default_rng(0), data) + + def test_dp_event_requires_calibration(self): + config = swift.SWIFTMechanism() + with self.assertRaises(ValueError): + _ = config.dp_event + + def test_dp_event_returns_gaussian(self): + config = swift.SWIFTMechanism().calibrate(zcdp_rho=1.0) + event = config.dp_event + self.assertIsInstance(event, dp_accounting.GaussianDpEvent) + if __name__ == '__main__': absltest.main() diff --git a/tests/pipeline_transformations/mst_test.py b/tests/pipeline_transformations/mst_test.py index 21d2aef..b026f36 100644 --- a/tests/pipeline_transformations/mst_test.py +++ b/tests/pipeline_transformations/mst_test.py @@ -163,10 +163,11 @@ def test_select_dp_maximum_spanning_tree_check_epsilon( mock_dp_mst.assert_called_once() args, kwargs = mock_dp_mst.call_args - # Check positional arguments (weights) - 
self.assertLen(args, 1) + # Check positional arguments (rng, weights) + self.assertLen(args, 2) + self.assertIsInstance(args[0], np.random.Generator) self.assertEqual( - args[0], {("0", "1"): 10.0, ("0", "2"): 5.0, ("1", "2"): 0.0} + args[1], {("0", "1"): 10.0, ("0", "2"): 5.0, ("1", "2"): 0.0} ) # Check keyword arguments (epsilon)