diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSource.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSource.java
index 74b29ee1de..4827cf714d 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSource.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSource.java
@@ -83,10 +83,9 @@
*
See {@link KafkaSourceBuilder} for more details on how to configure this source.
*
* @param the output type of the source.
- * copied from org.apache.flink:flink-connector-kafka:1.18.0
+ * copied from org.apache.flink:flink-connector-kafka:3.2.0
*/
// TODO: Add a variable metricSchema to report audit information
-@PublicEvolving
public class KafkaSource
implements
Source,
@@ -137,7 +136,6 @@ public Boundedness getBoundedness() {
return this.boundedness;
}
- @Internal
@Override
public SourceReader createReader(SourceReaderContext readerContext)
throws Exception {
@@ -187,7 +185,7 @@ public UserCodeClassLoader getUserCodeClassLoader() {
kafkaSourceReaderMetrics);
}
- @Internal
+
@Override
public SplitEnumerator createEnumerator(
SplitEnumeratorContext enumContext) {
@@ -200,7 +198,7 @@ public SplitEnumerator createEnumerat
boundedness);
}
- @Internal
+
@Override
public SplitEnumerator restoreEnumerator(
SplitEnumeratorContext enumContext,
@@ -216,13 +214,13 @@ public SplitEnumerator restoreEnumera
checkpoint);
}
- @Internal
+
@Override
public SimpleVersionedSerializer getSplitSerializer() {
return new KafkaPartitionSplitSerializer();
}
- @Internal
+
@Override
public SimpleVersionedSerializer getEnumeratorCheckpointSerializer() {
return new KafkaSourceEnumStateSerializer();
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSourceBuilder.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSourceBuilder.java
index 1bbfc64b1a..11af4bf7d5 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSourceBuilder.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSourceBuilder.java
@@ -86,10 +86,9 @@
*
* Check the Java docs of each individual methods to learn more about the settings to build a
* KafkaSource.
- * copied from org.apache.flink:flink-connector-kafka:1.18.0
+ * copied from org.apache.flink:flink-connector-kafka:3.2.0
*/
// TODO: Add a variable metricSchema to report audit information
-@PublicEvolving
public class KafkaSourceBuilder {
private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceBuilder.class);
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/reader/KafkaSourceReader.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/reader/KafkaSourceReader.java
index 2317d66097..738afdb86a 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/reader/KafkaSourceReader.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/reader/KafkaSourceReader.java
@@ -46,10 +46,9 @@
import java.util.concurrent.ConcurrentMap;
/** The source reader for Kafka partitions.
- * copied from org.apache.flink:flink-connector-kafka:1.18.0
+ * copied from org.apache.flink:flink-connector-kafka:3.2.0
*/
// TODO: Add some method to make report audit information exactly once
-@Internal
public class KafkaSourceReader
extends
SingleThreadMultiplexSourceReaderBase, T, KafkaPartitionSplit, KafkaPartitionSplitState> {
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
index b43a4846a9..2f35af5e23 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
@@ -36,7 +36,7 @@
import java.util.List;
/** A specific {@link KafkaSerializationSchema} for {@link KafkaDynamicSource}.
- * copied from org.apache.flink:flink-connector-kafka:1.18.0
+ * copied from org.apache.flink:flink-connector-kafka:3.2.0
*/
// TODO: support SourceExactlyMetric and add metric collection points
class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema {
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaRecordSerializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaRecordSerializationSchema.java
index 0c54cbb558..7ff52be24d 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaRecordSerializationSchema.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaRecordSerializationSchema.java
@@ -32,7 +32,7 @@
import static org.apache.flink.util.Preconditions.checkNotNull;
/** SerializationSchema used by {@link KafkaDynamicSink} to configure a {@link KafkaSink}.
- * copied from org.apache.flink:flink-connector-kafka:1.18.0
+ * copied from org.apache.flink:flink-connector-kafka:3.2.0
*/
class DynamicKafkaRecordSerializationSchema implements KafkaRecordSerializationSchema {
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaConnectorOptions.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaConnectorOptions.java
index 6eae03ba12..12339967a5 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaConnectorOptions.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaConnectorOptions.java
@@ -34,9 +34,8 @@
/**
* Options for the Kafka connector.
- * copied from org.apache.flink:flink-connector-kafka:1.18.0
+ * copied from org.apache.flink:flink-connector-kafka:3.2.0
*/
-@PublicEvolving
public class KafkaConnectorOptions {
// --------------------------------------------------------------------------------------------
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaConnectorOptionsUtil.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaConnectorOptionsUtil.java
index 5b2b39ef71..c698ec8c9f 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaConnectorOptionsUtil.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaConnectorOptionsUtil.java
@@ -72,9 +72,8 @@
import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
/** Utilities for {@link KafkaConnectorOptions}.
- * copied from org.apache.flink:flink-connector-kafka:1.18.0
+ * copied from org.apache.flink:flink-connector-kafka:3.2.0
*/
-@Internal
class KafkaConnectorOptionsUtil {
private static final ConfigOption SCHEMA_REGISTRY_SUBJECT =
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSink.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSink.java
index e08aa15ff0..3a536dcd7c 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSink.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSink.java
@@ -63,8 +63,8 @@
/**
* A version-agnostic Kafka {@link DynamicTableSink}.
+ * copied from org.apache.flink:flink-connector-kafka:3.2.0
*/
-@Internal
public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetadata {
private static final String UPSERT_KAFKA_TRANSFORMATION = "upsert-kafka";
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
index ff089bc668..4419d0b056 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
@@ -77,9 +77,8 @@
/**
* A version-agnostic Kafka {@link ScanTableSource}.
- * copied from org.apache.flink:flink-connector-kafka:1.18.0
+ * org.apache.flink:flink-connector-kafka:3.2.0
*/
-@Internal
public class KafkaDynamicSource
implements
ScanTableSource,
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
index 4c428a3bea..9b2a38fb71 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
@@ -99,9 +99,8 @@
/**
* Factory for creating configured instances of {@link KafkaDynamicSource} and {@link
* KafkaDynamicSink}.
- * copied from org.apache.flink:flink-connector-kafka:1.18.0
+ * org.apache.flink:flink-connector-kafka:3.2.0
*/
-@Internal
public class KafkaDynamicTableFactory
implements
DynamicTableSourceFactory,
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/ReducingUpsertSink.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/ReducingUpsertSink.java
index 6b468a7435..97f7237632 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/ReducingUpsertSink.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/ReducingUpsertSink.java
@@ -36,7 +36,7 @@
* The sink provides eventual consistency guarantees under {@link
* org.apache.flink.connector.base.DeliveryGuarantee#AT_LEAST_ONCE} because the updates are
* idempotent therefore duplicates have no effect.
- * copied from org.apache.flink:flink-connector-kafka:1.18.0
+ * copied from org.apache.flink:flink-connector-kafka:3.2.0
*/
class ReducingUpsertSink
implements
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/ReducingUpsertWriter.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/ReducingUpsertWriter.java
index 741330d802..05b4ad6ebc 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/ReducingUpsertWriter.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/ReducingUpsertWriter.java
@@ -41,7 +41,7 @@
import static org.apache.inlong.sort.kafka.table.DynamicKafkaRecordSerializationSchema.createProjectedRow;
/**
- * copied from org.apache.flink:flink-connector-kafka:1.18.0
+ * copied from org.apache.flink:flink-connector-kafka:3.2.0
*/
class ReducingUpsertWriter
implements
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/SinkBufferFlushMode.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/SinkBufferFlushMode.java
index f8fd4c7d20..c9b211c985 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/SinkBufferFlushMode.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/SinkBufferFlushMode.java
@@ -21,7 +21,7 @@
import java.util.Objects;
/** Sink buffer flush configuration.
- * copied from org.apache.flink:flink-connector-kafka:1.18.0
+ * copied from org.apache.flink:flink-connector-kafka:3.2.0
*/
public class SinkBufferFlushMode implements Serializable {
diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
index ba3d7005ff..9e59876f16 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
@@ -82,7 +82,7 @@
/**
* Upsert-Kafka factory.
- * copied from org.apache.flink:flink-connector-kafka:1.18.0
+ * copied from org.apache.flink:flink-connector-kafka:3.2.0
*/
public class UpsertKafkaDynamicTableFactory
implements