Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Rework array data for mutability #3382

Draft
wants to merge 60 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
d875f5e
add buffer stubs
scsmithr Dec 27, 2024
e7521c5
more stub
scsmithr Dec 27, 2024
3571ce1
addressable
scsmithr Dec 27, 2024
24e0cfe
string view
scsmithr Dec 27, 2024
69405a9
temp rename
scsmithr Dec 27, 2024
516da58
fixup! temp rename
scsmithr Dec 27, 2024
583581a
array
scsmithr Dec 28, 2024
f4ddf6d
copy rows
scsmithr Dec 28, 2024
762cbc7
start executors
scsmithr Dec 28, 2024
39c6d16
binary
scsmithr Dec 28, 2024
459acf7
temp rename
scsmithr Dec 28, 2024
f6821da
agg
scsmithr Dec 28, 2024
e7d8cff
temp renames
scsmithr Dec 28, 2024
278b43a
ok
scsmithr Dec 28, 2024
06ce68d
ok
scsmithr Dec 28, 2024
e96cca6
transition some numeric funcs
scsmithr Dec 29, 2024
b098ce1
temp rename
scsmithr Dec 29, 2024
f0aa35e
switch add
scsmithr Dec 29, 2024
42f56b4
more arith
scsmithr Dec 29, 2024
8543379
date extract
scsmithr Dec 29, 2024
94d5606
date trunc
scsmithr Dec 29, 2024
11520ae
epoch
scsmithr Dec 29, 2024
1369e38
ternary
scsmithr Dec 29, 2024
c6a4110
uniform
scsmithr Dec 29, 2024
9178c78
some string funcs
scsmithr Dec 30, 2024
3fe5e65
more string
scsmithr Dec 30, 2024
bfe09e9
negate
scsmithr Dec 30, 2024
c5e64ae
and/or
scsmithr Dec 30, 2024
3a0de15
reduce
scsmithr Dec 30, 2024
04d6d38
list values
scsmithr Dec 31, 2024
3b9ea90
numeric cleanup
scsmithr Dec 31, 2024
85cd55d
list extract
scsmithr Dec 31, 2024
d57cf93
tests
scsmithr Dec 31, 2024
ba4091c
l2
scsmithr Dec 31, 2024
f71f7ac
most comparisons
scsmithr Dec 31, 2024
90470e2
some manage
scsmithr Dec 31, 2024
91acb77
reset for write
scsmithr Dec 31, 2024
e95d8c0
eval column expr
scsmithr Dec 31, 2024
7768bb7
lit
scsmithr Jan 1, 2025
5775735
casting
scsmithr Jan 2, 2025
14c93fb
more expr stuff
scsmithr Jan 2, 2025
bcf868c
case
scsmithr Jan 3, 2025
8372762
expr create state
scsmithr Jan 3, 2025
0c5df10
eval constant
scsmithr Jan 3, 2025
a8d7b4c
Merge remote-tracking branch 'origin/main' into sean/buf2
scsmithr Jan 4, 2025
5390ed0
fixup! Merge remote-tracking branch 'origin/main' into sean/buf2
scsmithr Jan 4, 2025
cd088f9
some aggregate rework
scsmithr Jan 4, 2025
554dc0b
more agg
scsmithr Jan 4, 2025
f5d4920
wip
scsmithr Jan 5, 2025
edb8875
spooky
scsmithr Jan 5, 2025
35de02d
more
scsmithr Jan 5, 2025
8573d07
update sum
scsmithr Jan 5, 2025
60a1af1
some unimplementeds
scsmithr Jan 5, 2025
b9aa85d
rename
scsmithr Jan 5, 2025
4b745fd
some cleanup
scsmithr Jan 5, 2025
bf2acd6
temp rename
scsmithr Jan 5, 2025
eeff12f
project
scsmithr Jan 5, 2025
95feac4
filter
scsmithr Jan 6, 2025
5f878fa
block
scsmithr Jan 6, 2025
f34f71a
stub
scsmithr Jan 6, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions crates/docgen/src/markdown_table.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::fmt;

use rayexec_error::Result;
use rayexec_execution::arrays::batch::Batch;
use rayexec_execution::arrays::batch::Batch2;
use rayexec_execution::arrays::field::Schema;
use rayexec_execution::arrays::format::{FormatOptions, Formatter};

