Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PLUGIN-1853] Error management for Wrangler Transform and Storage plugin #725

Draft
wants to merge 2 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package io.cdap.wrangler.api;

import java.util.List;

/**
* Base class for error record that includes the critical fields.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.wrangler.api.Arguments;
import io.cdap.wrangler.api.Directive;
import io.cdap.wrangler.api.DirectiveExecutionException;
Expand Down Expand Up @@ -66,19 +69,24 @@ public UsageDefinition define() {
}

@Override
public void initialize(Arguments args) throws DirectiveParseException {
public void initialize(Arguments args) {
this.variable = ((Identifier) args.value("variable")).value();
this.incrementBy = ((Numeric) args.value("value")).value().longValue();
String expression = ((Expression) args.value("condition")).value();
try {
el = EL.compile(expression);
} catch (ELException e) {
throw new DirectiveParseException(NAME, e.getMessage(), e);
String errorMessage = String.format(
"Unable to compile, invalid expression provided '%s'. %s: %s", expression,
e.getClass().getName(), e.getMessage());
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage,
ErrorType.USER, false, new DirectiveParseException(NAME, errorMessage, e));
}
}

@Override
public List<Row> execute(List<Row> rows, ExecutorContext context) throws DirectiveExecutionException {
public List<Row> execute(List<Row> rows, ExecutorContext context) {
for (Row row : rows) {
// Move the fields from the row into the context.
ELContext ctx = new ELContext();
Expand All @@ -102,7 +110,12 @@ public List<Row> execute(List<Row> rows, ExecutorContext context) throws Directi
context.getTransientStore().increment(TransientVariableScope.GLOBAL, variable, incrementBy);
}
} catch (ELException e) {
throw new DirectiveExecutionException(NAME, e.getMessage(), e);
String errorMessage = String.format(
"Unable to execute, invalid expression provided '%s'. %s: %s", el.getScriptParsedText(),
e.getClass().getName(), e.getMessage());
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage,
ErrorType.USER, false, new DirectiveExecutionException(NAME, errorMessage, e));
}
}
return rows;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.wrangler.api.Arguments;
import io.cdap.wrangler.api.Directive;
import io.cdap.wrangler.api.DirectiveExecutionException;
Expand Down Expand Up @@ -66,13 +69,18 @@ public UsageDefinition define() {
}

@Override
public void initialize(Arguments args) throws DirectiveParseException {
public void initialize(Arguments args) {
this.variable = ((Identifier) args.value("variable")).value();
String expression = ((Expression) args.value("condition")).value();
try {
el = EL.compile(expression);
} catch (ELException e) {
throw new DirectiveParseException(NAME, e.getMessage(), e);
String errorMessage = String.format(
"Unable to compile, Error while parsing expression '%s'. %s: %s", expression,
e.getClass().getName(), e.getMessage());
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage,
ErrorType.USER, false, new DirectiveParseException(NAME, errorMessage, e));
}
}

Expand All @@ -82,7 +90,7 @@ public void destroy() {
}

@Override
public List<Row> execute(List<Row> rows, ExecutorContext context) throws DirectiveExecutionException {
public List<Row> execute(List<Row> rows, ExecutorContext context) {
for (Row row : rows) {
// Move the fields from the row into the context.
ELContext ctx = new ELContext(context, el, row);
Expand All @@ -95,7 +103,12 @@ public List<Row> execute(List<Row> rows, ExecutorContext context) throws Directi
context.getTransientStore().set(TransientVariableScope.GLOBAL, variable, result.getObject());
}
} catch (ELException e) {
throw new DirectiveExecutionException(NAME, e.getMessage(), e);
String errorMessage = String.format(
"Unable to execute, error while executing expression '%s'. %s: %s",
el.getScriptParsedText(), e.getClass().getName(), e.getMessage());
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage,
ErrorType.USER, false, new DirectiveExecutionException(NAME, errorMessage, e));
}
}
return rows;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.wrangler.api.Arguments;
import io.cdap.wrangler.api.Directive;
import io.cdap.wrangler.api.DirectiveExecutionException;
Expand Down Expand Up @@ -71,14 +74,19 @@ public void destroy() {
}

@Override
public List<Row> execute(List<Row> rows, ExecutorContext context) throws DirectiveExecutionException {
public List<Row> execute(List<Row> rows, ExecutorContext context) {
for (Row row : rows) {
for (int i = 0; i < row.width(); ++i) {
String name = row.getColumn(i);
try {
row.setColumn(i, getSedReplacedColumnName(name));
} catch (IllegalArgumentException e) {
throw new DirectiveExecutionException(NAME, e.getMessage(), e);
String errorMessage = String.format(
"Failed to apply sed expression '%s' on column '%s'. %s: %s", sed, name,
e.getClass().getName(), e.getMessage());
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage,
ErrorType.SYSTEM, false, new DirectiveExecutionException(NAME, errorMessage, e));
}
}
}
Expand Down
19 changes: 14 additions & 5 deletions wrangler-core/src/main/java/io/cdap/directives/column/Copy.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.wrangler.api.Arguments;
import io.cdap.wrangler.api.Directive;
import io.cdap.wrangler.api.DirectiveExecutionException;
Expand Down Expand Up @@ -75,11 +78,14 @@ public void destroy() {
}

@Override
public List<Row> execute(List<Row> rows, ExecutorContext context) throws DirectiveExecutionException {
public List<Row> execute(List<Row> rows, ExecutorContext context) {
for (Row row : rows) {
int sidx = row.find(source.value());
if (sidx == -1) {
throw new DirectiveExecutionException(NAME, String.format("Column '%s' does not exist.", source.value()));
String errorMessage = String.format("Column '%s' does not exist.", source.value());
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage,
ErrorType.USER, false, new DirectiveExecutionException(NAME, errorMessage));
}

int didx = row.find(destination.value());
Expand All @@ -95,9 +101,12 @@ public List<Row> execute(List<Row> rows, ExecutorContext context) throws Directi
// if destination column exists, and force is set to false, then throw exception, else
// overwrite it.
if (!force) {
throw new DirectiveExecutionException(
NAME, String.format("Destination column '%s' already exists in the row. Use 'force' " +
"option to overwrite the column.", destination.value()));
String errorMessage = String.format(
"Destination column '%s' already exists in the row. Use 'force' "
+ "option to overwrite the column.", destination.value());
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage,
ErrorType.USER, false, new DirectiveExecutionException(errorMessage));
}
row.setValue(didx, row.getValue(sidx));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.wrangler.api.Arguments;
import io.cdap.wrangler.api.Directive;
import io.cdap.wrangler.api.DirectiveExecutionException;
Expand Down Expand Up @@ -88,7 +91,7 @@ public void destroy() {
}

@Override
public List<Row> execute(List<Row> rows, ExecutorContext context) throws DirectiveExecutionException {
public List<Row> execute(List<Row> rows, ExecutorContext context) {
List<Row> results = new ArrayList<>();

for (Row row : rows) {
Expand All @@ -104,10 +107,12 @@ public List<Row> execute(List<Row> rows, ExecutorContext context) throws Directi
}

if (!(object instanceof Row)) {
throw new DirectiveExecutionException(
NAME,
String.format("Column '%s' has invalid type '%s'. It should be of type 'Record'.",
column, object.getClass().getSimpleName()));
String errorMessage = String.format(
"Column '%s' has invalid type '%s'. It should be of type 'Record'.", column,
object.getClass().getSimpleName());
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage,
ErrorType.USER, false, new DirectiveExecutionException(NAME, errorMessage));
}

Row rowColumns = (Row) object;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void destroy() {
}

@Override
public List<Row> execute(List<Row> rows, ExecutorContext context) throws DirectiveExecutionException {
public List<Row> execute(List<Row> rows, ExecutorContext context) {
List<Row> results = new ArrayList<>();
for (Row row : rows) {
int idx1 = row.find(col1);
Expand Down
16 changes: 14 additions & 2 deletions wrangler-core/src/main/java/io/cdap/directives/column/Rename.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.wrangler.api.Arguments;
import io.cdap.wrangler.api.Directive;
import io.cdap.wrangler.api.DirectiveExecutionException;
Expand Down Expand Up @@ -71,9 +74,18 @@ public void destroy() {
}

@Override
public List<Row> execute(List<Row> rows, ExecutorContext context) throws DirectiveExecutionException {
public List<Row> execute(List<Row> rows, ExecutorContext context) {
for (Row row : rows) {
ColumnConverter.rename(NAME, row, source.value(), target.value());
try {
ColumnConverter.rename(NAME, row, source.value(), target.value());
} catch (DirectiveExecutionException e) {
String errorMessage = String.format(
"Error renaming column '%s' to '%s' in row '%s'. %s: %s", source.value(),
target.value(), row, e.getClass().getName(), e.getMessage());
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage,
ErrorType.USER, false, new DirectiveExecutionException(NAME, errorMessage, e));
}
}
return rows;
}
Expand Down
57 changes: 45 additions & 12 deletions wrangler-core/src/main/java/io/cdap/directives/column/SetType.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.data.schema.Schema.LogicalType;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.wrangler.api.Arguments;
import io.cdap.wrangler.api.Directive;
import io.cdap.wrangler.api.DirectiveExecutionException;
Expand Down Expand Up @@ -77,26 +80,37 @@ public UsageDefinition define() {
}

@Override
public void initialize(Arguments args) throws DirectiveParseException {
public void initialize(Arguments args) {
col = ((ColumnName) args.value("column")).value();
type = ((Identifier) args.value("type")).value();
if (type.equalsIgnoreCase("decimal")) {
precision = args.contains("precision") ? (Integer) ((HashMap<String, Numeric>) args.
value("precision").value()).get("precision").value().intValue() : null;
if (precision != null && precision < 1) {
throw new DirectiveParseException("precision cannot be less than 1");
String errorMessage = "Precision cannot be less than 1";
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage,
ErrorType.USER, false, null);
}
scale = args.contains("scale") ? ((Numeric) args.value("scale")).value().intValue() : null;
if (scale == null && precision == null && args.contains("rounding-mode")) {
throw new DirectiveParseException("'rounding-mode' can only be specified when a 'scale' or 'precision' is set");
String errorMessage = "Rounding-mode can only be specified when a 'scale' or "
+ "'precision' is set";
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage,
ErrorType.USER, false, null);
}
try {
roundingMode = args.contains("rounding-mode") ?
RoundingMode.valueOf(((Text) args.value("rounding-mode")).value()) :
(scale == null && precision == null ? RoundingMode.UNNECESSARY : RoundingMode.HALF_EVEN);
} catch (IllegalArgumentException e) {
throw new DirectiveParseException(String.format(
"Specified rounding-mode '%s' is not a valid Java rounding mode", args.value("rounding-mode").value()), e);
String errorMessage = String.format(
"Specified rounding-mode '%s' is not a valid Java rounding mode",
args.value("rounding-mode").value());
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage,
ErrorType.USER, false, new DirectiveParseException(NAME, errorMessage, e));
}
}
}
Expand All @@ -107,9 +121,17 @@ public void destroy() {
}

@Override
public List<Row> execute(List<Row> rows, ExecutorContext context) throws DirectiveExecutionException {
public List<Row> execute(List<Row> rows, ExecutorContext context) {
for (Row row : rows) {
ColumnConverter.convertType(NAME, row, col, type, scale, precision, roundingMode);
try {
ColumnConverter.convertType(NAME, row, col, type, scale, precision, roundingMode);
} catch (DirectiveExecutionException e) {
String errorMessage = String.format("Error converting column '%s' to type '%s', %s: %s",
col, type, e.getClass().getName(), e.getMessage());
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage, errorMessage,
ErrorType.USER, false, e);
}
}
return rows;
}
Expand Down Expand Up @@ -144,20 +166,26 @@ public Schema getOutputSchema(SchemaResolutionContext context) {
outputPrecision = inputSchemaPrecision;
} else if (scale == null && inputSchemaScale != null) {
if (precision - inputSchemaScale < 1) {
throw new DirectiveParseException(String.format(
String errorMessage = String.format(
"Cannot set scale as '%s' and precision as '%s' when "
+ "given precision - scale is less than 1 ", inputSchemaScale,
precision));
precision);
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
errorMessage, ErrorType.USER, false, null);
}
outputScale = inputSchemaScale;
outputPrecision = precision;

} else if (precision == null && inputSchemaPrecision != null) {
if (inputSchemaPrecision - scale < 1) {
throw new DirectiveParseException(String.format(
String errorMessage = String.format(
"Cannot set scale as '%s' and precision as '%s' when "
+ "given precision - scale is less than 1 ", scale,
inputSchemaPrecision));
inputSchemaPrecision);
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
errorMessage, ErrorType.USER, false, null);
}
outputScale = scale;
outputPrecision = inputSchemaPrecision;
Expand All @@ -167,7 +195,12 @@ public Schema getOutputSchema(SchemaResolutionContext context) {
}
return field;
} catch (DirectiveParseException e) {
throw new RuntimeException(e);
String errorMessage = String.format(
"Error converting column '%s' to type '%s', %s: %s", col, type,
e.getClass().getName(), e.getMessage());
throw ErrorUtils.getProgramFailureException(
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorMessage,
errorMessage, ErrorType.USER, false, e);
}
}
)
Expand Down
Loading
Loading