Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update sleep time in unit tests, add time attribute support note to changelog #20

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## [Unreleased]

- Add support for time attributes
- Support partitioning for special character columns & Fix error in unit test.
- Add LIKE operator pushdown.
- Preserve columns order returned by Elastic driver.
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
{ "create" : { "_index" : "test_table_1", "_id" : "1" } }
{ "float_col": 1.1234, "double_col": 2.123456787, "byte_col": 123, "short_col": 12345, "integer_col": 1, "long_col": 123123123, "unsigned_long_col": 123123123, "half_float_col": 12.123, "scaled_float_col": 12.123, "keyword_col": "flink test", "constant_keyword_col": "flink", "wildcard_col": "flink_*", "binary_col": "U29tZSBiaW5hcnkgYmxvYg==", "date_col": "2015-01-01T12:10:30Z", "ip_col": "192.168.1.1", "version_col": "1.2.3", "text_col": "aaa bbb ccc ddd eee ", "boolean_col": true, "text_multifield_col": ["aaa", "bbb", "ccc"]}
{ "float_col": 1.1234, "double_col": 2.123456787, "byte_col": 123, "short_col": 12345, "integer_col": 1, "long_col": 123123123, "unsigned_long_col": 123123123, "half_float_col": 12.123, "scaled_float_col": 12.123, "keyword_col": "flink test", "constant_keyword_col": "flink", "wildcard_col": "flink_*", "binary_col": "U29tZSBiaW5hcnkgYmxvYg==", "date_col": "2015-01-01T12:10:30Z", "date_millis_col": "2015-01-01T12:10:30.123Z", "ip_col": "192.168.1.1", "version_col": "1.2.3", "text_col": "aaa bbb ccc ddd eee ", "boolean_col": true, "text_multifield_col": ["aaa", "bbb", "ccc"]}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
"wildcard_col": { "type": "wildcard" },
"binary_col": { "type": "text" },
"date_col": { "type": "date" },
"date_millis_col": { "type": "date", "format": "date_time" },
"ip_col": { "type": "ip" },
"version_col": { "type": "version" },
"text_col": { "type": "text" },
"text_col": { "type": "text", "fields": { "raw": { "type": "keyword" } } },
"boolean_col": {"type": "boolean"},
"text_multifield_col": {"type": "text", "fields": {"raw": {"type": "keyword"}}}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
Expand Down Expand Up @@ -101,7 +102,7 @@ private List<String> extractIndexPatterns(Map<String, String> properties) {
// Splitting patterns and removing duplicates

return Arrays.stream(
properties.getOrDefault("properties.index.patterns", "").split(",")
properties.getOrDefault("properties.index.patterns", "").split(",")
)
.map(String::trim)
.filter(e -> !e.isEmpty())
Expand Down Expand Up @@ -152,8 +153,8 @@ private Map<String, TimeAttributeProperties> extractTimeAttributeProperties(Map<
if (!key.startsWith("properties.timeattribute.") ||
!(
key.endsWith(".watermark.column") ||
key.endsWith(".watermark.delay") ||
key.endsWith(".proctime.column")
key.endsWith(".watermark.delay") ||
key.endsWith(".proctime.column")
)
) {
continue;
Expand Down Expand Up @@ -244,15 +245,12 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep
getPrimaryKey(metaData, null, getSchemaName(tablePath), getTableName(tablePath));

ResultSetMetaData resultSetMetaData = retrieveResultSetMetaData(conn, tablePath);

Map<String, DataType> columns = retrieveColumns(resultSetMetaData, tablePath);
String[] columnNames = columns.keySet().toArray(new String[0]);
DataType[] types = columns.values().toArray(new DataType[0]);

String tableName = getSchemaTableName(tablePath);
TimeAttributeProperties tableTimeAttributeProperties = timeAttributeProperties.get(tableName);
checkTimeAttributeProperties(tableTimeAttributeProperties, tableName);
Schema tableSchema = buildSchema(columnNames, types, primaryKey, tableTimeAttributeProperties);
Schema tableSchema = buildSchema(columns, primaryKey, tableTimeAttributeProperties);

ScanPartitionProperties properties = scanPartitionProperties.get(tableName);

Expand All @@ -265,12 +263,11 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep
deducePartitionNumber(properties, conn, tableName);

checkScanPartitionNumber(properties.getPartitionNumber());
DataType type = retrievePartitionColumnDataType(
columnNames,
types,
properties.getPartitionColumnName(),
tableName
);
DataType type = columns.get(properties.getPartitionColumnName());
if (type == null) {
throw new IllegalArgumentException(format("Partition column was not found in the specified table %s!",
tableName));
}

checkScanPartitionColumnType(type);
calculateScanPartitionBounds(conn, tableName, isColumnTemporal(type), properties);
Expand Down Expand Up @@ -343,15 +340,6 @@ private void checkTimeAttributeProperties(TimeAttributeProperties timeAttributeP
}
}

/**
 * Looks up the data type of the configured partition column by name.
 *
 * @param columnNames         column names of the table, positionally aligned with {@code types}
 * @param types               column data types, positionally aligned with {@code columnNames}
 * @param partitionColumnName name of the column configured for scan partitioning
 * @param tableName           table name, used only for the error message
 * @return the data type of the partition column
 * @throws IllegalArgumentException if no column with the given name exists in the table
 */
private DataType retrievePartitionColumnDataType(String[] columnNames, DataType[] types, String partitionColumnName, String tableName) {
    int index = 0;
    for (String columnName : columnNames) {
        if (Objects.equals(columnName, partitionColumnName)) {
            return types[index];
        }
        index++;
    }
    throw new IllegalArgumentException(format("Partition column was not found in the specified table %s!", tableName));
}

private void checkScanPartitionColumnType(DataType type) {
if (!(isColumnNumeric(type) || isColumnTemporal(type))) {
throw new CatalogException(format("Partition column is of type %s. We support only NUMERIC, DATE and TIMESTAMP partition columns.", type));
Expand All @@ -374,33 +362,66 @@ private Map<String, DataType> retrieveColumns(ResultSetMetaData resultSetMetaDat
}

/**
 * Retrieves per-column metadata for the given table by executing a zero-row query.
 *
 * <p>{@code LIMIT 0} transfers no data but still makes the driver populate the result
 * set's {@link ResultSetMetaData}, which the caller uses to resolve column names and types.
 *
 * <p>NOTE(review): the statement and result set are intentionally not closed here because
 * the returned metadata may be backed by them; confirm the Elasticsearch JDBC driver's
 * behavior before adding try-with-resources.
 *
 * @param conn      open JDBC connection to the Elasticsearch cluster
 * @param tablePath Flink object path identifying the table
 * @return result-set metadata describing the table's columns
 * @throws java.sql.SQLException if the query fails
 */
private ResultSetMetaData retrieveResultSetMetaData(Connection conn, ObjectPath tablePath) throws java.sql.SQLException {
    String query = format("SELECT * FROM \"%s\" LIMIT 0", getSchemaTableName(tablePath));
    PreparedStatement ps = conn.prepareStatement(query);
    ResultSet rs = ps.executeQuery();
    return rs.getMetaData();
}

@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private Schema buildSchema(String[] columnNames, DataType[] types, Optional<UniqueConstraint> primaryKey,
private Schema buildSchema(Map<String, DataType> columns, Optional<UniqueConstraint> primaryKey,
TimeAttributeProperties timeAttributeProperties) {
Schema.Builder schemaBuilder = Schema.newBuilder().fromFields(columnNames, types);
Schema.Builder schemaBuilder = Schema.newBuilder().fromFields(columns.keySet().toArray(new String[0]),
columns.values().toArray(new DataType[0]));
primaryKey.ifPresent(pk -> schemaBuilder.primaryKeyNamed(pk.getName(), pk.getColumns()));

if (timeAttributeProperties != null) {
// using proctime
if (timeAttributeProperties.getProctimeColumn() != null) {
schemaBuilder.columnByExpression(timeAttributeProperties.getProctimeColumn(), "PROCTIME()");
} else { // using watermarking
schemaBuilder.watermark(
timeAttributeProperties.getWatermarkColumn(),
timeAttributeProperties.getWatermarkColumn() +
" - INTERVAL " +
timeAttributeProperties.getWatermarkDelay());
String watermarkColumn = timeAttributeProperties.watermarkColumn;
String watermarkDelay = timeAttributeProperties.watermarkDelay;

if (isWatermarkColumnProperType(columns.get(watermarkColumn))) {
schemaBuilder.watermark(
timeAttributeProperties.getWatermarkColumn(),
watermarkColumn + " - INTERVAL " + watermarkDelay
);
} else { // if specified watermark column data type is not supported we add a watermark column to schema
schemaBuilder.columnByExpression("generated_watermark_column", "TO_TIMESTAMP_LTZ(" +
watermarkColumn + ", 3)");
schemaBuilder.watermark(
"generated_watermark_column",
"generated_watermark_column - INTERVAL " + watermarkDelay
);
}
}
}
return schemaBuilder.build();
}

/**
 * Returns true when the given column type can carry a watermark directly, i.e. it is a
 * TIMESTAMP or TIMESTAMP_LTZ with precision 1..3.
 *
 * <p>NOTE(review): precision 0 and the parameterless {@code TIMESTAMP()} default are
 * excluded here — confirm that is intentional.
 *
 * @param watermarkColumnDataType resolved type of the configured watermark column; may be
 *                                null when the column does not exist in the table
 * @return true if a watermark can be declared on the column as-is
 */
private boolean isWatermarkColumnProperType(DataType watermarkColumnDataType) {
    // Guard against a missing column: callers pass columns.get(name), which returns null
    // when the configured name is absent. Returning false (instead of throwing NPE)
    // routes such configurations to the generated-watermark-column fallback.
    if (watermarkColumnDataType == null) {
        return false;
    }
    for (int precision = 1; precision <= 3; precision++) {
        if (watermarkColumnDataType.equals(DataTypes.TIMESTAMP(precision)) ||
                watermarkColumnDataType.equals(DataTypes.TIMESTAMP_LTZ(precision))) {
            return true;
        }
    }
    return false;
}

private boolean isColumnNumeric(DataType type) {
return type.equals(DataTypes.TINYINT()) ||
type.equals(DataTypes.SMALLINT()) ||
Expand All @@ -411,7 +432,16 @@ private boolean isColumnNumeric(DataType type) {
}

/**
 * Returns true when the given type is temporal: DATE, TIMESTAMP (default precision or
 * precision 1..3), or TIMESTAMP_LTZ with precision 1..3.
 *
 * <p>NOTE(review): the parameterless {@code TIMESTAMP_LTZ()} default is not accepted
 * while {@code TIMESTAMP()} is — confirm the asymmetry is intentional.
 *
 * @param type the column data type to classify
 * @return true for DATE/TIMESTAMP/TIMESTAMP_LTZ types as listed above
 */
private boolean isColumnTemporal(DataType type) {
    if (type.equals(DataTypes.DATE()) || type.equals(DataTypes.TIMESTAMP())) {
        return true;
    }
    for (int precision = 1; precision <= 3; precision++) {
        if (type.equals(DataTypes.TIMESTAMP(precision)) ||
                type.equals(DataTypes.TIMESTAMP_LTZ(precision))) {
            return true;
        }
    }
    return false;
}

private int calculatePartitionNumberBasedOnPartitionSize(Connection conn, String tableName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public Set<ConfigOption<?>> optionalOptions() {
public Catalog createCatalog(Context context) {
final FactoryUtil.CatalogFactoryHelper helper =
FactoryUtil.createCatalogFactoryHelper(this, context);
helper.validateExcept("properties.scan");
helper.validateExcept("properties.scan", "properties.timeattribute");
validateDynamicOptions(context.getOptions());

return new ElasticCatalog(
Expand All @@ -80,13 +80,14 @@ public Catalog createCatalog(Context context) {
}

private void validateDynamicOptions(Map<String, String> options) {
Map<String, String> scanOptions = extractScanOptions(options);
for (Map.Entry<String, String> entry : scanOptions.entrySet()) {
Map<String, String> dynamicOptions = extractDynamicOptions(options);
for (Map.Entry<String, String> entry : dynamicOptions.entrySet()) {
String key = entry.getKey();
if (!(key.startsWith("properties.scan.") && key.endsWith("partition.column.name")) &&
!(key.startsWith("properties.scan.") && key.endsWith("partition.number")) &&
!(key.startsWith("properties.watermark.") && key.endsWith("interval")) &&
!(key.startsWith("properties.watermark.") && key.endsWith("unit"))) {
!(key.startsWith("properties.timeattribute.") && key.endsWith("watermark.column")) &&
!(key.startsWith("properties.timeattribute.") && key.endsWith("watermark.delay")) &&
!(key.startsWith("properties.timeattribute.") && key.endsWith("proctime.column"))) {
throw new IllegalArgumentException("Parameter " + entry.getKey() + " is not supported. We support" +
" properties.scan.<table_name>.partition.column.name, " +
" properties.scan.<table_name>.partition.number, " +
Expand All @@ -99,9 +100,10 @@ private void validateDynamicOptions(Map<String, String> options) {
}
}

private Map<String, String> extractScanOptions(Map<String, String> options) {
private Map<String, String> extractDynamicOptions(Map<String, String> options) {
return options.entrySet().stream()
.filter(entry -> entry.getKey().startsWith("properties.scan"))
.filter(entry -> entry.getKey().startsWith("properties.scan") ||
entry.getKey().startsWith("properties.timeattribute"))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,13 @@ public DataType mapping(ObjectPath tablePath, ResultSetMetaData metadata, int co
case ELASTIC_BYTE:
return DataTypes.TINYINT();
case ELASTIC_DATETIME:
return DataTypes.TIMESTAMP();
int p = metadata.getPrecision(colIndex);
if (p > 0) {
return DataTypes.TIMESTAMP(p);
}
else {
return DataTypes.TIMESTAMP();
}
case ELASTIC_DOUBLE:
case ELASTIC_SCALED_FLOAT:
return DataTypes.DOUBLE();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,23 +63,23 @@ public static void beforeAll() throws Exception {
createTestIndex(INPUT_PARTIAL_SCHEMA_TABLE_2, PARTIAL_SCHEMA_PATH_2);
createTestIndex(INPUT_EMPTY_TABLE, INDEX_PATH);
createTestIndex(INPUT_SPECIAL_CHARACTER_COLUMN_NAMES_TABLE, SPECIAL_CHARACTER_COLUMN_NAMES_INDEX_PATH);
TimeUnit.SECONDS.sleep(1);
TimeUnit.SECONDS.sleep(2);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that one sleep after the last call of addTestData is enough. Could you test that?

addTestData(INPUT_SINGLE_RECORD_TABLE, INPUT_SINGLE_EVENT_PATH);
TimeUnit.SECONDS.sleep(1);
TimeUnit.SECONDS.sleep(2);
addTestData(INPUT_MULTIPLE_RECORDS_TABLE, INPUT_MULTIPLE_EVENTS_PATH);
TimeUnit.SECONDS.sleep(1);
TimeUnit.SECONDS.sleep(2);
addTestData(INPUT_MISSING_DATE_COL_TABLE, INPUT_NO_DATE_COL_EVENTS_PATH);
TimeUnit.SECONDS.sleep(1);
TimeUnit.SECONDS.sleep(2);
addTestData(INPUT_UNSUPPORTED_DATA_TYPE_TABLE, INPUT_UNSUPPORTED_DATA_TYPE_EVENTS_PATH);
TimeUnit.SECONDS.sleep(1);
TimeUnit.SECONDS.sleep(2);
addTestData(INPUT_PARTIAL_SCHEMA_TABLE_1, INPUT_PARTIAL_SCHEMA_EVENTS_PATH_1);
TimeUnit.SECONDS.sleep(1);
TimeUnit.SECONDS.sleep(2);
addTestData(INPUT_PARTIAL_SCHEMA_TABLE_2, INPUT_PARTIAL_SCHEMA_EVENTS_PATH_2);
TimeUnit.SECONDS.sleep(1);
TimeUnit.SECONDS.sleep(2);
addTestData(INPUT_EMPTY_TABLE, INPUT_NO_EVENTS_PATH);
TimeUnit.SECONDS.sleep(1);
TimeUnit.SECONDS.sleep(2);
addTestData(INPUT_SPECIAL_CHARACTER_COLUMN_NAMES_TABLE, INPUT_SPECIAL_CHARACTER_COLUMN_NAMES_PATH);
TimeUnit.SECONDS.sleep(1);
TimeUnit.SECONDS.sleep(2);
}

@Test
Expand Down Expand Up @@ -452,8 +452,8 @@ public void testGetTableIndexPattern() throws TableNotExistException, DatabaseNo
Schema expectedSchema = Schema.newBuilder().fromFields(
new String[]{"binary_col", "boolean_col", "byte_col",
"constant_keyword_col", "date_col", "date_epoch_col",
"date_nanos_col", "double_col", "float_col",
"half_float_col", "integer_col", "ip_col",
"date_millis_col", "date_nanos_col", "double_col",
"float_col", "half_float_col", "integer_col", "ip_col",
"keyword_col", "long_col", "scaled_float_col",
"short_col", "text_col", "text_multifield_col",
"unsigned_long_col", "version_col", "wildcard_col"},
Expand All @@ -464,6 +464,7 @@ public void testGetTableIndexPattern() throws TableNotExistException, DatabaseNo
DataTypes.STRING(),
DataTypes.TIMESTAMP(6),
DataTypes.TIMESTAMP(6),
DataTypes.TIMESTAMP(3),
DataTypes.TIMESTAMP(6),
DataTypes.DOUBLE(),
DataTypes.FLOAT(),
Expand Down Expand Up @@ -676,7 +677,7 @@ public void testGetTableTimeAttributesWatermark() throws TableNotExistException
String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(),
container.getElasticPort());
Map<String, String> properties = new HashMap<String, String>();
properties.put("properties.timeattribute.test_multiple_records_table.watermark.column", "date_col");
properties.put("properties.timeattribute.test_multiple_records_table.watermark.column", "date_millis_col");
properties.put("properties.timeattribute.test_multiple_records_table.watermark.delay", "'5' SECOND");

// when
Expand All @@ -686,9 +687,30 @@ public void testGetTableTimeAttributesWatermark() throws TableNotExistException

// then
Schema schema = table.getUnresolvedSchema();
assertEquals(1, schema.getWatermarkSpecs().size());
System.out.println(schema);
assertEquals("[date_millis_col - INTERVAL '5' SECOND]", schema.getWatermarkSpecs().get(0).getWatermarkExpression().toString());
}

@Test
public void testGetTableGenerateWatermarkColumn() throws TableNotExistException {
    // given
    String url = String.format("jdbc:elasticsearch://%s:%d", container.getHost(),
            container.getElasticPort());
    Map<String, String> properties = new HashMap<String, String>();
    // date_col is not a TIMESTAMP(1..3)/TIMESTAMP_LTZ(1..3) column, so the catalog is
    // expected to synthesize "generated_watermark_column" and hang the watermark on it.
    properties.put("properties.timeattribute.test_multiple_records_table.watermark.column", "date_col");
    properties.put("properties.timeattribute.test_multiple_records_table.watermark.delay", "'5' SECOND");

    // when
    ElasticCatalog catalog = new ElasticCatalog("test-catalog", "test-database", USERNAME,
            PASSWORD, url, properties);
    CatalogBaseTable table = catalog.getTable(new ObjectPath("docker-cluster", "test_multiple_records_table"));

    // then
    Schema schema = table.getUnresolvedSchema();
    // assertEquals takes (expected, actual) — keep the literal first.
    assertEquals("`generated_watermark_column` AS [TO_TIMESTAMP_LTZ(date_col, 3)]",
            schema.getColumns().get(schema.getColumns().size() - 1).toString());
    assertEquals(1, schema.getWatermarkSpecs().size());
    // The watermark must reference the generated column, not the raw date_col; the
    // previous assertion on "[date_col - INTERVAL '5' SECOND]" contradicted this one
    // and could never pass together with it, so it was removed.
    assertEquals("[generated_watermark_column - INTERVAL '5' SECOND]",
            schema.getWatermarkSpecs().get(0).getWatermarkExpression().toString());
}

@Test
Expand Down
Loading