Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace drain() and drain_filter() with extract_if() and extract_from_if() #766

Merged
merged 5 commits into from
Mar 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
# redb - Changelog

## 2.0.0 - 2024-XX-XX

### Major file format change
2.0.0 uses a new file format that optimizes `len()` to be constant time. This means that it is not
backwards compatible with 1.x. To upgrade, consider using a pattern like that in XXX test.
backwards compatible with 1.x. To upgrade, consider using a pattern like that in
[upgrade_v1_to_v2](https://github.com/cberner/redb/blob/222a37f4600588261b0983eebcd074bb69d6e5a0/tests/backward_compatibility.rs#L282-L299) test.

### Other changes
* `check_integrity()` now returns a `DatabaseError` instead of `StorageError`
* Refactor table metadata methods into a new `ReadableTableMetadata` trait
* Rename `RedbKey` to `Key`
* Rename `RedbValue` to `Value`
* Remove lifetimes from read-only tables
* Remove lifetime from `WriteTransaction` and `ReadTransaction`
* Remove `drain()` and `drain_filter()` from `Table`
* impl `Clone` for `Range`
* Add `len()` and `is_empty()` to `MultimapValue`
* Add `retain()` and `retain_in()` to `Table`
* Add `extract_if()` and `extract_from_if()` to `Table`
* Add `range()` returning a `Range` with the `'static` lifetime to read-only tables
* Add `get()` returning a range with the `'static` lifetime to read-only multimap tables
* Add `close()` method to `ReadTransaction`
Expand Down
19 changes: 10 additions & 9 deletions fuzz/fuzz_targets/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,24 +133,25 @@ pub(crate) enum FuzzOperation {
},
PopLast {
},
Drain {
start_key: BoundedU64<KEY_SPACE>,
len: BoundedU64<KEY_SPACE>,
reversed: bool,
Retain {
modulus: U64Between<1, 8>,
},
DrainFilter {
RetainIn {
start_key: BoundedU64<KEY_SPACE>,
len: BoundedU64<KEY_SPACE>,
modulus: U64Between<1, 8>,
reversed: bool,
},
Retain {
ExtractIf {
modulus: U64Between<1, 8>,
take: BoundedUSize<10>,
reversed: bool,
},
RetainIn {
ExtractFromIf {
start_key: BoundedU64<KEY_SPACE>,
len: BoundedU64<KEY_SPACE>,
range_len: BoundedU64<KEY_SPACE>,
take: BoundedUSize<10>,
modulus: U64Between<1, 8>,
reversed: bool,
},
Range {
start_key: BoundedU64<KEY_SPACE>,
Expand Down
60 changes: 36 additions & 24 deletions fuzz/fuzz_targets/fuzz_redb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,10 @@ fn handle_multimap_table_op(op: &FuzzOperation, reference: &mut BTreeMap<u64, BT
FuzzOperation::PopLast { .. } => {
// no-op. Multimap tables don't support this
}
FuzzOperation::Drain { .. } => {
FuzzOperation::ExtractIf { .. } => {
// no-op. Multimap tables don't support this
}
FuzzOperation::DrainFilter { .. } => {
FuzzOperation::ExtractFromIf { .. } => {
// no-op. Multimap tables don't support this
}
FuzzOperation::Retain { .. } => {
Expand Down Expand Up @@ -378,67 +378,79 @@ fn handle_table_op(op: &FuzzOperation, reference: &mut BTreeMap<u64, usize>, tab
assert!(table.pop_first()?.is_none());
}
}
FuzzOperation::Drain { start_key, len, reversed } => {
let start = start_key.value;
let end = start + len.value;
FuzzOperation::ExtractIf { take, modulus, reversed } => {
let modulus = modulus.value;
let mut reference_iter: Box<dyn Iterator<Item = (&u64, &usize)>> =
if *reversed {
Box::new(reference.range(start..end).rev())
Box::new(reference.iter().rev().take(take.value))
} else {
Box::new(reference.range(start..end))
Box::new(reference.iter().take(take.value))
};
let mut iter: Box<dyn Iterator<Item = Result<(AccessGuard<u64>, AccessGuard<&[u8]>), redb::StorageError>>> = if *reversed {
Box::new(table.drain(start..end)?.rev())
Box::new(table.extract_if(|x, _| x % modulus == 0)?.rev())
} else {
Box::new(table.drain(start..end)?)
Box::new(table.extract_if(|x, _| x % modulus == 0)?)
};
let mut remaining = take.value;
let mut remove_from_reference = vec![];
while let Some((ref_key, ref_value_len)) = reference_iter.next() {
if *ref_key % modulus != 0 {
continue;
}
if remaining == 0 {
break;
}
remaining -= 1;
let (key, value) = iter.next().unwrap()?;
remove_from_reference.push(*ref_key);
assert_eq!(*ref_key, key.value());
assert_eq!(*ref_value_len, value.value().len());
}
drop(reference_iter);
reference.retain(|x, _| *x < start || *x >= end);
// This is basically assert!(iter.next().is_none()), but we also allow an Err such as a simulated IO error
if let Some(Ok((_, _))) = iter.next() {
panic!();
for x in remove_from_reference {
reference.remove(&x);
}
}
FuzzOperation::DrainFilter { start_key, len, modulus, reversed } => {
FuzzOperation::ExtractFromIf { start_key, range_len, take, modulus, reversed } => {
let start = start_key.value;
let end = start + len.value;
let end = start + range_len.value;
let modulus = modulus.value;
let mut reference_iter: Box<dyn Iterator<Item = (&u64, &usize)>> =
if *reversed {
Box::new(reference.range(start..end).rev())
Box::new(reference.range(start..end).rev().take(take.value))
} else {
Box::new(reference.range(start..end))
Box::new(reference.range(start..end).take(take.value))
};
let mut iter: Box<dyn Iterator<Item = Result<(AccessGuard<u64>, AccessGuard<&[u8]>), redb::StorageError>>> = if *reversed {
Box::new(table.drain_filter(start..end, |x, _| x % modulus == 0)?.rev())
Box::new(table.extract_from_if(start..end, |x, _| x % modulus == 0)?.rev())
} else {
Box::new(table.drain_filter(start..end, |x, _| x % modulus == 0)?)
Box::new(table.extract_from_if(start..end, |x, _| x % modulus == 0)?)
};
let mut remaining = take.value;
let mut remove_from_reference = vec![];
while let Some((ref_key, ref_value_len)) = reference_iter.next() {
if *ref_key % modulus != 0 {
continue;
}
if remaining == 0 {
break;
}
remaining -= 1;
let (key, value) = iter.next().unwrap()?;
remove_from_reference.push(*ref_key);
assert_eq!(*ref_key, key.value());
assert_eq!(*ref_value_len, value.value().len());
}
drop(reference_iter);
reference.retain(|x, _| (*x < start || *x >= end) || *x % modulus != 0);
// This is basically assert!(iter.next().is_none()), but we also allow an Err such as a simulated IO error
if let Some(Ok((_, _))) = iter.next() {
panic!();
for x in remove_from_reference {
reference.remove(&x);
}
}
FuzzOperation::RetainIn { start_key, len, modulus } => {
let start = start_key.value;
let end = start + len.value;
let modulus = modulus.value;
table.retain_in(|x, _| x % modulus == 0, start..end)?;
table.retain_in(start..end, |x, _| x % modulus == 0)?;
reference.retain(|x, _| (*x < start || *x >= end) || *x % modulus == 0);
}
FuzzOperation::Retain { modulus } => {
Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ pub use multimap_table::{
ReadOnlyUntypedMultimapTable, ReadableMultimapTable,
};
pub use table::{
Drain, DrainFilter, Range, ReadOnlyTable, ReadOnlyUntypedTable, ReadableTable,
ReadableTableMetadata, Table, TableStats,
ExtractIf, Range, ReadOnlyTable, ReadOnlyUntypedTable, ReadableTable, ReadableTableMetadata,
Table, TableStats,
};
pub use transactions::{DatabaseStats, Durability, ReadTransaction, WriteTransaction};
pub use tree_store::{AccessGuard, AccessGuardMut, Savepoint};
Expand Down
109 changes: 35 additions & 74 deletions src/table.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::db::TransactionGuard;
use crate::sealed::Sealed;
use crate::tree_store::{
AccessGuardMut, Btree, BtreeDrain, BtreeDrainFilter, BtreeHeader, BtreeMut, BtreeRangeIter,
PageHint, PageNumber, RawBtree, TransactionalMemory, MAX_VALUE_LENGTH,
AccessGuardMut, Btree, BtreeExtractIf, BtreeHeader, BtreeMut, BtreeRangeIter, PageHint,
PageNumber, RawBtree, TransactionalMemory, MAX_VALUE_LENGTH,
};
use crate::types::{Key, MutInPlaceValue, Value};
use crate::{AccessGuard, StorageError, WriteTransaction};
Expand Down Expand Up @@ -131,39 +131,44 @@ impl<'txn, K: Key + 'static, V: Value + 'static> Table<'txn, K, V> {
}
}

/// Removes the specified range and returns the removed entries in an iterator
/// Applies `predicate` to all key-value pairs. All entries for which
/// `predicate` evaluates to `true` are returned in an iterator, and those which are read from the iterator are removed
///
/// The iterator will consume all items in the range on drop.
pub fn drain<'a, KR>(&mut self, range: impl RangeBounds<KR> + 'a) -> Result<Drain<K, V>>
where
K: 'a,
KR: Borrow<K::SelfType<'a>> + 'a,
{
self.tree.drain(&range).map(Drain::new)
/// Note: values not read from the iterator will not be removed
pub fn extract_if<F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool>(
&mut self,
predicate: F,
) -> Result<ExtractIf<K, V, F>> {
self.extract_from_if::<K::SelfType<'_>, F>(.., predicate)
}

/// Applies `predicate` to all key-value pairs in the specified range. All entries for which
/// `predicate` evaluates to `true` are removed and returned in an iterator
/// `predicate` evaluates to `true` are returned in an iterator, and those which are read from the iterator are removed
///
/// The iterator will consume all items in the range matching the predicate on drop.
pub fn drain_filter<'a, KR, F: for<'f> Fn(K::SelfType<'f>, V::SelfType<'f>) -> bool>(
&mut self,
range: impl RangeBounds<KR> + 'a,
/// Note: values not read from the iterator will not be removed
pub fn extract_from_if<
'a,
'a0,
KR,
F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool,
>(
&'a mut self,
range: impl RangeBounds<KR> + 'a0,
predicate: F,
) -> Result<DrainFilter<K, V, F>>
) -> Result<ExtractIf<'a, K, V, F>>
where
K: 'a,
KR: Borrow<K::SelfType<'a>> + 'a,
K: 'a0,
KR: Borrow<K::SelfType<'a0>> + 'a0,
{
self.tree
.drain_filter(&range, predicate)
.map(DrainFilter::new)
.extract_from_if(&range, predicate)
.map(ExtractIf::new)
}

/// Applies `predicate` to all key-value pairs. All entries for which
/// `predicate` evaluates to `false` are removed.
///
pub fn retain<F: for<'f> Fn(K::SelfType<'f>, V::SelfType<'f>) -> bool>(
pub fn retain<F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool>(
&mut self,
predicate: F,
) -> Result {
Expand All @@ -173,10 +178,10 @@ impl<'txn, K: Key + 'static, V: Value + 'static> Table<'txn, K, V> {
/// Applies `predicate` to all key-value pairs in the range `start..end`. All entries for which
/// `predicate` evaluates to `false` are removed.
///
pub fn retain_in<'a, KR, F: for<'f> Fn(K::SelfType<'f>, V::SelfType<'f>) -> bool>(
pub fn retain_in<'a, KR, F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool>(
&mut self,
predicate: F,
range: impl RangeBounds<KR> + 'a,
predicate: F,
) -> Result
where
KR: Borrow<K::SelfType<'a>> + 'a,
Expand Down Expand Up @@ -530,68 +535,24 @@ impl<K: Key + 'static, V: Value + 'static> Debug for ReadOnlyTable<K, V> {
}
}

pub struct Drain<'a, K: Key + 'static, V: Value + 'static> {
inner: BtreeDrain<K, V>,
_lifetime: PhantomData<&'a ()>,
}

impl<'a, K: Key + 'static, V: Value + 'static> Drain<'a, K, V> {
fn new(inner: BtreeDrain<K, V>) -> Self {
Self {
inner,
_lifetime: Default::default(),
}
}
}

impl<'a, K: Key + 'static, V: Value + 'static> Iterator for Drain<'a, K, V> {
type Item = Result<(AccessGuard<'a, K>, AccessGuard<'a, V>)>;

fn next(&mut self) -> Option<Self::Item> {
let entry = self.inner.next()?;
Some(entry.map(|entry| {
let (page, key_range, value_range) = entry.into_raw();
let key = AccessGuard::with_page(page.clone(), key_range);
let value = AccessGuard::with_page(page, value_range);
(key, value)
}))
}
}

impl<'a, K: Key + 'static, V: Value + 'static> DoubleEndedIterator for Drain<'a, K, V> {
fn next_back(&mut self) -> Option<Self::Item> {
let entry = self.inner.next_back()?;
Some(entry.map(|entry| {
let (page, key_range, value_range) = entry.into_raw();
let key = AccessGuard::with_page(page.clone(), key_range);
let value = AccessGuard::with_page(page, value_range);
(key, value)
}))
}
}

pub struct DrainFilter<
pub struct ExtractIf<
'a,
K: Key + 'static,
V: Value + 'static,
F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool,
> {
inner: BtreeDrainFilter<K, V, F>,
_lifetime: PhantomData<&'a ()>,
inner: BtreeExtractIf<'a, K, V, F>,
}

impl<
'a,
K: Key + 'static,
V: Value + 'static,
F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool,
> DrainFilter<'a, K, V, F>
> ExtractIf<'a, K, V, F>
{
fn new(inner: BtreeDrainFilter<K, V, F>) -> Self {
Self {
inner,
_lifetime: Default::default(),
}
fn new(inner: BtreeExtractIf<'a, K, V, F>) -> Self {
Self { inner }
}
}

Expand All @@ -600,7 +561,7 @@ impl<
K: Key + 'static,
V: Value + 'static,
F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool,
> Iterator for DrainFilter<'a, K, V, F>
> Iterator for ExtractIf<'a, K, V, F>
{
type Item = Result<(AccessGuard<'a, K>, AccessGuard<'a, V>)>;

Expand All @@ -620,7 +581,7 @@ impl<
K: Key + 'static,
V: Value + 'static,
F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool,
> DoubleEndedIterator for DrainFilter<'a, K, V, F>
> DoubleEndedIterator for ExtractIf<'a, K, V, F>
{
fn next_back(&mut self) -> Option<Self::Item> {
let entry = self.inner.next_back()?;
Expand Down
Loading
Loading