forked from apache/datafusion
-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathhash_utils.rs
168 lines (141 loc) · 5.69 KB
/
hash_utils.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Functionality used both on logical and physical plans
use crate::error::{DataFusionError, Result};
use std::collections::HashSet;
use crate::logical_plan::JoinType;
use crate::physical_plan::expressions::Column;
use arrow::datatypes::Field;
use arrow::datatypes::Schema;
/// The `ON` clause of a join, as an owned vector of (left, right) column pairs.
pub type JoinOn = Vec<(Column, Column)>;
/// Borrowed counterpart of [`JoinOn`]: a slice of (left, right) column pairs.
pub type JoinOnRef<'a> = &'a [(Column, Column)];
/// Validates that the schemas `left` and `right` can be joined on the column pairs `on`.
///
/// Each schema is turned into a set of [`Column`]s (field name plus field position) and
/// the actual validation is delegated to `check_join_set_is_valid`.
///
/// # Errors
///
/// Propagates whatever error `check_join_set_is_valid` reports for the derived sets.
pub fn check_join_is_valid(left: &Schema, right: &Schema, on: JoinOnRef) -> Result<()> {
    // Collects every field of `schema` as a `Column` built from the field's name and
    // its position in the schema.
    fn columns_of(schema: &Schema) -> HashSet<Column> {
        let mut columns = HashSet::new();
        for (position, field) in schema.fields().iter().enumerate() {
            columns.insert(Column::new(field.name(), position));
        }
        columns
    }

    let left_columns = columns_of(left);
    let right_columns = columns_of(right);
    check_join_set_is_valid(&left_columns, &right_columns, on)
}
/// Checks whether the column sets `left` and `right` and the join keys `on` compose a
/// valid join.
///
/// The join is valid when both of the following hold:
/// * every left key of `on` is a member of `left` and every right key of `on` is a
///   member of `right`;
/// * no member of `right` that is *not* a right join key is also a member of `left`.
///
/// # Errors
///
/// Returns [`DataFusionError::Plan`] naming the missing join keys, or the colliding
/// columns, when one of the conditions above is violated.
///
/// NOTE(review): collisions are detected through `Column` equality. If that equality
/// also compares the column index (as `Column::new(name, index)` suggests), columns
/// sharing a name but sitting at different positions are not reported -- confirm that
/// this is intended.
fn check_join_set_is_valid(
    left: &HashSet<Column>,
    right: &HashSet<Column>,
    on: &[(Column, Column)],
) -> Result<()> {
    // Join keys that do not exist on their respective side. Only references are
    // collected, so no `Column` has to be cloned just to run the check.
    let left_missing: HashSet<&Column> = on
        .iter()
        .map(|(l, _)| l)
        .filter(|c| !left.contains(*c))
        .collect();
    let on_right: HashSet<&Column> = on.iter().map(|(_, r)| r).collect();
    let right_missing: HashSet<&Column> = on_right
        .iter()
        .copied()
        .filter(|c| !right.contains(*c))
        .collect();

    if !left_missing.is_empty() || !right_missing.is_empty() {
        return Err(DataFusionError::Plan(format!(
            "The left or right side of the join does not have all columns on \"on\": \nMissing on the left: {:?}\nMissing on the right: {:?}",
            left_missing,
            right_missing,
        )));
    }

    // Columns present on both sides that are not consumed as right join keys: they
    // would be ambiguous in the joined output.
    let collisions: HashSet<&Column> = left
        .iter()
        .filter(|c| right.contains(*c) && !on_right.contains(*c))
        .collect();

    if !collisions.is_empty() {
        return Err(DataFusionError::Plan(format!(
            "The left schema and the right schema have the following columns with the same name without being on the ON statement: {:?}. Consider aliasing them.",
            collisions,
        )));
    }

    Ok(())
}
/// Builds the output schema of a join between `left` and `right`.
///
/// * `Inner`, `Left`, `Right` and `Full` joins expose the left fields followed by the
///   right fields.
/// * `Semi` and `Anti` joins expose only the left fields.
pub fn build_join_schema(left: &Schema, right: &Schema, join_type: &JoinType) -> Schema {
    let left_fields = left.fields().iter().cloned();

    let fields: Vec<Field> = match join_type {
        // Only the left side survives a semi/anti join.
        JoinType::Semi | JoinType::Anti => left_fields.collect(),
        // Every other join type concatenates both sides, left first.
        JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
            left_fields.chain(right.fields().iter().cloned()).collect()
        }
    };

    Schema::new(fields)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the left/right column sets from slices and runs the set-based join check.
    fn check(left: &[Column], right: &[Column], on: &[(Column, Column)]) -> Result<()> {
        let left_set: HashSet<Column> = left.iter().cloned().collect();
        let right_set: HashSet<Column> = right.iter().cloned().collect();
        check_join_set_is_valid(&left_set, &right_set, on)
    }

    #[test]
    fn check_valid() -> Result<()> {
        let left = [Column::new("a", 0), Column::new("b1", 1)];
        let right = [Column::new("a", 0), Column::new("b2", 1)];
        let on = [(Column::new("a", 0), Column::new("a", 0))];
        check(&left, &right, &on)
    }

    #[test]
    fn check_not_in_right() {
        // the right join key "a" does not exist on the right side
        let left = [Column::new("a", 0), Column::new("b", 1)];
        let right = [Column::new("b", 0)];
        let on = [(Column::new("a", 0), Column::new("a", 0))];
        let outcome = check(&left, &right, &on);
        assert!(outcome.is_err());
    }

    #[test]
    fn check_not_in_left() {
        // the left join key "a" does not exist on the left side
        let left = [Column::new("b", 0)];
        let right = [Column::new("a", 0)];
        let on = [(Column::new("a", 0), Column::new("a", 0))];
        let outcome = check(&left, &right, &on);
        assert!(outcome.is_err());
    }

    #[test]
    fn check_collision() {
        // column "a" would appear both in left and right
        let left = [Column::new("a", 0), Column::new("c", 1)];
        let right = [Column::new("a", 0), Column::new("b", 1)];
        let on = [(Column::new("a", 0), Column::new("b", 1))];
        let outcome = check(&left, &right, &on);
        assert!(outcome.is_err());
    }

    #[test]
    fn check_in_right() {
        // differently named keys on each side, with no leftover shared column
        let left = [Column::new("a", 0), Column::new("c", 1)];
        let right = [Column::new("b", 0)];
        let on = [(Column::new("a", 0), Column::new("b", 0))];
        let outcome = check(&left, &right, &on);
        assert!(outcome.is_ok());
    }
}