Expand All @@ -13,7 +13,7 @@ const FORMATTER: Formatter = Formatter::new(FormatOptions {
pub fn write_markdown_table<'a>(
output: &mut dyn fmt::Write,
schema: &Schema,
batches: impl IntoIterator<Item = &'a Batch>,
batches: impl IntoIterator<Item = &'a Batch2>,
) -> Result<()> {
// 'field1 | field2 | field3'
let header = schema
Expand Down Expand Up @@ -54,17 +54,17 @@ pub fn write_markdown_table<'a>(

#[cfg(test)]
mod tests {
use rayexec_execution::arrays::array::Array;
use rayexec_execution::arrays::array::Array2;
use rayexec_execution::arrays::datatype::DataType;
use rayexec_execution::arrays::field::Field;

use super::*;

#[test]
fn simple() {
let batch = Batch::try_new([
Array::from_iter([1, 2, 3]),
Array::from_iter(["cat", "dog", "mouse"]),
let batch = Batch2::try_new([
Array2::from_iter([1, 2, 3]),
Array2::from_iter(["cat", "dog", "mouse"]),
])
.unwrap();

Expand Down
6 changes: 3 additions & 3 deletions crates/rayexec_csv/src/copy_to.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use futures::future::BoxFuture;
use futures::FutureExt;
use rayexec_error::Result;
use rayexec_execution::arrays::batch::Batch;
use rayexec_execution::arrays::batch::Batch2;
use rayexec_execution::arrays::field::Schema;
use rayexec_execution::execution::operators::sink::PartitionSink;
use rayexec_execution::functions::copy::CopyToFunction;
Expand Down Expand Up @@ -53,7 +53,7 @@ pub struct CsvCopyToSink {
}

impl CsvCopyToSink {
async fn push_inner(&mut self, batch: Batch) -> Result<()> {
async fn push_inner(&mut self, batch: Batch2) -> Result<()> {
let mut buf = Vec::with_capacity(1024);
self.encoder.encode(&batch, &mut buf)?;
self.sink.write_all(buf.into()).await?;
Expand All @@ -68,7 +68,7 @@ impl CsvCopyToSink {
}

impl PartitionSink for CsvCopyToSink {
fn push(&mut self, batch: Batch) -> BoxFuture<'_, Result<()>> {
fn push(&mut self, batch: Batch2) -> BoxFuture<'_, Result<()>> {
self.push_inner(batch).boxed()
}

Expand Down
4 changes: 2 additions & 2 deletions crates/rayexec_csv/src/datatable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::fmt::{self, Debug};

use futures::future::BoxFuture;
use rayexec_error::Result;
use rayexec_execution::arrays::batch::Batch;
use rayexec_execution::arrays::batch::Batch2;
use rayexec_execution::runtime::Runtime;
use rayexec_execution::storage::table_storage::{
DataTable,
Expand Down Expand Up @@ -60,7 +60,7 @@ pub struct CsvFileScan {
}

impl DataTableScan for CsvFileScan {
fn pull(&mut self) -> BoxFuture<'_, Result<Option<Batch>>> {
fn pull(&mut self) -> BoxFuture<'_, Result<Option<Batch2>>> {
Box::pin(async { self.reader.read_next().await })
}
}
Expand Down
26 changes: 13 additions & 13 deletions crates/rayexec_csv/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use bytes::Bytes;
use futures::stream::BoxStream;
use futures::StreamExt;
use rayexec_error::{RayexecError, Result};
use rayexec_execution::arrays::array::{Array, ArrayData};
use rayexec_execution::arrays::batch::Batch;
use rayexec_execution::arrays::array::{Array2, ArrayData2};
use rayexec_execution::arrays::batch::Batch2;
use rayexec_execution::arrays::bitmap::Bitmap;
use rayexec_execution::arrays::compute::cast::parse::{
BoolParser,
Expand Down Expand Up @@ -342,7 +342,7 @@ impl AsyncCsvReader {
AsyncCsvReader { stream }
}

pub async fn read_next(&mut self) -> Result<Option<Batch>> {
pub async fn read_next(&mut self) -> Result<Option<Batch2>> {
self.stream.next_batch().await
}
}
Expand Down Expand Up @@ -387,7 +387,7 @@ struct AsyncCsvStream {
}

impl AsyncCsvStream {
async fn next_batch(&mut self) -> Result<Option<Batch>> {
async fn next_batch(&mut self) -> Result<Option<Batch2>> {
loop {
let (buf, offset) = match self.buf.take() {
Some(buf) => (buf, self.buf_offset),
Expand Down Expand Up @@ -455,7 +455,7 @@ impl AsyncCsvStream {
completed: CompletedRecords,
schema: &Schema,
skip_header: bool,
) -> Result<Batch> {
) -> Result<Batch2> {
let skip_records = if skip_header { 1 } else { 0 };

let mut arrs = Vec::with_capacity(schema.fields.len());
Expand Down Expand Up @@ -483,14 +483,14 @@ impl AsyncCsvStream {
arrs.push(arr);
}

Batch::try_new(arrs)
Batch2::try_new(arrs)
}

fn build_boolean(
completed: &CompletedRecords,
field_idx: usize,
skip_records: usize,
) -> Result<Array> {
) -> Result<Array2> {
let mut values = Bitmap::with_capacity(completed.num_completed());
let mut validity = Bitmap::with_capacity(completed.num_completed());

Expand All @@ -507,7 +507,7 @@ impl AsyncCsvStream {
}
}

Ok(Array::new_with_validity_and_array_data(
Ok(Array2::new_with_validity_and_array_data(
DataType::Boolean,
validity,
BooleanStorage::from(values),
Expand All @@ -520,11 +520,11 @@ impl AsyncCsvStream {
field_idx: usize,
skip_records: usize,
mut parser: P,
) -> Result<Array>
) -> Result<Array2>
where
T: Default,
P: Parser<Type = T>,
PrimitiveStorage<T>: Into<ArrayData>,
PrimitiveStorage<T>: Into<ArrayData2>,
{
let mut values = Vec::with_capacity(completed.num_completed());
let mut validity = Bitmap::with_capacity(completed.num_completed());
Expand All @@ -544,7 +544,7 @@ impl AsyncCsvStream {
}
}

Ok(Array::new_with_validity_and_array_data(
Ok(Array2::new_with_validity_and_array_data(
datatype.clone(),
validity,
PrimitiveStorage::from(values),
Expand All @@ -555,7 +555,7 @@ impl AsyncCsvStream {
completed: &CompletedRecords,
field_idx: usize,
skip_records: usize,
) -> Result<Array> {
) -> Result<Array2> {
let mut values = GermanVarlenBuffer::with_len(completed.num_completed() - skip_records);
let mut validity = Bitmap::with_capacity(completed.num_completed());

Expand All @@ -569,7 +569,7 @@ impl AsyncCsvStream {
}
}

Ok(Array::new_with_validity_and_array_data(
Ok(Array2::new_with_validity_and_array_data(
DataType::Utf8,
validity,
values.into_data(),
Expand Down
4 changes: 2 additions & 2 deletions crates/rayexec_csv/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::io::Write as _;

use csv::ByteRecord;
use rayexec_error::{Result, ResultExt};
use rayexec_execution::arrays::batch::Batch;
use rayexec_execution::arrays::batch::Batch2;
use rayexec_execution::arrays::field::Schema;
use rayexec_execution::arrays::format::{FormatOptions, Formatter};

Expand Down Expand Up @@ -38,7 +38,7 @@ impl CsvEncoder {
}
}

pub fn encode(&mut self, batch: &Batch, output_buf: &mut Vec<u8>) -> Result<()> {
pub fn encode(&mut self, batch: &Batch2, output_buf: &mut Vec<u8>) -> Result<()> {
const FORMATTER: Formatter = Formatter::new(FormatOptions::new());

let mut csv_writer = csv::WriterBuilder::new()
Expand Down
4 changes: 2 additions & 2 deletions crates/rayexec_debug/src/discard.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use futures::future::BoxFuture;
use rayexec_error::Result;
use rayexec_execution::arrays::batch::Batch;
use rayexec_execution::arrays::batch::Batch2;
use rayexec_execution::arrays::field::Schema;
use rayexec_execution::execution::operators::sink::PartitionSink;
use rayexec_execution::functions::copy::CopyToFunction;
Expand Down Expand Up @@ -33,7 +33,7 @@ impl CopyToFunction for DiscardCopyToFunction {
struct DiscardCopyToSink;

impl PartitionSink for DiscardCopyToSink {
fn push(&mut self, _batch: Batch) -> BoxFuture<'_, Result<()>> {
fn push(&mut self, _batch: Batch2) -> BoxFuture<'_, Result<()>> {
Box::pin(async { Ok(()) })
}

Expand Down
16 changes: 8 additions & 8 deletions crates/rayexec_debug/src/table_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use futures::future::BoxFuture;
use parking_lot::Mutex;
use rayexec_error::{RayexecError, Result};
use rayexec_execution::arrays::batch::Batch;
use rayexec_execution::arrays::batch::Batch2;
use rayexec_execution::arrays::field::Field;
use rayexec_execution::database::catalog_entry::CatalogEntry;
use rayexec_execution::execution::operators::sink::PartitionSink;
Expand All @@ -28,7 +28,7 @@ pub struct TablePreload {
pub schema: String,
pub name: String,
pub columns: Vec<Field>,
pub data: Batch,
pub data: Batch2,
}

#[derive(Debug, Default)]
Expand Down Expand Up @@ -124,7 +124,7 @@ impl TableStorage for DebugTableStorage {

#[derive(Debug, Clone, Default)]
pub struct DebugDataTable {
data: Arc<Mutex<Vec<Batch>>>,
data: Arc<Mutex<Vec<Batch2>>>,
}

impl DataTable for DebugDataTable {
Expand Down Expand Up @@ -168,23 +168,23 @@ impl DataTable for DebugDataTable {

#[derive(Debug)]
pub struct DebugDataTableScan {
data: Vec<Batch>,
data: Vec<Batch2>,
}

impl DataTableScan for DebugDataTableScan {
fn pull(&mut self) -> BoxFuture<'_, Result<Option<Batch>>> {
fn pull(&mut self) -> BoxFuture<'_, Result<Option<Batch2>>> {
Box::pin(async { Ok(self.data.pop()) })
}
}

#[derive(Debug)]
pub struct DebugDataTableInsert {
collected: Vec<Batch>,
data: Arc<Mutex<Vec<Batch>>>,
collected: Vec<Batch2>,
data: Arc<Mutex<Vec<Batch2>>>,
}

impl PartitionSink for DebugDataTableInsert {
fn push(&mut self, batch: Batch) -> BoxFuture<'_, Result<()>> {
fn push(&mut self, batch: Batch2) -> BoxFuture<'_, Result<()>> {
Box::pin(async {
self.collected.push(batch);
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions crates/rayexec_delta/src/datatable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use futures::future::BoxFuture;
use rayexec_error::Result;
use rayexec_execution::arrays::batch::Batch;
use rayexec_execution::arrays::batch::Batch2;
use rayexec_execution::storage::table_storage::{DataTable, DataTableScan, Projections};

use crate::protocol::table::{Table, TableScan};
Expand Down Expand Up @@ -34,7 +34,7 @@ struct DeltaTableScan {
}

impl DataTableScan for DeltaTableScan {
fn pull(&mut self) -> BoxFuture<'_, Result<Option<Batch>>> {
fn pull(&mut self) -> BoxFuture<'_, Result<Option<Batch2>>> {
Box::pin(async { self.scan.read_next().await })
}
}
4 changes: 2 additions & 2 deletions crates/rayexec_delta/src/protocol/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;

use futures::{StreamExt, TryStreamExt};
use rayexec_error::{not_implemented, RayexecError, Result, ResultExt};
use rayexec_execution::arrays::batch::Batch;
use rayexec_execution::arrays::batch::Batch2;
use rayexec_execution::arrays::datatype::{DataType, DecimalTypeMeta, TimeUnit, TimestampTypeMeta};
use rayexec_execution::arrays::field::{Field, Schema};
use rayexec_execution::arrays::scalar::decimal::{Decimal128Type, DecimalType};
Expand Down Expand Up @@ -182,7 +182,7 @@ pub struct TableScan {

impl TableScan {
/// Read the next batch.
pub async fn read_next(&mut self) -> Result<Option<Batch>> {
pub async fn read_next(&mut self) -> Result<Option<Batch2>> {
loop {
if self.current.is_none() {
let path = match self.paths.pop_front() {
Expand Down
10 changes: 8 additions & 2 deletions crates/rayexec_error/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@ macro_rules! not_implemented {
}

// TODO: Implement partial eq on msg
#[derive(Debug)]
/// Error type for this crate.
///
/// All error state is stored in a single heap-allocated
/// `RayexecErrorInner`, so this struct is just one `Box`.
pub struct RayexecError {
// NOTE(review): presumably boxed to keep `Result<T, RayexecError>` small;
// confirm against the crate's design notes.
inner: Box<RayexecErrorInner>,
}

#[derive(Debug)]
struct RayexecErrorInner {
/// Message for the error.
pub msg: String,
Expand Down Expand Up @@ -153,6 +151,14 @@ impl fmt::Display for RayexecError {
}
}

/// `Debug` output for [`RayexecError`] is intentionally identical to its
/// `Display` output.
impl fmt::Debug for RayexecError {
    /// Writes the `Display` rendering of `self` into `f`.
    ///
    /// The text is produced through a fresh `format_args!` invocation, so any
    /// flags on the incoming formatter (width, alternate, ...) are not passed
    /// on to the `Display` impl.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The derived Debug layout is hard to read, especially once the error
        // carries a backtrace, so reuse the human-oriented Display text.
        f.write_fmt(format_args!("{self}"))
    }
}

impl Error for RayexecError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
self.inner.source.as_ref().map(|e| e.as_ref() as _)
Expand Down
1 change: 1 addition & 0 deletions crates/rayexec_execution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ rayexec_parser = { path = "../rayexec_parser" }
# rayexec_bullet = { path = "../rayexec_bullet" }
rayexec_io = { path = "../rayexec_io" }
fmtutil = { path = "../fmtutil" }
stdutil = { path = "../stdutil" }
# stackutil = { path = "../stackutil" } TODO: psm hash issues when compiling to wasm on macos

ahash = { workspace = true }
Expand Down
Loading
Loading