Skip to content

Commit

Permalink
Implemented DPQuantileAggregator::AggregateTensor, which is either a …
Browse files Browse the repository at this point in the history
…simple push_back to the buffer_ or an instance of reservoir sampling. Also added GetBufferSize and InsertWithReservoirSampling member functions.

PiperOrigin-RevId: 687324144
  • Loading branch information
TensorFlow Federated Team authored and copybara-github committed Oct 18, 2024
1 parent c394e87 commit 956b6c7
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 7 deletions.
3 changes: 3 additions & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ and this project adheres to
* The abstract class `DPTensorAggregator` and the child `DPQuantileAggregator`
(along with the factory class). `DPQuantileAggregator` is currently a
skeleton; future CLs will implement the member functions.
* `DPQuantileAggregator::AggregateTensors` performs either a `push_back` or
reservoir sampling, depending on the size of the member `buffer_`. The
reservoir sampling is implemented by `::InsertWithReservoirSampling`.

### Removed

Expand Down
1 change: 1 addition & 0 deletions tensorflow_federated/cc/core/impl/aggregation/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ cc_library(
":tensor",
":tensor_cc_proto",
"//tensorflow_federated/cc/core/impl/aggregation/base",
"@com_google_absl//absl/random",
],
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <string>
#include <vector>

