Skip to content

Commit

Permalink
Support flatten with alias (#927)
Browse files Browse the repository at this point in the history
* Support flatten with alias

Signed-off-by: Heng Qian <[email protected]>

* Address comments

Signed-off-by: Heng Qian <[email protected]>

* Address comments

Signed-off-by: Heng Qian <[email protected]>

* Add flatten with alias example

Signed-off-by: Heng Qian <[email protected]>

* Add IT for ambiguous exception

Signed-off-by: Heng Qian <[email protected]>

---------

Signed-off-by: Heng Qian <[email protected]>
  • Loading branch information
qianheng-aws authored Nov 20, 2024
1 parent 9fe754c commit dc690b4
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 10 deletions.
1 change: 1 addition & 0 deletions docs/ppl-lang/PPL-Example-Commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ Assumptions: `a`, `b`, `c`, `d`, `e` are existing fields in `table`
Assumptions: `bridges`, `coor` are existing fields in `table`, and the field's types are `struct<?,?>` or `array<struct<?,?>>`
- `source = table | flatten bridges`
- `source = table | flatten coor`
- `source = table | flatten coor as (altitude, latitude, longitude)`
- `source = table | flatten bridges | flatten coor`
- `source = table | fields bridges | flatten bridges`
- `source = table | fields country, bridges | flatten bridges | fields country, length | stats avg(length) as avg by country`
Expand Down
19 changes: 17 additions & 2 deletions docs/ppl-lang/ppl-flatten-command.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ Using `flatten` command to flatten a field of type:


### Syntax
`flatten <field>`
`flatten <field> [as aliasSequence]`

* field: to be flattened. The field must be of supported type.
* aliasSequence: the aliases to use as names for the flattened output fields. Enclose the aliasSequence in parentheses if it contains more than one alias.

### Test table
#### Schema
Expand Down Expand Up @@ -87,4 +88,18 @@ PPL query:
| 2024-09-13T12:00:00 | Prague | Czech Republic| 343 | Legion Bridge | 200 | 50.0755| 14.4378|
| 2024-09-13T12:00:00 | Budapest| Hungary | 375 | Chain Bridge | 96 | 47.4979| 19.0402|
| 2024-09-13T12:00:00 | Budapest| Hungary | 333 | Liberty Bridge | 96 | 47.4979| 19.0402|
| 1990-09-13T12:00:00 | Warsaw | Poland | NULL | NULL | NULL | NULL | NULL |
| 1990-09-13T12:00:00 | Warsaw | Poland | NULL | NULL | NULL | NULL | NULL |

### Example 4: flatten with aliasSequence
This example shows how to flatten with aliasSequence.
PPL query:
- `source=table | flatten coor as (altitude, latitude, longitude)`

| \_time | bridges | city | country | altitude | latitude | longitude  |
|---------------------|----------------------------------------------|---------|---------------|----------|----------|------------|
| 2024-09-13T12:00:00 | [{801, Tower Bridge}, {928, London Bridge}] | London | England | 35 | 51.5074 | -0.1278 |
| 2024-09-13T12:00:00 | [{232, Pont Neuf}, {160, Pont Alexandre III}]| Paris | France | 35 | 48.8566 | 2.3522 |
| 2024-09-13T12:00:00 | [{48, Rialto Bridge}, {11, Bridge of Sighs}] | Venice | Italy | 2 | 45.4408 | 12.3155 |
| 2024-09-13T12:00:00 | [{516, Charles Bridge}, {343, Legion Bridge}]| Prague | Czech Republic| 200 | 50.0755 | 14.4378 |
| 2024-09-13T12:00:00 | [{375, Chain Bridge}, {333, Liberty Bridge}] | Budapest| Hungary | 96 | 47.4979 | 19.0402 |
| 1990-09-13T12:00:00 | NULL | Warsaw | Poland | NULL | NULL | NULL |
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import java.nio.file.Files
import org.opensearch.flint.spark.FlattenGenerator
import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, EqualTo, GeneratorOuter, Literal, Or}
import org.apache.spark.sql.catalyst.plans.logical._
Expand Down Expand Up @@ -347,4 +347,85 @@ class FlintSparkPPLFlattenITSuite
val expectedPlan = Project(Seq(UnresolvedStar(None)), flattenMultiValue)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("flatten struct nested table using alias") {
  // Rows are ordered on their first (int) column before being compared, so the order in which
  // the query returns them does not matter.
  implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Int](_.getAs[Int](0))
  val expectedResults: Array[Row] =
    Array(
      Row(30, 123, "value1", 23, "valueA"),
      Row(40, 123, "value5", 33, "valueB"),
      Row(30, 823, "value4", 83, "valueC"),
      Row(40, 456, "value2", 46, "valueD"),
      Row(50, 789, "value3", 89, "valueE")).sorted

  // Scenario 1: single aliases and a parenthesised alias list rename the flattened columns.
  val aliasedFrame = sql(s"""
       | source = $structNestedTable
       | | flatten struct_col
       | | flatten field1 as subfield_1
       | | flatten struct_col2 as (field1, field2_2)
       | | flatten field1 as subfield_2
       | """.stripMargin)
  val aliasedColumns = Array("int_col", "field2", "subfield_1", "field2_2", "subfield_2")
  assert(aliasedFrame.columns.sameElements(aliasedColumns))
  val results: Array[Row] = aliasedFrame.collect()
  assert(results.sorted.sameElements(expectedResults))

  // Scenario 2: the same alias list is supplied twice, so `field2_2` ends up in the output
  // twice; the rows themselves are unchanged.
  val repeatedAliasFrame = sql(s"""
       | source = $structNestedTable
       | | flatten struct_col as (field1, field2_2)
       | | flatten field1 as subfield_1
       | | flatten struct_col2 as (field1, field2_2)
       | | flatten field1 as subfield_2
       | """.stripMargin)
  val repeatedAliasColumns =
    Array("int_col", "field2_2", "subfield_1", "field2_2", "subfield_2")
  assert(repeatedAliasFrame.columns.sameElements(repeatedAliasColumns))
  assert(repeatedAliasFrame.collect().sorted.sameElements(expectedResults))

  // Scenario 3: the alias `int_col` collides with a column that already exists in the table.
  val existingNameFrame = sql(s"""
       | source = $structNestedTable
       | | flatten struct_col as (field1, field2_2)
       | | flatten field1 as int_col
       | | flatten struct_col2 as (field1, field2_2)
       | | flatten field1 as int_col
       | """.stripMargin)
  val existingNameColumns = Array("int_col", "field2_2", "int_col", "field2_2", "int_col")
  assert(existingNameFrame.columns.sameElements(existingNameColumns))
  assert(existingNameFrame.collect().sorted.sameElements(expectedResults))

  // Scenario 4: an AnalysisException is raised when the number of aliases in the AS clause
  // does not match the number of columns produced by the flatten.
  val aliasCountMismatch = intercept[AnalysisException] {
    sql(s"""
         | source = $structNestedTable
         | | flatten struct_col as (field1)
         | | flatten field1 as int_col
         | | flatten struct_col2 as (field1, field2_2)
         | | flatten field1 as int_col
         | """.stripMargin)
  }
  val aliasCountMessage =
    "The number of aliases supplied in the AS clause does not match the number of columns output by the UDTF"
  assert(aliasCountMismatch.message.contains(aliasCountMessage))

  // Scenario 5: selecting a duplicated output name afterwards is rejected as ambiguous.
  val ambiguousReference = intercept[AnalysisException] {
    sql(s"""
         | source = $structNestedTable
         | | flatten struct_col as (field1, field2_2)
         | | flatten field1 as int_col
         | | flatten struct_col2 as (field1, field2_2)
         | | flatten field1 as int_col
         | | fields field2_2
         | """.stripMargin)
  }
  val ambiguousMessage =
    "[AMBIGUOUS_REFERENCE] Reference `field2_2` is ambiguous, could be: [`field2_2`, `field2_2`]."
  assert(ambiguousReference.message.contains(ambiguousMessage))
}

}
7 changes: 6 additions & 1 deletion ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ expandCommand
;

