From 03e8d7181744b2c87ab82d4414b1056400d8167f Mon Sep 17 00:00:00 2001
From: XiaoYou201 <58425449+XiaoYou201@users.noreply.github.com>
Date: Sun, 7 Apr 2024 14:45:28 +0800
Subject: [PATCH] [INLONG-9913][Sort] Solve end-to-end-tests-v1.13 naming
problem and test case run failed. (#9924)
---
.github/workflows/ci_ut_flink13.yml | 84 +++++++++++++++++++
.../sort-end-to-end-tests-v1.13/pom.xml | 2 +-
...eITCase.java => Mysql2ClickHouseTest.java} | 12 +--
...fkaE2EITCase.java => Mysql2KafkaTest.java} | 16 ++--
.../tests/utils/FlinkContainerTestEnv.java | 2 +-
.../sort/tests/utils/MySqlContainer.java | 7 +-
.../src/test/resources/docker/mysql/my.cnf | 3 +-
.../src/test/resources/docker/mysql/setup.sql | 1 +
.../resources/flinkSql/clickhouse_test.sql | 2 +-
.../test/resources/flinkSql/kafka_test.sql | 3 +-
.../test/resources/groupFile/kafka_test.json | 2 +-
.../src/test/resources/log4j2-test.properties | 2 +-
.../sort-connectors/kafka/pom.xml | 3 +-
13 files changed, 117 insertions(+), 22 deletions(-)
create mode 100644 .github/workflows/ci_ut_flink13.yml
rename inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/java/org/apache/inlong/sort/tests/{ClickHouseITCase.java => Mysql2ClickHouseTest.java} (92%)
rename inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/java/org/apache/inlong/sort/tests/{KafkaE2EITCase.java => Mysql2KafkaTest.java} (93%)
diff --git a/.github/workflows/ci_ut_flink13.yml b/.github/workflows/ci_ut_flink13.yml
new file mode 100644
index 00000000000..a02b81c1a7d
--- /dev/null
+++ b/.github/workflows/ci_ut_flink13.yml
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+name:
+ InLong Unit Test For Flink 1.13
+
+on:
+ push:
+ paths:
+ - '.github/workflows/ci_ut_flink13.yml'
+ - 'inlong-sort/**'
+ - '!**.md'
+
+ pull_request:
+ paths:
+ - '.github/workflows/ci_ut_flink13.yml'
+ - 'inlong-sort/**'
+ - '!**.md'
+
+jobs:
+ unit-test:
+ name: Unit Test
+ runs-on: ubuntu-22.04
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v3
+
+ - name: Set up JDK
+ uses: actions/setup-java@v3
+ with:
+ java-version: 8
+ distribution: adopt
+
+ - name: Cache Maven packages
+ uses: actions/cache@v3
+ with:
+ path: |
+ ~/.m2/repository
+ !~/.m2/repository/org/apache/inlong
+ key: ${{ runner.os }}-inlong-flink13-${{ hashFiles('**/pom.xml') }}
+ restore-keys: ${{ runner.os }}-inlong-flink13
+
+ - name: Build for Flink 1.13 with Maven
+ run: mvn --update-snapshots -e -V clean install -U -pl :sort-core,:sort-end-to-end-tests-v1.13 -am -Pv1.13 -DskipTests -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120 -Daether.connector.http.reuseConnections=false -Daether.connector.requestTimeout=60000
+ env:
+ CI: false
+
+ - name: Unit test for Flink 1.13 with Maven
+ run: mvn --update-snapshots -e -V verify -pl :sort-core,:sort-end-to-end-tests-v1.13 -am -Pv1.13
+ env:
+ CI: false
+
+ - name: Upload unit test results
+ if: ${{ failure() }}
+ uses: actions/upload-artifact@v3
+ with:
+ name: surefire-reports
+ path: ./**/target/surefire-reports/
+ if-no-files-found: ignore
+
+ - name: Upload integration test results
+ if: ${{ failure() }}
+ uses: actions/upload-artifact@v3
+ with:
+ name: failsafe-reports
+ path: ./**/target/failsafe-reports/
+ if-no-files-found: ignore
+
+ - name: Clean up build packages
+ run: mvn clean
\ No newline at end of file
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/pom.xml b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/pom.xml
index c3d167679dc..2b45fc8a787 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/pom.xml
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/pom.xml
@@ -182,7 +182,7 @@
copy
- pre-integration-test
+ validate
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/java/org/apache/inlong/sort/tests/ClickHouseITCase.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/java/org/apache/inlong/sort/tests/Mysql2ClickHouseTest.java
similarity index 92%
rename from inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/java/org/apache/inlong/sort/tests/ClickHouseITCase.java
rename to inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/java/org/apache/inlong/sort/tests/Mysql2ClickHouseTest.java
index 0d5e151af39..e7b9f5ed201 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/java/org/apache/inlong/sort/tests/ClickHouseITCase.java
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/java/org/apache/inlong/sort/tests/Mysql2ClickHouseTest.java
@@ -45,19 +45,20 @@
* End-to-end tests
* Test flink sql mysql cdc to clickHouse
*/
-public class ClickHouseITCase extends FlinkContainerTestEnv {
+public class Mysql2ClickHouseTest extends FlinkContainerTestEnv {
- private static final Logger LOG = LoggerFactory.getLogger(ClickHouseITCase.class);
+ private static final Logger LOG = LoggerFactory.getLogger(Mysql2ClickHouseTest.class);
private static final Path jdbcJar = TestUtils.getResource("sort-connector-jdbc.jar");
private static final Path mysqlJar = TestUtils.getResource("sort-connector-mysql-cdc.jar");
- private static final Path mysqlJdbcJar = TestUtils.getResource("mysql-driver.jar");
+ private static final Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
// Can't use getResource("xxx").getPath(), windows will don't know that path
private static final String sqlFile;
static {
try {
- sqlFile = Paths.get(ClickHouseITCase.class.getResource("/flinkSql/clickhouse_test.sql").toURI()).toString();
+ sqlFile = Paths.get(Mysql2ClickHouseTest.class.getResource("/flinkSql/clickhouse_test.sql").toURI())
+ .toString();
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
@@ -125,7 +126,7 @@ private void initializeMysqlTable() {
*/
@Test
public void testClickHouseUpdateAndDelete() throws Exception {
- submitSQLJob(sqlFile, jdbcJar, mysqlJar, mysqlJdbcJar);
+ submitSQLJob(sqlFile, jdbcJar, mysqlJar, mysqlDriverJar);
waitUntilJobRunning(Duration.ofSeconds(30));
// generate input
@@ -152,6 +153,7 @@ public void testClickHouseUpdateAndDelete() throws Exception {
CLICK_HOUSE_CONTAINER.getDriverClassName());
List expectResult =
Arrays.asList("2,tom,Big 2-wheel scooter ");
+
proxy.checkResultWithTimeout(
expectResult,
"test_output1",
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/java/org/apache/inlong/sort/tests/KafkaE2EITCase.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/java/org/apache/inlong/sort/tests/Mysql2KafkaTest.java
similarity index 93%
rename from inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/java/org/apache/inlong/sort/tests/KafkaE2EITCase.java
rename to inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/java/org/apache/inlong/sort/tests/Mysql2KafkaTest.java
index 10406f6d3a1..8a2903d15e0 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/java/org/apache/inlong/sort/tests/KafkaE2EITCase.java
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/java/org/apache/inlong/sort/tests/Mysql2KafkaTest.java
@@ -51,9 +51,9 @@
/**
* End-to-end tests for sort-connector-kafka uber jar.
*/
-public class KafkaE2EITCase extends FlinkContainerTestEnv {
+public class Mysql2KafkaTest extends FlinkContainerTestEnv {
- private static final Logger LOG = LoggerFactory.getLogger(KafkaE2EITCase.class);
+ private static final Logger LOG = LoggerFactory.getLogger(Mysql2KafkaTest.class);
private static final Path kafkaJar = TestUtils.getResource("sort-connector-kafka.jar");
private static final Path jdbcJar = TestUtils.getResource("sort-connector-jdbc.jar");
@@ -65,6 +65,7 @@ public class KafkaE2EITCase extends FlinkContainerTestEnv {
public static final KafkaContainer KAFKA =
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
.withNetwork(NETWORK)
+ .withExposedPorts(9093, 2181)
.withNetworkAliases("kafka")
.withEmbeddedZookeeper()
.withLogConsumer(new Slf4jLogConsumer(LOG));
@@ -78,7 +79,7 @@ public static void teardown() {
private Path getSql(String fileName, Map properties) {
try {
- Path file = Paths.get(KafkaE2EITCase.class.getResource("/flinkSql/" + fileName).toURI());
+ Path file = Paths.get(Mysql2KafkaTest.class.getResource("/flinkSql/" + fileName).toURI());
return PlaceholderResolver.getDefaultResolver().resolveByMap(file, properties);
} catch (URISyntaxException e) {
throw new RuntimeException(e);
@@ -87,7 +88,7 @@ private Path getSql(String fileName, Map properties) {
private Path getGroupFile(String fileName, Map properties) {
try {
- Path file = Paths.get(KafkaE2EITCase.class.getResource("/groupFile/" + fileName).toURI());
+ Path file = Paths.get(Mysql2KafkaTest.class.getResource("/groupFile/" + fileName).toURI());
return PlaceholderResolver.getDefaultResolver().resolveByMap(file, properties);
} catch (URISyntaxException e) {
throw new RuntimeException(e);
@@ -96,7 +97,7 @@ private Path getGroupFile(String fileName, Map properties) {
private String getCreateStatement(String fileName, Map properties) {
try {
- Path file = Paths.get(KafkaE2EITCase.class.getResource("/env/" + fileName).toURI());
+ Path file = Paths.get(Mysql2KafkaTest.class.getResource("/env/" + fileName).toURI());
return PlaceholderResolver.getDefaultResolver().resolveByMap(
new String(Files.readAllBytes(file), StandardCharsets.UTF_8),
properties);
@@ -165,7 +166,7 @@ public void testKafkaWithSqlFile() throws Exception {
"INSERT INTO test_input "
+ "VALUES (1,'jacket','water resistent white wind breaker',0.2, null, null, null);");
stat.execute(
- "INSERT INTO test_input VALUES (2,'scooter','Big 2-wheel scooter ',5.18, null, null, null);");
+ "INSERT INTO test_input VALUES (2,'scooter','Big 2-wheel scooter',5.18, null, null, null);");
} catch (SQLException e) {
LOG.error("Update table for CDC failed.", e);
throw e;
@@ -177,7 +178,7 @@ public void testKafkaWithSqlFile() throws Exception {
List expectResult =
Arrays.asList(
"1,jacket,water resistent white wind breaker,0.2,,,",
- "2,scooter,Big 2-wheel scooter ,5.18,,,");
+ "2,scooter,Big 2-wheel scooter,5.18,,,");
proxy.checkResultWithTimeout(
expectResult,
"test_output",
@@ -238,6 +239,7 @@ public void testKafkaWithGroupFile() throws Exception {
Arrays.asList(
"1,jacket,water resistent white wind breaker,0.2,null,null,null",
"2,scooter,Big 2-wheel scooter ,5.18,null,null,null");
+
proxy.checkResultWithTimeout(
expectResult,
mysqlOutputTable,
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java
index cbe4d9b64ff..be6e7ada42f 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/java/org/apache/inlong/sort/tests/utils/FlinkContainerTestEnv.java
@@ -115,7 +115,7 @@ public abstract class FlinkContainerTestEnv extends TestLogger {
@ClassRule
public static final MySqlContainer MYSQL =
(MySqlContainer) new MySqlContainer()
- .withConfigurationOverride("docker/mysql/my.cnf")
+ .withConfigurationOverride("docker/mysql")
.withSetupSQL("docker/mysql/setup.sql")
.withDatabaseName("test")
.withUsername("flinkuser")
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/java/org/apache/inlong/sort/tests/utils/MySqlContainer.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/java/org/apache/inlong/sort/tests/utils/MySqlContainer.java
index 57ec270427b..975a416b448 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/java/org/apache/inlong/sort/tests/utils/MySqlContainer.java
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/java/org/apache/inlong/sort/tests/utils/MySqlContainer.java
@@ -35,7 +35,7 @@ public class MySqlContainer extends JdbcDatabaseContainer {
public static final String IMAGE = "mysql";
public static final Integer MYSQL_PORT = 3306;
- private static final String MY_CNF_CONFIG_OVERRIDE_PARAM_NAME = "MY_CNF";
+ private static final String MY_CNF_CONFIG_OVERRIDE_PARAM_NAME = "TC_MY_CNF";
private static final String SETUP_SQL_PARAM_NAME = "SETUP_SQL";
private static final String MYSQL_ROOT_USER = "root";
@@ -63,6 +63,11 @@ protected void configure() {
optionallyMapResourceParameterAsVolume(
SETUP_SQL_PARAM_NAME, "/docker-entrypoint-initdb.d/", "N/A");
}
+ if (parameters.containsKey(SETUP_SQL_PARAM_NAME)) {
+ optionallyMapResourceParameterAsVolume(
+ MY_CNF_CONFIG_OVERRIDE_PARAM_NAME, "/etc/mysql/conf.d",
+ "docker/mysql");
+ }
addEnv("MYSQL_DATABASE", databaseName);
addEnv("MYSQL_USER", username);
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/docker/mysql/my.cnf b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/docker/mysql/my.cnf
index 87a492c496a..bbf52cd615c 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/docker/mysql/my.cnf
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/docker/mysql/my.cnf
@@ -60,4 +60,5 @@ binlog_format = row
# enable gtid mode
gtid_mode = on
-enforce_gtid_consistency = on
\ No newline at end of file
+enforce_gtid_consistency = on
+default_authentication_plugin=mysql_native_password
\ No newline at end of file
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/docker/mysql/setup.sql b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/docker/mysql/setup.sql
index 9ec4b48bbdb..74d2715e463 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/docker/mysql/setup.sql
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/docker/mysql/setup.sql
@@ -23,3 +23,4 @@
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, LOCK TABLES ON *.* TO 'flinkuser'@'%';
CREATE USER 'inlong' IDENTIFIED BY 'inlong';
GRANT ALL PRIVILEGES ON *.* TO 'inlong'@'%';
+FLUSH PRIVILEGES;
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/flinkSql/clickhouse_test.sql b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/flinkSql/clickhouse_test.sql
index 6074bedbf08..c07a98d51ac 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/flinkSql/clickhouse_test.sql
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/flinkSql/clickhouse_test.sql
@@ -9,7 +9,7 @@ CREATE TABLE test_input1 (
'username' = 'inlong',
'password' = 'inlong',
'database-name' = 'test',
- 'table-name' = 'test_input1',
+ 'table-name' = 'test.test_input1',
'scan.incremental.snapshot.chunk.size' = '4',
'scan.incremental.snapshot.enabled' = 'false'
);
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/flinkSql/kafka_test.sql b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/flinkSql/kafka_test.sql
index 31142218b31..2d1305cb299 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/flinkSql/kafka_test.sql
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/flinkSql/kafka_test.sql
@@ -13,7 +13,7 @@ CREATE TABLE test_input (
'username' = 'inlong',
'password' = 'inlong',
'database-name' = 'test',
- 'table-name' = 'test_input',
+ 'table-name' = 'test.test_input',
'append-mode' = 'true',
'scan.incremental.snapshot.chunk.size' = '4',
'scan.incremental.snapshot.enabled' = 'false'
@@ -66,7 +66,6 @@ CREATE TABLE test_output (
'username' = 'inlong',
'password' = 'inlong'
);
-
INSERT INTO kafka_load select * from test_input;
INSERT INTO test_output select * from kafka_extract;
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/groupFile/kafka_test.json b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/groupFile/kafka_test.json
index 3e26cb348c8..1c27b9b33ee 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/groupFile/kafka_test.json
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/groupFile/kafka_test.json
@@ -63,7 +63,7 @@
],
"primaryKey": "id",
"tableNames": [
- "${MYSQL_INPUT_TABLE}"
+ "test.${MYSQL_INPUT_TABLE}"
],
"hostname": "mysql",
"username": "inlong",
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/log4j2-test.properties b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/log4j2-test.properties
index cfafff07e7b..63995e66435 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/log4j2-test.properties
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.13/src/test/resources/log4j2-test.properties
@@ -61,6 +61,6 @@ logger.mysql.name=org.apache.inlong.sort.tests.utils.MySqlContainer
logger.mysql.additivity=false
logger.kafka=INFO, kafkaserver
-logger.kafka.name=org.apache.inlong.sort.tests.KafkaE2EITCase
+logger.kafka.name=org.apache.inlong.sort.tests.Mysql2KafkaTest
logger.kafka.additivity=false
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/kafka/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/kafka/pom.xml
index 9c4a7de9f9a..5650a98fc57 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/kafka/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/kafka/pom.xml
@@ -75,7 +75,8 @@
org.apache.inlong:*
org.apache.kafka:*
com.google.protobuf:*
- org.apache.flink:flink-connector-kafka_${scala.binary.version}
+ org.apache.flink:flink-connector-kafka_${flink.scala.binary.version}
+ org.apache.flink:flink-connector-base
com.amazonaws:*
com.fasterxml.jackson.core:*
commons-logging:commons-logging