Commit 03e8d71

[INLONG-9913][Sort] Solve end-to-end-tests-v1.13 naming problem and test case run failed. (#9924)

XiaoYou201 authored Apr 7, 2024
1 parent b4f099a, commit 03e8d71

Showing 13 changed files with 117 additions and 22 deletions.
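
The "naming problem" in the title is presumably about Maven's default test-naming conventions: Surefire (the unit-test plugin, bound to mvn test) picks up classes named *Test, while Failsafe (the integration-test plugin, bound to mvn verify) picks up *IT and *ITCase classes, so the renames below (ClickHouseITCase to Mysql2ClickHouseTest, KafkaE2EITCase to Mysql2KafkaTest) change which plugin executes the end-to-end tests. A minimal sketch of the convention, assuming JUnit 4 as used elsewhere in these tests:

import org.junit.Test;
import static org.junit.Assert.assertTrue;

// Picked up by Surefire during "mvn test" because of the *Test suffix; a class
// named NamingConventionITCase would instead run under Failsafe during "mvn verify".
public class NamingConventionTest {

    @Test
    public void runsInTestPhase() {
        assertTrue(true);
    }
}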
File 1 of 13: .github/workflows/ci_ut_flink13.yml (new file, 84 additions, 0 deletions)

@@ -0,0 +1,84 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

name: InLong Unit Test For Flink 1.13

on:
  push:
    paths:
      - '.github/workflows/ci_ut_flink13.yml'
      - 'inlong-sort/**'
      - '!**.md'

  pull_request:
    paths:
      - '.github/workflows/ci_ut_flink13.yml'
      - 'inlong-sort/**'
      - '!**.md'

jobs:
  unit-test:
    name: Unit Test
    runs-on: ubuntu-22.04
    steps:
      - name: Checkout
        uses: actions/checkout@v3

      - name: Set up JDK
        uses: actions/setup-java@v3
        with:
          java-version: 8
          distribution: adopt

      - name: Cache Maven packages
        uses: actions/cache@v3
        with:
          path: |
            ~/.m2/repository
            !~/.m2/repository/org/apache/inlong
          key: ${{ runner.os }}-inlong-flink13-${{ hashFiles('**/pom.xml') }}
          restore-keys: ${{ runner.os }}-inlong-flink13

      - name: Build for Flink 1.13 with Maven
        run: mvn --update-snapshots -e -V clean install -U -pl :sort-core,:sort-end-to-end-tests-v1.13 -am -Pv1.13 -DskipTests -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120 -Daether.connector.http.reuseConnections=false -Daether.connector.requestTimeout=60000
        env:
          CI: false

      - name: Unit test for Flink 1.13 with Maven
        run: mvn --update-snapshots -e -V verify -pl :sort-core,:sort-end-to-end-tests-v1.13 -am -Pv1.13
        env:
          CI: false

      - name: Upload unit test results
        if: ${{ failure() }}
        uses: actions/upload-artifact@v3
        with:
          name: surefire-reports
          path: ./**/target/surefire-reports/
          if-no-files-found: ignore

      - name: Upload integration test results
        if: ${{ failure() }}
        uses: actions/upload-artifact@v3
        with:
          name: failsafe-reports
          path: ./**/target/failsafe-reports/
          if-no-files-found: ignore

      - name: Clean up build packages
        run: mvn clean
File 2 of 13 (Maven plugin execution)

@@ -182,7 +182,7 @@
               <goals>
                 <goal>copy</goal>
               </goals>
-              <phase>pre-integration-test</phase>
+              <phase>validate</phase>
             </execution>
           </executions>
         </plugin>
File 3 of 13 (ClickHouse end-to-end test class)

@@ -45,19 +45,20 @@
  * End-to-end tests
  * Test flink sql mysql cdc to clickHouse
  */
-public class ClickHouseITCase extends FlinkContainerTestEnv {
+public class Mysql2ClickHouseTest extends FlinkContainerTestEnv {

-    private static final Logger LOG = LoggerFactory.getLogger(ClickHouseITCase.class);
+    private static final Logger LOG = LoggerFactory.getLogger(Mysql2ClickHouseTest.class);

     private static final Path jdbcJar = TestUtils.getResource("sort-connector-jdbc.jar");
     private static final Path mysqlJar = TestUtils.getResource("sort-connector-mysql-cdc.jar");
-    private static final Path mysqlJdbcJar = TestUtils.getResource("mysql-driver.jar");
+    private static final Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
     // Can't use getResource("xxx").getPath(); Windows cannot resolve that form of path
     private static final String sqlFile;

     static {
         try {
-            sqlFile = Paths.get(ClickHouseITCase.class.getResource("/flinkSql/clickhouse_test.sql").toURI()).toString();
+            sqlFile = Paths.get(Mysql2ClickHouseTest.class.getResource("/flinkSql/clickhouse_test.sql").toURI())
+                    .toString();
         } catch (URISyntaxException e) {
             throw new RuntimeException(e);
         }
@@ -125,7 +126,7 @@ private void initializeMysqlTable() {
      */
     @Test
     public void testClickHouseUpdateAndDelete() throws Exception {
-        submitSQLJob(sqlFile, jdbcJar, mysqlJar, mysqlJdbcJar);
+        submitSQLJob(sqlFile, jdbcJar, mysqlJar, mysqlDriverJar);
         waitUntilJobRunning(Duration.ofSeconds(30));

         // generate input
@@ -152,6 +153,7 @@ public void testClickHouseUpdateAndDelete() throws Exception {
                 CLICK_HOUSE_CONTAINER.getDriverClassName());
         List<String> expectResult =
                 Arrays.asList("2,tom,Big 2-wheel scooter ");
+
         proxy.checkResultWithTimeout(
                 expectResult,
                 "test_output1",
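
Why the tests build paths through toURI() rather than getResource(...).getPath(): on Windows, URL.getPath() yields a string with a leading slash such as "/C:/project/target/test-classes/...", which Paths.get(String) rejects, while Paths.get(url.toURI()) resolves correctly on every platform. A minimal self-contained sketch (the resource name is illustrative):

import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;

public class ResourcePathSketch {
    public static void main(String[] args) throws Exception {
        URL url = ResourcePathSketch.class.getResource("/flinkSql/clickhouse_test.sql");
        // Fragile: "/C:/.../flinkSql/clickhouse_test.sql" on Windows, which
        // Paths.get(String) cannot parse as a filesystem path.
        String raw = url.getPath();
        // Portable: resolves to a proper Path on every operating system.
        Path portable = Paths.get(url.toURI());
        System.out.println(raw + " vs " + portable);
    }
}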
File 4 of 13 (Kafka end-to-end test class)

@@ -51,9 +51,9 @@
 /**
  * End-to-end tests for sort-connector-kafka uber jar.
  */
-public class KafkaE2EITCase extends FlinkContainerTestEnv {
+public class Mysql2KafkaTest extends FlinkContainerTestEnv {

-    private static final Logger LOG = LoggerFactory.getLogger(KafkaE2EITCase.class);
+    private static final Logger LOG = LoggerFactory.getLogger(Mysql2KafkaTest.class);

     private static final Path kafkaJar = TestUtils.getResource("sort-connector-kafka.jar");
     private static final Path jdbcJar = TestUtils.getResource("sort-connector-jdbc.jar");
@@ -65,6 +65,7 @@ public class KafkaE2EITCase extends FlinkContainerTestEnv {
     public static final KafkaContainer KAFKA =
             new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
                     .withNetwork(NETWORK)
+                    .withExposedPorts(9093, 2181)
                     .withNetworkAliases("kafka")
                     .withEmbeddedZookeeper()
                     .withLogConsumer(new Slf4jLogConsumer(LOG));
@@ -78,7 +79,7 @@ public static void teardown() {

     private Path getSql(String fileName, Map<String, Object> properties) {
         try {
-            Path file = Paths.get(KafkaE2EITCase.class.getResource("/flinkSql/" + fileName).toURI());
+            Path file = Paths.get(Mysql2KafkaTest.class.getResource("/flinkSql/" + fileName).toURI());
             return PlaceholderResolver.getDefaultResolver().resolveByMap(file, properties);
         } catch (URISyntaxException e) {
             throw new RuntimeException(e);
@@ -87,7 +88,7 @@ private Path getSql(String fileName, Map<String, Object> properties) {

     private Path getGroupFile(String fileName, Map<String, Object> properties) {
         try {
-            Path file = Paths.get(KafkaE2EITCase.class.getResource("/groupFile/" + fileName).toURI());
+            Path file = Paths.get(Mysql2KafkaTest.class.getResource("/groupFile/" + fileName).toURI());
             return PlaceholderResolver.getDefaultResolver().resolveByMap(file, properties);
         } catch (URISyntaxException e) {
             throw new RuntimeException(e);
@@ -96,7 +97,7 @@ private Path getGroupFile(String fileName, Map<String, Object> properties) {

     private String getCreateStatement(String fileName, Map<String, Object> properties) {
         try {
-            Path file = Paths.get(KafkaE2EITCase.class.getResource("/env/" + fileName).toURI());
+            Path file = Paths.get(Mysql2KafkaTest.class.getResource("/env/" + fileName).toURI());
             return PlaceholderResolver.getDefaultResolver().resolveByMap(
                     new String(Files.readAllBytes(file), StandardCharsets.UTF_8),
                     properties);
@@ -165,7 +166,7 @@ public void testKafkaWithSqlFile() throws Exception {
                     "INSERT INTO test_input "
                             + "VALUES (1,'jacket','water resistent white wind breaker',0.2, null, null, null);");
             stat.execute(
-                    "INSERT INTO test_input VALUES (2,'scooter','Big 2-wheel scooter ',5.18, null, null, null);");
+                    "INSERT INTO test_input VALUES (2,'scooter','Big 2-wheel scooter',5.18, null, null, null);");
         } catch (SQLException e) {
             LOG.error("Update table for CDC failed.", e);
             throw e;
@@ -177,7 +178,7 @@ public void testKafkaWithSqlFile() throws Exception {
         List<String> expectResult =
                 Arrays.asList(
                         "1,jacket,water resistent white wind breaker,0.2,,,",
-                        "2,scooter,Big 2-wheel scooter ,5.18,,,");
+                        "2,scooter,Big 2-wheel scooter,5.18,,,");
         proxy.checkResultWithTimeout(
                 expectResult,
                 "test_output",
@@ -238,6 +239,7 @@ public void testKafkaWithGroupFile() throws Exception {
                 Arrays.asList(
                         "1,jacket,water resistent white wind breaker,0.2,null,null,null",
                         "2,scooter,Big 2-wheel scooter ,5.18,null,null,null");
+
         proxy.checkResultWithTimeout(
                 expectResult,
                 mysqlOutputTable,
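
The new withExposedPorts(9093, 2181) call publishes the broker and ZooKeeper ports to the host. A minimal sketch of the effect, assuming the same Testcontainers Kafka module: containers on the shared Docker network keep using the "kafka" alias, while host-side code uses the dynamically mapped address.

import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

public class KafkaPortSketch {
    public static void main(String[] args) {
        try (KafkaContainer kafka =
                new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
                        .withExposedPorts(9093, 2181)
                        .withEmbeddedZookeeper()) {
            kafka.start();
            // Prints something like PLAINTEXT://localhost:55001, the address
            // host-side producers and consumers should use.
            System.out.println(kafka.getBootstrapServers());
        }
    }
}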
File 5 of 13 (FlinkContainerTestEnv)

@@ -115,7 +115,7 @@ public abstract class FlinkContainerTestEnv extends TestLogger {
     @ClassRule
     public static final MySqlContainer MYSQL =
             (MySqlContainer) new MySqlContainer()
-                    .withConfigurationOverride("docker/mysql/my.cnf")
+                    .withConfigurationOverride("docker/mysql")
                     .withSetupSQL("docker/mysql/setup.sql")
                     .withDatabaseName("test")
                     .withUsername("flinkuser")
File 6 of 13 (MySqlContainer test utility)

@@ -35,7 +35,7 @@ public class MySqlContainer extends JdbcDatabaseContainer {
     public static final String IMAGE = "mysql";
     public static final Integer MYSQL_PORT = 3306;

-    private static final String MY_CNF_CONFIG_OVERRIDE_PARAM_NAME = "MY_CNF";
+    private static final String MY_CNF_CONFIG_OVERRIDE_PARAM_NAME = "TC_MY_CNF";
     private static final String SETUP_SQL_PARAM_NAME = "SETUP_SQL";
     private static final String MYSQL_ROOT_USER = "root";

@@ -63,6 +63,11 @@ protected void configure() {
             optionallyMapResourceParameterAsVolume(
                     SETUP_SQL_PARAM_NAME, "/docker-entrypoint-initdb.d/", "N/A");
         }
+        if (parameters.containsKey(SETUP_SQL_PARAM_NAME)) {
+            optionallyMapResourceParameterAsVolume(
+                    MY_CNF_CONFIG_OVERRIDE_PARAM_NAME, "/etc/mysql/conf.d",
+                    "docker/mysql");
+        }

         addEnv("MYSQL_DATABASE", databaseName);
         addEnv("MYSQL_USER", username);
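
Renaming the parameter to TC_MY_CNF and pointing withConfigurationOverride at the docker/mysql directory matches the convention of the upstream Testcontainers MySQLContainer, where the override names a classpath directory whose contents are mounted at /etc/mysql/conf.d (the directory MySQL scans for extra .cnf files). A sketch of the equivalent upstream usage, under the assumption that docker/mysql is on the test classpath:

import org.testcontainers.containers.MySQLContainer;

public class MySqlCnfSketch {
    public static void main(String[] args) {
        try (MySQLContainer<?> mysql = new MySQLContainer<>("mysql:8.0")
                // A classpath directory, not a single file; its my.cnf lands
                // in /etc/mysql/conf.d inside the container.
                .withConfigurationOverride("docker/mysql")
                .withDatabaseName("test")) {
            mysql.start();
            System.out.println(mysql.getJdbcUrl());
        }
    }
}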
File 7 of 13 (MySQL my.cnf override)

@@ -60,4 +60,5 @@ binlog_format = row

 # enable gtid mode
 gtid_mode = on
-enforce_gtid_consistency = on
+enforce_gtid_consistency = on
+default_authentication_plugin=mysql_native_password
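
The added default_authentication_plugin=mysql_native_password line is presumably needed because MySQL 8 defaults to caching_sha2_password, which older JDBC drivers (such as the mysql-driver.jar the tests ship to Flink) cannot negotiate. A sketch of the check; the URL and password below are placeholders, only the flinkuser name appears in this diff:

import java.sql.Connection;
import java.sql.DriverManager;

public class JdbcAuthSketch {
    public static void main(String[] args) throws Exception {
        // With the native password plugin as the server default, the handshake
        // below succeeds even for pre-8.0 Connector/J versions.
        try (Connection c = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/test", "flinkuser", "<password>")) {
            System.out.println(c.getMetaData().getDatabaseProductVersion());
        }
    }
}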
File 8 of 13 (MySQL setup.sql)

@@ -23,3 +23,4 @@
 GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, LOCK TABLES ON *.* TO 'flinkuser'@'%';
 CREATE USER 'inlong' IDENTIFIED BY 'inlong';
 GRANT ALL PRIVILEGES ON *.* TO 'inlong'@'%';
+FLUSH PRIVILEGES;
File 9 of 13 (Flink SQL: ClickHouse test source table)

@@ -9,7 +9,7 @@ CREATE TABLE test_input1 (
     'username' = 'inlong',
     'password' = 'inlong',
     'database-name' = 'test',
-    'table-name' = 'test_input1',
+    'table-name' = 'test.test_input1',
     'scan.incremental.snapshot.chunk.size' = '4',
     'scan.incremental.snapshot.enabled' = 'false'
 );
File 10 of 13 (Flink SQL: Kafka test tables)

@@ -13,7 +13,7 @@ CREATE TABLE test_input (
     'username' = 'inlong',
     'password' = 'inlong',
     'database-name' = 'test',
-    'table-name' = 'test_input',
+    'table-name' = 'test.test_input',
     'append-mode' = 'true',
     'scan.incremental.snapshot.chunk.size' = '4',
     'scan.incremental.snapshot.enabled' = 'false'
@@ -66,7 +66,6 @@ CREATE TABLE test_output (
     'username' = 'inlong',
     'password' = 'inlong'
 );
-
 INSERT INTO kafka_load select * from test_input;
 INSERT INTO test_output select * from kafka_extract;
File 11 of 13 (group file JSON)

@@ -63,7 +63,7 @@
             ],
             "primaryKey": "id",
             "tableNames": [
-                "${MYSQL_INPUT_TABLE}"
+                "test.${MYSQL_INPUT_TABLE}"
             ],
             "hostname": "mysql",
             "username": "inlong",
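
The same change runs through files 9 to 11: the CDC source's table name is qualified with its database (test.test_input1, test.test_input, test.${MYSQL_INPUT_TABLE}). A hypothetical illustration, assuming the mysql-cdc connector matches the configured name as a regular expression against the fully qualified database.table identifier, in which case an unqualified name never matches:

import java.util.regex.Pattern;

public class TableNameMatchSketch {
    public static void main(String[] args) {
        String qualifiedId = "test.test_input";  // what the connector compares against
        System.out.println(Pattern.matches("test_input", qualifiedId));        // false
        System.out.println(Pattern.matches("test\\.test_input", qualifiedId)); // true
    }
}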
File 12 of 13 (test logging configuration)

@@ -61,6 +61,6 @@ logger.mysql.name=org.apache.inlong.sort.tests.utils.MySqlContainer
 logger.mysql.additivity=false

 logger.kafka=INFO, kafkaserver
-logger.kafka.name=org.apache.inlong.sort.tests.KafkaE2EITCase
+logger.kafka.name=org.apache.inlong.sort.tests.Mysql2KafkaTest
 logger.kafka.additivity=false
File 13 of 13 (shaded-jar includes)

@@ -75,7 +75,8 @@
                 <include>org.apache.inlong:*</include>
                 <include>org.apache.kafka:*</include>
                 <include>com.google.protobuf:*</include>
-                <include>org.apache.flink:flink-connector-kafka_${scala.binary.version}</include>
+                <include>org.apache.flink:flink-connector-kafka_${flink.scala.binary.version}</include>
+                <include>org.apache.flink:flink-connector-base</include>
                 <include>com.amazonaws:*</include>
                 <include>com.fasterxml.jackson.core:*</include>
                 <include>commons-logging:commons-logging</include>