flattenCommand
: FLATTEN fieldExpression
: FLATTEN fieldExpression (AS alias = identifierSeq)?
;

trendlineCommand
Expand Down Expand Up @@ -1043,6 +1043,11 @@ qualifiedName
: ident (DOT ident)* # identsAsQualifiedName
;

// A sequence of one or more comma-separated qualified names, written either bare
// (e.g. `col1`) or wrapped between LT_PRTHS and RT_PRTHS (e.g. `(col1, col2)`).
// Both alternatives carry the same label, so they are handled by a single
// identsAsQualifiedNameSeq context / visitor method.
identifierSeq
: qualifiedName (COMMA qualifiedName)* # identsAsQualifiedNameSeq
| LT_PRTHS qualifiedName (COMMA qualifiedName)* RT_PRTHS # identsAsQualifiedNameSeq
;

tableQualifiedName
: tableIdent (DOT ident)* # identsAsTableQualifiedName
;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.opensearch.sql.ast.expression.Field;

import java.util.List;
import org.opensearch.sql.ast.expression.UnresolvedExpression;

@RequiredArgsConstructor
public class Flatten extends UnresolvedPlan {
Expand All @@ -15,6 +16,8 @@ public class Flatten extends UnresolvedPlan {

@Getter
private final Field field;
@Getter
private final List<UnresolvedExpression> aliasSequence;

@Override
public UnresolvedPlan attach(UnresolvedPlan child) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,10 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

import static java.util.Collections.emptyList;
import static java.util.List.of;
import static org.opensearch.sql.ppl.CatalystPlanContext.findRelation;
import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq;
import static org.opensearch.sql.ppl.utils.DedupeTransformer.retainMultipleDuplicateEvents;
import static org.opensearch.sql.ppl.utils.DedupeTransformer.retainMultipleDuplicateEventsAndKeepEmpty;
Expand Down Expand Up @@ -466,9 +464,13 @@ public LogicalPlan visitFlatten(Flatten flatten, CatalystPlanContext context) {
context.getNamedParseExpressions().push(UnresolvedStar$.MODULE$.apply(Option.<Seq<String>>empty()));
}
Expression field = visitExpression(flatten.getField(), context);
List<Expression> alias = flatten.getAliasSequence().stream()
.map(aliasNode -> visitExpression(aliasNode, context))
.collect(Collectors.toList());
context.retainAllNamedParseExpressions(p -> (NamedExpression) p);
FlattenGenerator flattenGenerator = new FlattenGenerator(field);
context.apply(p -> new Generate(new GeneratorOuter(flattenGenerator), seq(), true, (Option) None$.MODULE$, seq(), p));
scala.collection.mutable.Seq outputs = alias.isEmpty() ? seq() : seq(alias);
context.apply(p -> new Generate(new GeneratorOuter(flattenGenerator), seq(), true, (Option) None$.MODULE$, outputs, p));
return context.apply(logicalPlan -> DataFrameDropColumns$.MODULE$.apply(seq(field), logicalPlan));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.sql.ast.expression.Alias;
import org.opensearch.sql.ast.expression.And;
import org.opensearch.sql.ast.expression.Argument;
import org.opensearch.sql.ast.expression.AttributeList;
import org.opensearch.sql.ast.expression.DataType;
import org.opensearch.sql.ast.expression.EqualTo;
import org.opensearch.sql.ast.expression.Field;
Expand Down Expand Up @@ -605,7 +606,8 @@ public UnresolvedPlan visitFillnullCommand(OpenSearchPPLParser.FillnullCommandCo
@Override
public UnresolvedPlan visitFlattenCommand(OpenSearchPPLParser.FlattenCommandContext ctx) {
Field unresolvedExpression = (Field) internalVisitExpression(ctx.fieldExpression());
return new Flatten(unresolvedExpression);
List<UnresolvedExpression> alias = ctx.alias == null ? emptyList() : ((AttributeList) internalVisitExpression(ctx.alias)).getAttrList();
return new Flatten(unresolvedExpression, alias);
}

/** AD command. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,11 @@ public UnresolvedExpression visitIdentsAsQualifiedName(OpenSearchPPLParser.Ident
return visitIdentifiers(ctx.ident());
}

/**
 * Builds an {@link AttributeList} from an identifier sequence by visiting each qualified name
 * of the parse context in the order it appears.
 *
 * @param ctx parse context of the identifier sequence
 * @return an AttributeList holding one visited expression per qualified name
 */
@Override
public UnresolvedExpression visitIdentsAsQualifiedNameSeq(OpenSearchPPLParser.IdentsAsQualifiedNameSeqContext ctx) {
    return new AttributeList(
        ctx.qualifiedName().stream()
            .map(qualifiedNameCtx -> visit(qualifiedNameCtx))
            .collect(Collectors.toList()));
}

@Override
public UnresolvedExpression visitIdentsAsTableQualifiedName(
OpenSearchPPLParser.IdentsAsTableQualifiedNameContext ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import org.scalatest.matchers.should.Matchers

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, Descending, GeneratorOuter, Literal, NullsLast, RegExpExtract, SortOrder}
import org.apache.spark.sql.catalyst.expressions.{Alias, GeneratorOuter, Literal, RegExpExtract}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DataFrameDropColumns, Generate, GlobalLimit, LocalLimit, Project, Sort}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DataFrameDropColumns, Generate, Project}
import org.apache.spark.sql.types.IntegerType

class PPLLogicalPlanFlattenCommandTranslatorTestSuite
Expand Down Expand Up @@ -153,4 +153,45 @@ class PPLLogicalPlanFlattenCommandTranslatorTestSuite
comparePlans(expectedPlan, logPlan, checkAnalysis = false)
}