#include "absl/random/random.h"
#include "tensorflow_federated/cc/core/impl/aggregation/base/monitoring.h"
#include "tensorflow_federated/cc/core/impl/aggregation/core/agg_core.pb.h"
#include "tensorflow_federated/cc/core/impl/aggregation/core/datatype.h"
Expand All @@ -35,6 +36,14 @@
namespace tensorflow_federated {
namespace aggregation {

// Performs one replacement step of reservoir sampling for `value`.
//
// Draws an integer with absl::Uniform(bit_gen_, 0, num_inputs_). If the draw
// addresses an existing slot of buffer_, that slot is overwritten with `value`;
// otherwise `value` is discarded and buffer_ is left untouched.
//
// The upper bound of the draw is num_inputs_, so the caller is expected to have
// counted `value` in num_inputs_ before calling (AggregateTensors increments
// num_inputs_ first).
template <typename T>
inline void DPQuantileAggregator<T>::InsertWithReservoirSampling(T value) {
  const int index = absl::Uniform(bit_gen_, 0, num_inputs_);
  // The lower bound of the draw is 0, so converting to the vector's unsigned
  // size type yields the same value the implicit conversion produced, without
  // the signed/unsigned comparison.
  const auto slot = static_cast<typename std::vector<T>::size_type>(index);
  if (slot < buffer_.size()) {
    buffer_[slot] = value;
  }
}

// To merge, we insert up to capacity and then perform reservoir sampling.
template <typename T>
Status DPQuantileAggregator<T>::MergeWith(TensorAggregator&& other) {
Expand All @@ -51,13 +60,53 @@ StatusOr<std::string> DPQuantileAggregator<T>::Serialize() && {
// Push back the input into the buffer or perform reservoir sampling.
//
// The statements after the initial stub line do the following:
//   * propagate any error returned by CheckValid();
//   * return INVALID_ARGUMENT unless `tensors` holds exactly one tensor, that
//     tensor has exactly one element, and its dtype equals
//     internal::TypeTraits<T>::kDataType;
//   * otherwise increment num_inputs_, store the scalar (append to buffer_
//     while it holds fewer than kDPQuantileMaxInputs values, else call
//     InsertWithReservoirSampling) and return OK.
template <typename T>
Status DPQuantileAggregator<T>::AggregateTensors(InputTensorList tensors) {
// NOTE(review): the next line looks like the pre-change stub that the diff view
// shows next to its replacement. Taken literally it returns UNIMPLEMENTED
// before any statement below runs -- confirm it is absent from the real file.
return TFF_STATUS(UNIMPLEMENTED) << "Will be implemented in a follow-up CL.";
TFF_RETURN_IF_ERROR(CheckValid());
// Ensure that there is exactly one tensor.
if (tensors.size() != 1) {
return TFF_STATUS(INVALID_ARGUMENT)
<< "DPQuantileAggregator::AggregateTensors: Expected exactly one "
"tensor, but got "
<< tensors.size();
}
// Ensure that the tensor only has one element.
auto num_elements_in_tensor = tensors[0]->num_elements();
if (num_elements_in_tensor != 1) {
return TFF_STATUS(INVALID_ARGUMENT)
<< "DPQuantileAggregator::AggregateTensors: Expected a scalar "
"tensor, but got a tensor with "
<< num_elements_in_tensor << " elements.";
}

// Ensure that the tensor is of the correct type.
DataType dtype = tensors[0]->dtype();
DataType expected_dtype = internal::TypeTraits<T>::kDataType;
if (dtype != expected_dtype) {
return TFF_STATUS(INVALID_ARGUMENT)
<< "DPQuantileAggregator::AggregateTensors: Expected a "
<< DataType_Name(expected_dtype) << " tensor, but got a "
<< DataType_Name(dtype) << " tensor.";
}

// Count the input before sampling: InsertWithReservoirSampling uses
// num_inputs_ as the upper bound of its random draw.
num_inputs_++;
T value = tensors[0]->CastToScalar<T>();
// Below capacity every input is kept; at capacity the new value may replace
// an existing buffer entry or be dropped.
if (buffer_.size() < kDPQuantileMaxInputs) {
buffer_.push_back(value);
} else {
InsertWithReservoirSampling(value);
}

return TFF_STATUS(OK);
}

// Checks if the output has not already been consumed.
//
// The statements after the initial stub line return FAILED_PRECONDITION when
// output_consumed_ is true and OK otherwise.
template <typename T>
Status DPQuantileAggregator<T>::CheckValid() const {
// NOTE(review): the next line looks like the pre-change stub that the diff view
// shows next to its replacement. Taken literally it makes the check below
// unreachable -- confirm it is absent from the real file.
return TFF_STATUS(UNIMPLEMENTED) << "Will be implemented in a follow-up CL.";
if (output_consumed_) {
return TFF_STATUS(FAILED_PRECONDITION)
<< "DPQuantileAggregator::CheckValid: Output has already been "
"consumed.";
}
return TFF_STATUS(OK);
}

// Trigger execution of the DP quantile algorithm.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <string>
#include <vector>

#include "absl/random/random.h"
#include "tensorflow_federated/cc/core/impl/aggregation/base/monitoring.h"
#include "tensorflow_federated/cc/core/impl/aggregation/core/agg_core.pb.h"
#include "tensorflow_federated/cc/core/impl/aggregation/core/dp_tensor_aggregator.h"
Expand Down Expand Up @@ -48,12 +49,14 @@ class DPQuantileAggregator final : public DPTensorAggregator {
: DPTensorAggregator(),
target_quantile_(target_quantile),
num_inputs_(0),
buffer_() {
output_consumed_(false) {
TFF_CHECK(target_quantile > 0 && target_quantile < 1)
<< "Target quantile must be in (0, 1).";
}

int GetNumInputs() const override { return num_inputs_; }
inline int GetNumInputs() const override { return num_inputs_; }

inline int GetBufferSize() const { return buffer_.size(); }

// To MergeWith another DPQuantileAggregator, we will insert as many elements
// from the other aggregator's buffer as possible into our buffer, without
Expand Down Expand Up @@ -81,9 +84,16 @@ class DPQuantileAggregator final : public DPTensorAggregator {
Status CheckValid() const override;

private:
// Implements Vitter's reservoir sampling algorithm.
// https://en.wikipedia.org/wiki/Reservoir_sampling#Simple:_Algorithm_R
// Called by AggregateTensors and MergeWith when buffer_ is full.
inline void InsertWithReservoirSampling(T value);

double target_quantile_;
int num_inputs_;
std::vector<T> buffer_;
absl::BitGen bit_gen_;
bool output_consumed_;
};

// This factory class makes DPQuantileAggregator, defined in the cc file.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,19 @@
#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "googlemock/include/gmock/gmock.h"
#include "googletest/include/gtest/gtest.h"
#include "tensorflow_federated/cc/core/impl/aggregation/base/monitoring.h"
#include "tensorflow_federated/cc/core/impl/aggregation/core/datatype.h"
#include "tensorflow_federated/cc/core/impl/aggregation/core/dp_fedsql_constants.h"
#include "tensorflow_federated/cc/core/impl/aggregation/core/input_tensor_list.h"
#include "tensorflow_federated/cc/core/impl/aggregation/core/intrinsic.h"
#include "tensorflow_federated/cc/core/impl/aggregation/core/tensor.h"
#include "tensorflow_federated/cc/core/impl/aggregation/core/tensor.pb.h"
#include "tensorflow_federated/cc/core/impl/aggregation/core/tensor_aggregator.h"
#include "tensorflow_federated/cc/core/impl/aggregation/core/tensor_aggregator_registry.h"
#include "tensorflow_federated/cc/core/impl/aggregation/core/tensor_spec.h"
#include "tensorflow_federated/cc/core/impl/aggregation/testing/test_data.h"
Expand Down Expand Up @@ -189,12 +192,87 @@ TEST(DPQuantileAggregatorTest, WrongOutputType) {
HasSubstr("Output type must be double"));
}

// The second batch of tests is on aggregating and merging data.
// The second batch of tests is on aggregating data.

// The third batch of tests is on ReportWithEpsilonAndDelta: the DP quantile
// algorithm should produce an output that reasonably approximate the target
// Test helper: builds a kDPQuantileUri intrinsic with one input spec named
// "value" of type `dtype`, one DT_DOUBLE output spec named "value", and the
// parameters produced by CreateDPQuantileParameters(target_quantile), then
// returns whatever CreateTensorAggregator yields for it.
StatusOr<std::unique_ptr<TensorAggregator>> CreateDPQuantileAggregator(
    DataType dtype, double target_quantile = 0.5) {
  Intrinsic quantile_intrinsic{
      kDPQuantileUri,
      {CreateTensorSpec("value", dtype)},
      {CreateTensorSpec("value", DT_DOUBLE)},
      {CreateDPQuantileParameters(target_quantile)},
      {}};
  return CreateTensorAggregator(quantile_intrinsic);
}

// Accumulating an input list that holds two tensors must fail with
// INVALID_ARGUMENT and report how many tensors were received.
TEST(DPQuantileAggregatorTest, AggregateMultipleTensors) {
  auto created = CreateDPQuantileAggregator(DT_INT32);
  TFF_EXPECT_OK(created);
  auto aggregator = std::move(created.value());

  Tensor first =
      Tensor::Create(DT_INT32, {}, CreateTestData<int32_t>({1})).value();
  Tensor second =
      Tensor::Create(DT_INT32, {}, CreateTestData<int32_t>({2})).value();

  auto accumulate_status =
      aggregator->Accumulate(InputTensorList({&first, &second}));
  EXPECT_THAT(accumulate_status, StatusIs(INVALID_ARGUMENT));
  EXPECT_THAT(accumulate_status.message(),
              HasSubstr("Expected exactly one tensor, but got 2"));
}

// Accumulating a two-element tensor must fail with INVALID_ARGUMENT: only
// single-element inputs are accepted.
TEST(DPQuantileAggregatorTest, AggregateTensorWithWrongShape) {
  auto created = CreateDPQuantileAggregator(DT_INT64);
  TFF_EXPECT_OK(created);
  auto aggregator = std::move(created.value());

  Tensor two_elements =
      Tensor::Create(DT_INT64, {2}, CreateTestData<int64_t>({1, 1})).value();

  auto accumulate_status =
      aggregator->Accumulate(InputTensorList({&two_elements}));
  EXPECT_THAT(accumulate_status, StatusIs(INVALID_ARGUMENT));
  EXPECT_THAT(
      accumulate_status.message(),
      HasSubstr("Expected a scalar tensor, but got a tensor with 2 elements"));
}

// Accumulating a DT_DOUBLE tensor into an aggregator created for DT_FLOAT must
// fail with INVALID_ARGUMENT and name both dtypes in the message.
TEST(DPQuantileAggregatorTest, AggregateTensorWithWrongDtype) {
  auto created = CreateDPQuantileAggregator(DT_FLOAT);
  TFF_EXPECT_OK(created);
  auto aggregator = std::move(created.value());

  Tensor double_tensor =
      Tensor::Create(DT_DOUBLE, {1}, CreateTestData<double>({1.01})).value();

  auto accumulate_status =
      aggregator->Accumulate(InputTensorList({&double_tensor}));
  EXPECT_THAT(accumulate_status, StatusIs(INVALID_ARGUMENT));
  EXPECT_THAT(accumulate_status.message(),
              HasSubstr("Expected a DT_FLOAT tensor, but got a DT_DOUBLE"
                        " tensor"));
}

// Scalars can be accumulated one at a time. After the i-th input the buffer
// holds i values until it reaches kDPQuantileMaxInputs and stays at that size
// afterwards, while GetNumInputs keeps counting every input.
TEST(DPQuantileAggregatorTest, AggregateTensorsSuccessful) {
  auto created = CreateDPQuantileAggregator(DT_DOUBLE);
  TFF_EXPECT_OK(created);
  auto& aggregator =
      dynamic_cast<DPQuantileAggregator<double>&>(*created.value());

  for (int i = 1; i < kDPQuantileMaxInputs + 10; ++i) {
    double input_value = 0.5 + i;
    Tensor scalar =
        Tensor::Create(DT_DOUBLE, {}, CreateTestData<double>({input_value}))
            .value();
    auto accumulate_status =
        aggregator.Accumulate(InputTensorList({&scalar}));
    TFF_EXPECT_OK(accumulate_status);

    // The buffer grows with each input until it is capped at capacity.
    EXPECT_EQ(aggregator.GetBufferSize(),
              i < kDPQuantileMaxInputs ? i : kDPQuantileMaxInputs);
    EXPECT_EQ(aggregator.GetNumInputs(), i);
  }
}

// The third batch of tests is on merging with another DPQuantileAggregator.
// The fourth batch of tests is on ReportWithEpsilonAndDelta. The DP quantile
// algorithm should produce an output that reasonably approximates the target
// quantile.

// The fifth batch of tests is on serialization & deserialization.

} // namespace
} // namespace aggregation
} // namespace tensorflow_federated

0 comments on commit 956b6c7

Please sign in to comment.