Create Linf contribution bounder analysis #532

Merged (5 commits, Jan 14, 2025)
116 changes: 88 additions & 28 deletions analysis/contribution_bounders.py
@@ -16,21 +16,19 @@
import numpy as np
from pipeline_dp import contribution_bounders
from pipeline_dp import sampling_utils
from typing import Iterable
from typing import Iterable, Sequence, Tuple, Union


class AnalysisContributionBounder(contribution_bounders.ContributionBounder):
class L0LinfAnalysisContributionBounder(
contribution_bounders.ContributionBounder):
"""'Bounds' the contribution by privacy_id per and cross partitions.

Because this is for Utility Analysis, it doesn't actually ensure that
contribution bounds are enforced. Instead, it keeps track of probabilities
different data points are kept under per-partition (Linf) and
cross-partition (L0) contribution bounding.
contribution bounds are enforced. Instead, it keeps track of the
information needed for computing the impact of contribution bounding.

If partitions_sampling_prob < 1.0, partitions are subsampled. This sampling
is deterministic and depends on partition key.

Only works for count at the moment.
"""

def __init__(self, partitions_sampling_prob: float):
@@ -43,7 +41,7 @@ def bound_contributions(self, col, params, backend, report_generator,

col = backend.map_tuple(
col, lambda pid, pk, v: (pid, (pk, v)),
"Rekey to ((privacy_id), (partition_key, value))")
"Rekey to (privacy_id, (partition_key, value))")
col = backend.group_by_key(
col, "Group by key to get (privacy_id, [(partition_key, value)])")
# (privacy_id, [(partition_key, value)])
@@ -68,26 +66,7 @@ def rekey_per_privacy_id_per_partition_key_and_unnest(pid_pk_v_values):
for partition_key, values in partition_values:
if sampler is not None and not sampler.keep(partition_key):
continue
# Sum values.
# values can contain multi-columns, the format is the following
# 1 column:
# input: values = [v_0:float, ... ]
# output: v_0 + ....
# k columns (k > 1):
# input: values = [v_0=(v_00, ... v_0(k-1)), ...]
# output: (v_00+v_10+..., ...)
if not values:
# Empty public partitions
sum_values = 0
elif len(values) == 1:
# No need to sum, return 0th value
sum_values = values[0]
elif not isinstance(values[0], Iterable):
# 1 column
sum_values = sum(values)
else:
# multiple value columns, sum each column independently
sum_values = tuple(np.array(values).sum(axis=0).tolist())
sum_values = _sum_values(values)

yield (privacy_id, partition_key), (
len(values),
@@ -104,6 +83,87 @@ def rekey_per_privacy_id_per_partition_key_and_unnest(pid_pk_v_values):
return backend.map_values(col, aggregate_fn, "Apply aggregate_fn")
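
A minimal local sketch of driving this bounder, mirroring the tests below (LocalBackend, params=None and an identity aggregate_fn are illustration choices; import paths are assumed from the test file):

    import pipeline_dp
    from pipeline_dp import report_generator
    from analysis import contribution_bounders

    # Toy input: (privacy_id, partition_key, value) triples.
    data = [("pid1", "pk1", 1), ("pid1", "pk1", 2), ("pid1", "pk2", 3)]
    bounder = contribution_bounders.L0LinfAnalysisContributionBounder(
        partitions_sampling_prob=1.0)
    result = list(
        bounder.bound_contributions(data, None, pipeline_dp.LocalBackend(),
                                    report_generator.ReportGenerator(
                                        None, "test"),
                                    lambda x: x))
    # Each record is ((privacy_id, partition_key), (count_per_partition,
    # sum_per_partition, num_partitions_contributed, num_contributions)):
    # (('pid1', 'pk1'), (2, 3, 2, 3)) and (('pid1', 'pk2'), (1, 3, 2, 3)).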


class LinfAnalysisContributionBounder(
contribution_bounders.ContributionBounder):
"""'Bounds' the contribution by privacy_id per partitions.

Because this is for Utility Analysis, it doesn't actually ensure that
contribution bounds are enforced. Instead, it keeps track of the
information needed for computing the impact of contribution bounding.

If partitions_sampling_prob < 1.0, partitions are subsampled. This sampling
is deterministic and depends on partition key.
"""

def __init__(self, partitions_sampling_prob: float):
super().__init__()
self._sampling_probability = partitions_sampling_prob

def bound_contributions(self, col, params, backend, report_generator,
aggregate_fn):
"""See docstrings for this class and the base class."""
# Sample by partition key if sampling_prob < 1.
if self._sampling_probability < 1:
sampler = sampling_utils.ValueSampler(self._sampling_probability)
col = backend.filter(col,
lambda pid_pk_v: sampler.keep(pid_pk_v[1]),
"Sample partitions")

col = backend.map_tuple(
col, lambda pid, pk, v: ((pid, pk), v),
"Rekey to ((privacy_id, partition_key), value)")
col = backend.group_by_key(
col, "Group by key to get ((privacy_id, partition_key), [value])")

# ((privacy_id, partition_key), [value])

# Rekey by (privacy_id, partition_key) and unnest values along with the
# number of partitions contributed per privacy_id.
def rekey_per_privacy_id_per_partition_key_and_unnest(pid_pk_v_values):
(privacy_id, partition_key), values = pid_pk_v_values
num_partitions_contributed = 1
num_contributions = len(values)
sum_values = _sum_values(values)
yield (privacy_id, partition_key), (
len(values),
sum_values,
num_partitions_contributed,
num_contributions,
)

# Unnest the list per privacy id and per partition key.
col = backend.flat_map(
col, rekey_per_privacy_id_per_partition_key_and_unnest,
"Unnest per-privacy_id")

return backend.map_values(col, aggregate_fn, "Apply aggregate_fn")
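
Running the same toy input through this bounder (same assumptions, reusing data and the imports from the sketch above) shows how the last two fields change:

    bounder = contribution_bounders.LinfAnalysisContributionBounder(
        partitions_sampling_prob=1.0)
    result = list(
        bounder.bound_contributions(data, None, pipeline_dp.LocalBackend(),
                                    report_generator.ReportGenerator(
                                        None, "test"),
                                    lambda x: x))
    # (('pid1', 'pk1'), (2, 3, 1, 2)) and (('pid1', 'pk2'), (1, 3, 1, 1)):
    # num_partitions_contributed is always 1 and num_contributions equals the
    # per-partition count, because each (privacy_id, partition_key) group is
    # treated independently.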


def _sum_values(
    values: Union[Sequence[float], Sequence[Sequence[float]]]
) -> Union[float, Tuple[float, ...]]:
"""Sums values, column-wise when there are multiple value columns."""
# values can contain multiple columns; the format is the following:
# 1 column:
# input: values = [v_0: float, ...]
# output: v_0 + ...
# k columns (k > 1):
# input: values = [v_0=(v_00, ..., v_0(k-1)), ...]
# output: (v_00 + v_10 + ..., ...)
if not values:
# Empty public partitions
return 0
if len(values) == 1:
# No need to sum, return 0th value
return values[0]
if not isinstance(values[0], Iterable):
# 1 column
return sum(values)
# multiple value columns, sum each column independently
return tuple(np.array(values).sum(axis=0).tolist())
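
A few examples of what _sum_values returns for each input shape, derived from the branches above:

    _sum_values([])                # -> 0      (empty public partition)
    _sum_values([5.0])             # -> 5.0    (single value, returned as-is)
    _sum_values([1.0, 2.0, 3.0])   # -> 6.0    (one value column)
    _sum_values([(1, 2), (3, 4)])  # -> (4, 6) (column-wise sums)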


class NoOpContributionBounder(contribution_bounders.ContributionBounder):

def bound_contributions(self, col, params, backend, report_generator,
2 changes: 1 addition & 1 deletion analysis/per_partition_combiners.py
@@ -357,7 +357,7 @@ def create_accumulator(
self, sparse_acc: Tuple[np.ndarray, np.ndarray,
np.ndarray]) -> AccumulatorType:
count, _sum, n_partitions = sparse_acc
return len(count), np.sum(count).item()
return len(count), int(np.sum(count).item())

def compute_metrics(self, acc: AccumulatorType):
privacy_id_count, count = acc
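
The int() cast above matters because np.sum over a float ndarray returns a NumPy scalar whose .item() is a Python float, not an int. A minimal repro, assuming the counts arrive as a float ndarray (which is what the cast guards against):

    import numpy as np

    count = np.array([1.0, 2.0])  # per-partition counts as a float ndarray
    np.sum(count).item()          # -> 3.0, a Python float
    int(np.sum(count).item())     # -> 3, the int the accumulator expects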
2 changes: 1 addition & 1 deletion analysis/pre_aggregation.py
@@ -48,7 +48,7 @@ def preaggregate(col,
data_extractors.value_extractor(row)),
"Extract (privacy_id, partition_key, value))")
# col: (privacy_id, partition_key, value):
bounder = utility_contribution_bounders.AnalysisContributionBounder(
bounder = utility_contribution_bounders.L0LinfAnalysisContributionBounder(
partitions_sampling_prob)
col = bounder.bound_contributions(col,
params=None,
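
Note that preaggregate keeps using the L0/Linf variant (renamed from AnalysisContributionBounder), so its per-(privacy_id, partition_key) records retain the cross-partition fields shown in the sketch above.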
130 changes: 98 additions & 32 deletions analysis/tests/contribution_bounders_test.py
@@ -34,22 +34,16 @@ def _create_report_generator():
return pipeline_dp.report_generator.ReportGenerator(None, "test")


class SamplingL0LinfContributionBounderTest(parameterized.TestCase):
class L0LinfContributionBounderTest(parameterized.TestCase):

def _run_contribution_bounding(self,
input,
max_partitions_contributed,
max_contributions_per_partition,
partitions_sampling_prob: float = 1.0,
aggregate_fn=count_aggregate_fn):
params = CrossAndPerPartitionContributionParams(
max_partitions_contributed, max_contributions_per_partition)

bounder = contribution_bounders.AnalysisContributionBounder(
bounder = contribution_bounders.L0LinfAnalysisContributionBounder(
partitions_sampling_prob)
return list(
bounder.bound_contributions(input, params,
pipeline_dp.LocalBackend(),
bounder.bound_contributions(input, None, pipeline_dp.LocalBackend(),
_create_report_generator(),
aggregate_fn))

@@ -64,19 +58,15 @@ def test_contribution_bounding_empty_col(self):
def test_contribution_bounding_bound_input_nothing_dropped(self):
input = [("pid1", 'pk1', 1), ("pid1", 'pk1', 2), ("pid1", 'pk2', 3),
("pid1", 'pk2', 4)]
max_partitions_contributed = max_contributions_per_partition = 2
bound_result = self._run_contribution_bounding(
input, max_partitions_contributed, max_contributions_per_partition)
bound_result = self._run_contribution_bounding(input)

expected_result = [(('pid1', 'pk2'), 2), (('pid1', 'pk1'), 2)]
self.assertEqual(set(expected_result), set(bound_result))

def test_contribution_bounding_per_partition_bounding_not_applied(self):
input = [("pid1", 'pk1', 1), ("pid1", 'pk1', 2), ("pid1", 'pk2', 3),
("pid1", 'pk2', 4), ("pid1", 'pk2', 5), ("pid2", 'pk2', 6)]
max_partitions_contributed, max_contributions_per_partition = 5, 2
bound_result = self._run_contribution_bounding(
input, max_partitions_contributed, max_contributions_per_partition)
bound_result = self._run_contribution_bounding(input)

expected_result = [(('pid1', 'pk1'), 2), (('pid1', 'pk2'), 3),
(('pid2', 'pk2'), 1)]
@@ -87,12 +77,98 @@ def test_contribution_bounding_cross_partition_bounding_not_applied(self):
input = [("pid1", 'pk1', 1), ("pid1", 'pk1', 2), ("pid1", 'pk2', 3),
("pid1", 'pk2', 4), ("pid1", 'pk2', 5), ("pid1", 'pk3', 6),
("pid1", 'pk4', 7), ("pid2", 'pk4', 8)]
max_partitions_contributed = 3
max_contributions_per_partition = 5
bound_result = self._run_contribution_bounding(input)

expected_result = [(('pid1', 'pk1'), 2), (('pid1', 'pk2'), 3),
(('pid1', 'pk3'), 1), (('pid1', 'pk4'), 1),
(('pid2', 'pk4'), 1)]
# Check per- and cross-partition contribution limits are not enforced.
self.assertEqual(set(expected_result), set(bound_result))

def test_contribution_bounding_cross_partition_bounding_and_sampling(self):
input = [("pid1", 'pk1', 1), ("pid1", 'pk1', 2), ("pid1", 'pk2', 3),
("pid1", 'pk2', 4), ("pid1", 'pk2', 5), ("pid1", 'pk3', 6),
("pid1", 'pk4', 7), ("pid2", 'pk4', 8)]
bound_result = self._run_contribution_bounding(
input, partitions_sampling_prob=0.7)

# 'pk1' and 'pk2' are dropped by subsampling.
expected_result = [(('pid2', 'pk4'), 1), (('pid1', 'pk3'), 1),
(('pid1', 'pk4'), 1)]
# Check per- and cross-partition contribution limits are not enforced.
self.assertEqual(set(expected_result), set(bound_result))

def test_contribution_bounding_cross_partition_bounding_and_2_column_values(
self):
input = [("pid1", 'pk1', (1, 2)), ("pid1", 'pk1', (3, 4)),
("pid1", 'pk2', (-1, 0)), ("pid2", 'pk1', (5, 5))]

bound_result = self._run_contribution_bounding(input,
aggregate_fn=lambda x: x)

expected_result = [(('pid1', 'pk2'), (1, (-1, 0), 2, 3)),
(('pid1', 'pk1'), (2, (4, 6), 2, 3)),
(('pid2', 'pk1'), (1, (5, 5), 1, 1))]
# Check per- and cross-partition contribution limits are not enforced.
self.assertEqual(set(expected_result), set(bound_result))


class LinfContributionBounderTest(parameterized.TestCase):

def _run_contribution_bounding(self,
input,
partitions_sampling_prob: float = 1.0,
aggregate_fn=count_aggregate_fn):
bounder = contribution_bounders.LinfAnalysisContributionBounder(
partitions_sampling_prob)
return list(
bounder.bound_contributions(input, None, pipeline_dp.LocalBackend(),
_create_report_generator(),
aggregate_fn))

def test_contribution_bounding_empty_col(self):
input = []
bound_result = self._run_contribution_bounding(input)

self.assertEmpty(bound_result)

def test_contribution_bounding_bound_input_nothing_dropped(self):
input = [("pid1", 'pk1', 1), ("pid1", 'pk1', 2), ("pid1", 'pk2', 3),
("pid1", 'pk2', 4)]
bound_result = self._run_contribution_bounding(input,
aggregate_fn=lambda x: x)
# The output format is:
# (count_per_partition, sum_per_partition,
# num_partition_contributed_per_privacy_id,
# num_contribution_per_privacy_id)
# Since cross-partition contributions are not tracked, the privacy id is
# considered to contribute only to this partition, so
# num_partition_contributed_per_privacy_id = 1
# num_contribution_per_privacy_id = count_per_partition
expected_result = [(('pid1', 'pk2'), (2, 7, 1, 2)),
Review comment (Collaborator):
Why (2, 7, 1, 2) if pk2 has only 3 and 4 values contributed?

Author reply (Collaborator):

Added a comment (the output-format comment shown in the test above).

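(For the record: pk2 receives the values 3 and 4, so count_per_partition = 2 and sum_per_partition = 3 + 4 = 7; the Linf bounder tracks only this one partition, so num_partition_contributed_per_privacy_id = 1 and num_contribution_per_privacy_id = 2, giving (2, 7, 1, 2).)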

(('pid1', 'pk1'), (2, 3, 1, 2))]
self.assertEqual(set(expected_result), set(bound_result))

def test_contribution_bounding_per_partition_bounding_not_applied(self):
input = [("pid1", 'pk1', 1), ("pid1", 'pk1', 2), ("pid1", 'pk2', 3),
("pid1", 'pk2', 4), ("pid1", 'pk2', 5), ("pid2", 'pk2', 6)]
bound_result = self._run_contribution_bounding(input)

expected_result = [(('pid1', 'pk1'), 2), (('pid1', 'pk2'), 3),
(('pid2', 'pk2'), 1)]
# Check per-partition contribution limits are not enforced.
self.assertEqual(set(expected_result), set(bound_result))

def test_contribution_bounding_cross_partition_bounding_not_applied(self):
input = [("pid1", 'pk1', 1), ("pid1", 'pk1', 2), ("pid1", 'pk2', 3),
("pid1", 'pk2', 4), ("pid1", 'pk2', 5), ("pid1", 'pk3', 6),
("pid1", 'pk4', 7), ("pid2", 'pk4', 8)]

bound_result = self._run_contribution_bounding(input)

expected_result = [(('pid1', 'pk1'), 2), (('pid1', 'pk2'), 3),
(('pid1', 'pk3'), 1), (('pid1', 'pk4'), 1),
(('pid2', 'pk4'), 1)]
@@ -103,14 +179,9 @@ def test_contribution_bounding_cross_partition_bounding_and_sampling(self):
input = [("pid1", 'pk1', 1), ("pid1", 'pk1', 2), ("pid1", 'pk2', 3),
("pid1", 'pk2', 4), ("pid1", 'pk2', 5), ("pid1", 'pk3', 6),
("pid1", 'pk4', 7), ("pid2", 'pk4', 8)]
max_partitions_contributed = 3
max_contributions_per_partition = 5

bound_result = self._run_contribution_bounding(
input,
max_partitions_contributed,
max_contributions_per_partition,
partitions_sampling_prob=0.7)
input, partitions_sampling_prob=0.7)

# 'pk1' and 'pk2' are dropped by subsampling.
expected_result = [(('pid2', 'pk4'), 1), (('pid1', 'pk3'), 1),
@@ -122,17 +193,12 @@ def test_contribution_bounding_cross_partition_bounding_and_2_column_values(
self):
input = [("pid1", 'pk1', (1, 2)), ("pid1", 'pk1', (3, 4)),
("pid1", 'pk2', (-1, 0)), ("pid2", 'pk1', (5, 5))]
max_partitions_contributed = 3
max_contributions_per_partition = 5

bound_result = self._run_contribution_bounding(
input,
max_partitions_contributed,
max_contributions_per_partition,
aggregate_fn=lambda x: x)
bound_result = self._run_contribution_bounding(input,
aggregate_fn=lambda x: x)

expected_result = [(('pid1', 'pk2'), (1, (-1, 0), 2, 3)),
(('pid1', 'pk1'), (2, (4, 6), 2, 3)),
expected_result = [(('pid1', 'pk2'), (1, (-1, 0), 1, 1)),
(('pid1', 'pk1'), (2, (4, 6), 1, 2)),
(('pid2', 'pk1'), (1, (5, 5), 1, 1))]
# Check per- and cross-partition contribution limits are not enforced.
self.assertEqual(set(expected_result), set(bound_result))
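
Note the contrast with the corresponding L0/Linf test above: for the same input, the Linf bounder reports num_partitions_contributed = 1 and per-partition num_contributions, while the L0/Linf bounder reports them across all of the privacy id's partitions (2 and 3 for pid1).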