test("test flatten with one alias") {
  // Translate the PPL query into a logical plan.
  val context = new CatalystPlanContext
  val query = "source=relation | flatten field_with_array as col1"
  val actualPlan = planTransformer.visit(plan(pplParser, query), context)

  // Expected plan, built bottom-up: the outer flatten generator over the relation emits the
  // single alias `col1`, the flattened source column is dropped, and everything is projected.
  val sourceField = UnresolvedAttribute("field_with_array")
  val aliasOutputs = Seq(UnresolvedAttribute("col1"))
  val flattened = Generate(
    GeneratorOuter(new FlattenGenerator(sourceField)),
    seq(),
    true,
    None,
    aliasOutputs,
    UnresolvedRelation(Seq("relation")))
  val withoutSourceColumn = DataFrameDropColumns(Seq(sourceField), flattened)
  val expectedPlan = Project(seq(UnresolvedStar(None)), withoutSourceColumn)

  comparePlans(expectedPlan, actualPlan, checkAnalysis = false)
}

test("test flatten with alias list") {
  // Translate the PPL query into a logical plan.
  val context = new CatalystPlanContext
  val query = "source=relation | flatten field_with_array as (col1, col2)"
  val actualPlan = planTransformer.visit(plan(pplParser, query), context)

  // Expected plan, built bottom-up: the outer flatten generator over the relation emits both
  // aliases in the order given, the flattened source column is dropped, and everything is
  // projected.
  val sourceField = UnresolvedAttribute("field_with_array")
  val aliasOutputs = Seq(UnresolvedAttribute("col1"), UnresolvedAttribute("col2"))
  val flattened = Generate(
    GeneratorOuter(new FlattenGenerator(sourceField)),
    seq(),
    true,
    None,
    aliasOutputs,
    UnresolvedRelation(Seq("relation")))
  val withoutSourceColumn = DataFrameDropColumns(Seq(sourceField), flattened)
  val expectedPlan = Project(seq(UnresolvedStar(None)), withoutSourceColumn)

  comparePlans(expectedPlan, actualPlan, checkAnalysis = false)
}

}

0 comments on commit dc690b4

Please sign in to comment.