Commit

introduce a generic compute_work_hashes function
chadaustin committed Dec 19, 2024
1 parent 3fa566a commit 6f334ba
Showing 4 changed files with 149 additions and 18 deletions.
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -44,6 +44,7 @@ harness = false

[dependencies]
anyhow = "1.0.65"
arrayvec = "0.7.6"
async-std = { version = "1.12", features = ["unstable"] }
batch-channel = "0.4.1"
blake3 = { version = "1.5.5", features = ["traits-preview"] }
@@ -76,7 +77,7 @@ sha1 = "0.10.6"
sha2 = "0.10.8"
static_assertions = "1.1.0"
thiserror = "2.0.0"
tokio = { version = "1.38", features = ["macros", "rt-multi-thread", "sync"] }
tokio = { version = "1.38", features = ["fs", "io-util", "macros", "rt-multi-thread", "sync"] }
turbojpeg = { version = "1.1.1", features = ["image"] }
walkdir = "2.5.0"
wild = "2.2.1"
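
The new tokio features line up with the async reads introduced in src/hash/mod.rs below: "fs" provides tokio::fs::File and "io-util" provides AsyncReadExt, while arrayvec supplies the fixed-capacity ArrayVec that holds the active hashers. A minimal sketch of the read loop those features enable; the function name and the 64 KiB buffer here are illustrative placeholders, not names from the crate:

use std::path::Path;
use tokio::io::AsyncReadExt; // from the "io-util" feature

// Illustrative only: drain a file through the async read loop used in the new code.
async fn drain_file(path: &Path) -> std::io::Result<u64> {
    let mut file = tokio::fs::File::open(path).await?; // from the "fs" feature
    let mut buffer = [0u8; 64 * 1024]; // placeholder size; the crate uses READ_SIZE
    let mut total = 0u64;
    loop {
        let n = file.read(&mut buffer).await?;
        if n == 0 {
            break;
        }
        total += n as u64;
    }
    Ok(total)
}
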
147 changes: 130 additions & 17 deletions src/hash/mod.rs
@@ -3,11 +3,13 @@ use crate::model::ExtraHashes
use crate::model::FileInfo;
use crate::model::Hash32;
use crate::model::ImageMetadata;
use arrayvec::ArrayVec;
use digest::DynDigest;
use enumset::EnumSetType;
use std::io::Read;
use std::path::Path;
use std::path::PathBuf;
use tokio::io::AsyncReadExt;

mod heic;
mod jpeg;
@@ -177,31 +179,142 @@ pub fn get_hasher(hash: ContentHashType) -> Box<dyn DynDigest> {
}
}

/*
#[derive(Debug)]
enum WorkerPool {
Tokio,
IO,
}

fn select_worker_pool(hashes: ContentHashSet) -> WorkerPool {
// My NAS, an Atom D2700DC with spinning iron disks, can sustain
// about 100 MB/s of read IO. `photohash bench hashes` gives:
//
// BLAKE3: 421.77 MB/s
// MD5: 243.54 MB/s
// SHA1: 124.63 MB/s
// SHA256: 56.17 MB/s
// -----
// total: 30.96 MB/s
//
// Therefore, computing extra hashes is CPU-bound. If SHA256 is in
// the set, we can simply reuse the Tokio pool.
if hashes.contains(ContentHashType::SHA256) {
WorkerPool::Tokio
} else {
WorkerPool::IO
}
}
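
// Illustrative arithmetic for the note above: hashing one buffer through all
// four digests serially combines their rates harmonically,
//
//     1 / (1/421.77 + 1/243.54 + 1/124.63 + 1/56.17) is about 30.96 MB/s,
//
// which matches the measured total above and sits well under the ~100 MB/s
// the disks can sustain, so the extra hashes, not the reads, are the
// bottleneck.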

#[derive(Debug, Default)]
pub struct ContentHashes {
pub blake3: Option<Hash32>,
pub extra_hashes: ExtraHashes,
}

pub async fn compute_content_hashes(path: PathBuf, which: ContentHashSet) -> anyhow::Result<ContentHashes> {
    iopool::run_in_io_pool(move || {
        let mut hasher = blake3::Hasher::new();
        let mut file = std::fs::File::open(path)?;
        let mut buffer = [0u8; READ_SIZE];
        loop {
            let n = file.read(&mut buffer)?;
            if n == 0 {
                break;
            }
            hasher.update(&buffer[..n]);
        }
        Ok(hasher.finalize().into())
    })
    .await
}

struct Hasher<'a> {
digest: &'a mut (dyn DynDigest + Send),
target: &'a mut [u8],
}

impl Hasher<'_> {
fn update(&mut self, data: &[u8]) {
self.digest.update(data)
}
fn finalize(&mut self) {
self.digest
.finalize_into_reset(self.target)
.expect("incorrect hash length");
}
}

pub async fn compute_content_hashes(
path: PathBuf,
which: ContentHashSet,
) -> anyhow::Result<ContentHashes> {
let mut result = ContentHashes::default();

let mut blake3_hasher: Option<blake3::Hasher> = None;
let mut md5_hasher: Option<md5::Md5> = None;
let mut sha1_hasher: Option<sha1::Sha1> = None;
let mut sha256_hasher: Option<sha2::Sha256> = None;

// 4 = blake3 + md5 + sha1 + sha256
const HASHER_COUNT: usize = ContentHashSet::variant_count() as usize;
let mut hashers = ArrayVec::<_, HASHER_COUNT>::new();

if which.contains(ContentHashType::BLAKE3) {
hashers.push(Hasher {
digest: blake3_hasher.get_or_insert_default(),
target: result.blake3.get_or_insert_default(),
});
}

if which.contains(ContentHashType::MD5) {
hashers.push(Hasher {
digest: md5_hasher.get_or_insert_default(),
target: result.extra_hashes.md5.get_or_insert_default(),
});
}

if which.contains(ContentHashType::SHA1) {
hashers.push(Hasher {
digest: sha1_hasher.get_or_insert_default(),
target: result.extra_hashes.sha1.get_or_insert_default(),
});
}

if which.contains(ContentHashType::SHA256) {
hashers.push(Hasher {
digest: sha256_hasher.get_or_insert_default(),
target: result.extra_hashes.sha256.get_or_insert_default(),
});
}

    match match select_worker_pool(which) {
        WorkerPool::Tokio => {
            let mut file = tokio::fs::File::open(path).await?;
            // TODO: 64KiB / (100 MB/s) = 655 microseconds, perhaps
            // below scheduling quanta. Consider a larger value.
            let mut buffer = [0u8; READ_SIZE];
            loop {
                let read = file.read(&mut buffer);
                let n = read.await?;
                if n == 0 {
                    break;
                }
                for h in &mut hashers {
                    h.update(&buffer[..n]);
                }
            }
            for mut h in hashers {
                h.finalize();
            }
            Ok(())
        }
WorkerPool::IO => {
iopool::run_in_io_pool_local(|| {
let mut file = std::fs::File::open(path)?;
let mut buffer = [0u8; READ_SIZE];
loop {
let n = file.read(&mut buffer)?;
if n == 0 {
break;
}
for h in &mut hashers {
h.update(&buffer[..n]);
}
}
for mut h in hashers {
h.finalize();
}
Ok(())
})
.await
}
} {
Ok(()) => Ok(result),
Err(e) => Err(e),
}
}
*/

pub async fn compute_blake3(path: PathBuf) -> anyhow::Result<Hash32> {
// This assumes that computing blake3 is much faster than IO and
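
Taken together, the new pieces compute any requested subset of hashes in a single pass over the file: the caller picks a ContentHashSet, one Hasher entry per requested digest is pushed into the ArrayVec, and each read buffer is fed to every digest before the results are written back into ContentHashes. A rough caller sketch, assuming ContentHashSet is an enumset::EnumSet<ContentHashType> alias, Hash32 is a plain 32-byte array, and the items are in scope as in src/hash/mod.rs; the wrapper function and the hex formatting are illustrative only:

// Hypothetical caller of the new entry point.
async fn print_hashes(path: std::path::PathBuf) -> anyhow::Result<()> {
    // EnumSetType-derived enums combine into a set with `|`
    // (assumed alias: ContentHashSet = EnumSet<ContentHashType>).
    let which = ContentHashType::BLAKE3 | ContentHashType::SHA256;
    let hashes = compute_content_hashes(path, which).await?;
    if let Some(blake3) = hashes.blake3 {
        println!("blake3: {:02x?}", blake3); // assumes Hash32 is [u8; 32]
    }
    if let Some(sha256) = hashes.extra_hashes.sha256 {
        println!("sha256: {:02x?}", sha256);
    }
    Ok(())
}
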
9 changes: 9 additions & 0 deletions src/iopool.rs
@@ -3,6 +3,7 @@ use std::io;
use std::path::PathBuf;
use std::sync::OnceLock;

// https://docs.rs/tokio/latest/tokio/index.html#cpu-bound-tasks-and-blocking-code
static IO_POOL: OnceLock<rayon::ThreadPool> = OnceLock::new();
const IO_POOL_CONCURRENCY: usize = 4;

@@ -30,6 +31,14 @@ where
rx.await.unwrap()
}

pub async fn run_in_io_pool_local<F, T>(f: F) -> T
where
F: FnOnce() -> T + Send,
T: Send,
{
tokio::task::block_in_place(move || get_io_pool().install(f))
}

pub async fn get_file_contents(path: PathBuf) -> io::Result<Vec<u8>> {
run_in_io_pool(move || std::fs::read(path)).await
}
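
run_in_io_pool_local trades the spawn-and-channel shape of run_in_io_pool above (note the rx.await) for tokio::task::block_in_place plus rayon's ThreadPool::install: block_in_place tells the runtime that the current worker thread is about to block (this needs the multi-thread runtime, which Cargo.toml enables), and install runs the closure to completion on an IO-pool thread. Because the call blocks until the closure returns, the closure only has to be Send rather than 'static, so it can borrow local state, which is how the new compute_content_hashes in src/hash/mod.rs uses it (its closure borrows hashers). A small caller sketch; the function and its byte counting are illustrative:

use std::path::Path;

// Illustrative only: the closure borrows `total` and `path` across the pool
// boundary, which a spawn-based helper requiring 'static data could not.
async fn count_bytes(path: &Path) -> std::io::Result<u64> {
    let mut total = 0u64;
    crate::iopool::run_in_io_pool_local(|| -> std::io::Result<()> {
        let data = std::fs::read(path)?; // runs on a rayon IO-pool thread
        total += data.len() as u64;
        Ok(())
    })
    .await?;
    Ok(total)
}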
