Skip to content

Commit

Permalink
[INLONG-9077][Sort] Fix TubeMQ connector fail to subscribe streamId (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
vernedeng authored Oct 20, 2023
1 parent de653f3 commit 0e60e72
Show file tree
Hide file tree
Showing 18 changed files with 84 additions and 66 deletions.
2 changes: 1 addition & 1 deletion inlong-distribution/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>3.1.0</version>
<version>${exec.maven.version}</version>
<configuration>
<executable>${basedir}/script/backup_module_dependencys.sh</executable>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ public ExtractNode createExtractNode(StreamNode streamNodeInfo) {
source.getMasterRpc(),
source.getTopic(),
source.getSerializationType(),
source.getGroupId(),
source.getConsumeGroup(),
source.getSessionKey(),
source.getTid(),
source.getStreamId(),
source.getInnerFormat());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class TubeMQSource extends StreamSource {
private String topic;

@ApiModelProperty("Group of the TubeMQ")
private String groupId;
private String consumeGroup;

@ApiModelProperty("Session key of the TubeMQ")
private String sessionKey;
Expand All @@ -61,10 +61,10 @@ public class TubeMQSource extends StreamSource {
private String innerFormat;

/**
* The TubeMQ consumers use this tid set to filter records reading from server.
* The TubeMQ consumers use this streamId set to filter records reading from server.
*/
@ApiModelProperty("Tid of the TubeMQ")
private TreeSet<String> tid;
private TreeSet<String> streamId;

public TubeMQSource() {
this.setSourceType(SourceType.TUBEMQ);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public Map<String, List<StreamSource>> getSourcesMap(InlongGroupInfo groupInfo,
String streamId = streamInfo.getInlongStreamId();
tubeMQSource.setSourceName(streamId);
tubeMQSource.setTopic(groupInfo.getMqResource());
tubeMQSource.setGroupId(streamId);
tubeMQSource.setConsumeGroup(streamId);
tubeMQSource.setMasterRpc(masterRpc);
tubeMQSource.setIgnoreParseError(streamInfo.getIgnoreParseError());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class TubeMQConstant {

public static final String TOPIC = "topic";

public static final String GROUP_ID = "group.id";
public static final String CONSUME_GROUP = "consume.group";

public static final String CONNECTOR = "connector";

Expand All @@ -38,9 +38,9 @@ public class TubeMQConstant {
public static final String SESSION_KEY = "session.key";

/**
* The tubemq consumers use this tid set to filter records reading from server.
* The tubemq consumers use this streamId set to filter records reading from server.
*/
public static final String TID = "tid";
public static final String STREAMID = "stream.id";

public static final String CONSUMER_STARTUP_MODE = "consumer.startup.mode";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,17 @@ public class TubeMQExtractNode extends ExtractNode implements Serializable {
private String format;

@Nonnull
@JsonProperty("groupId")
private String groupId;
@JsonProperty("consumeGroup")
private String consumeGroup;

@JsonProperty("sessionKey")
private String sessionKey;

/**
* The tubemq consumers use this tid set to filter records reading from server.
* The tubemq consumers use this streamId set to filter records reading from server.
*/
@JsonProperty("tid")
private TreeSet<String> tid;
@JsonProperty("streamId")
private TreeSet<String> streamId;

@JsonProperty("inlong-msg.inner.format")
private String innerFormat;
Expand All @@ -86,18 +86,18 @@ public TubeMQExtractNode(
@Nonnull @JsonProperty("masterRpc") String masterRpc,
@Nonnull @JsonProperty("topic") String topic,
@Nonnull @JsonProperty("format") String format,
@Nonnull @JsonProperty("groupId") String groupId,
@Nonnull @JsonProperty("consumeGroup") String consumeGroup,
@JsonProperty("sessionKey") String sessionKey,
@JsonProperty("tid") TreeSet<String> tid,
@JsonProperty("streamId") TreeSet<String> streamId,
@JsonProperty("inlong-msg.inner.format") String innerFormat) {
super(id, name, fields, waterMarkField, properties);
this.masterRpc = Preconditions.checkNotNull(masterRpc, "TubeMQ masterRpc is null");
this.topic = Preconditions.checkNotNull(topic, "TubeMQ topic is null");
this.format = Preconditions.checkNotNull(format, "Format is null");
this.groupId = Preconditions.checkNotNull(groupId, "Group id is null");
this.consumeGroup = Preconditions.checkNotNull(consumeGroup, "Group id is null");
this.sessionKey = sessionKey;
this.streamId = streamId;
this.innerFormat = innerFormat;
this.tid = tid;
}

@Override
Expand All @@ -106,15 +106,15 @@ public Map<String, String> tableOptions() {
map.put(TubeMQConstant.CONNECTOR, TubeMQConstant.TUBEMQ);
map.put(TubeMQConstant.TOPIC, topic);
map.put(TubeMQConstant.MASTER_RPC, masterRpc);
map.put(TubeMQConstant.GROUP_ID, groupId);
map.put(TubeMQConstant.CONSUME_GROUP, consumeGroup);
map.put(TubeMQConstant.FORMAT, format);
map.put(TubeMQConstant.SESSION_KEY, sessionKey);
if (format.startsWith(INLONG_MSG)) {
map.put(TubeMQConstant.INNER_FORMAT, innerFormat);
}

if (null != tid && !tid.isEmpty()) {
map.put(TubeMQConstant.TID, StringUtils.concatCsv(tid.toArray(new String[0]),
if (null != streamId && !streamId.isEmpty()) {
map.put(TubeMQConstant.STREAMID, StringUtils.concatCsv(streamId.toArray(new String[0]),
',', null, null));
}

Expand Down
6 changes: 6 additions & 0 deletions inlong-sort/sort-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,12 @@
<profile>
<id>v1.15</id>
<dependencies>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connector-tubemq-v1.15</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connector-postgres-cdc-v1.15</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@

import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.BOOTSTRAP_FROM_MAX;
import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.GROUP_ID;
import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.CONSUME_GROUP;
import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.KEY_FORMAT;
import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.MASTER_RPC;
import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.SESSION_KEY;
import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.TID;
import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.STREAMID;
import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.TOPIC;
import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.TOPIC_PATTERN;
import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.getTubeMQProperties;
Expand Down Expand Up @@ -169,8 +169,8 @@ public Set<ConfigOption<?>> optionalOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
options.add(FORMAT);
options.add(TOPIC);
options.add(GROUP_ID);
options.add(TID);
options.add(CONSUME_GROUP);
options.add(STREAMID);
options.add(SESSION_KEY);
options.add(BOOTSTRAP_FROM_MAX);
options.add(TOPIC_PATTERN);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ public class TubeMQOptions {
// TubeMQ specific options
// --------------------------------------------------------------------------------------------

public static final ConfigOption<String> INNER_FORMAT =
ConfigOptions.key("inlong-msg.inner.format")
.stringType()
.noDefaultValue()
.withDescription(
"Inner format");

public static final ConfigOption<String> TOPIC =
ConfigOptions.key("topic")
.stringType()
Expand All @@ -109,8 +116,8 @@ public class TubeMQOptions {
.noDefaultValue()
.withDescription("Required TubeMQ master connection string");

public static final ConfigOption<String> GROUP_ID =
ConfigOptions.key("group.id")
public static final ConfigOption<String> CONSUME_GROUP =
ConfigOptions.key("consume.group")
.stringType()
.noDefaultValue()
.withDescription(
Expand Down Expand Up @@ -140,8 +147,8 @@ public class TubeMQOptions {
.defaultValue("default_session_key")
.withDescription("The session key for this consumer group at startup.");

public static final ConfigOption<List<String>> TID =
ConfigOptions.key("topic.tid")
public static final ConfigOption<List<String>> STREAMID =
ConfigOptions.key("stream.id")
.stringType()
.asList()
.noDefaultValue()
Expand Down Expand Up @@ -385,7 +392,7 @@ public static String getMasterRpcAddress(ReadableConfig tableOptions) {

public static TreeSet<String> getTiSet(ReadableConfig tableOptions) {
TreeSet<String> set = new TreeSet<>();
tableOptions.getOptional(TID).ifPresent(new Consumer<List<String>>() {
tableOptions.getOptional(STREAMID).ifPresent(new Consumer<List<String>>() {

@Override
public void accept(List<String> strings) {
Expand All @@ -396,7 +403,7 @@ public void accept(List<String> strings) {
}

public static String getConsumerGroup(ReadableConfig tableOptions) {
return tableOptions.getOptional(GROUP_ID).orElse(null);
return tableOptions.getOptional(CONSUME_GROUP).orElse(null);
}

public static String getSessionKey(ReadableConfig tableOptions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>${protobuf.maven.plugin.version}</version>
<extensions>true</extensions>
<configuration>
<!-- Currently Flink azure test pipeline would first pre-compile and then upload the compiled
Expand Down Expand Up @@ -157,6 +158,7 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>${build.helper.maven.version}</version>
<executions>
<execution>
<id>add-test-source</id>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T>
private final String topic;

/**
* The tubemq consumers use this tid set to filter records reading from server.
* The tubemq consumers use this streamId set to filter records reading from server.
*/
private final TreeSet<String> tidSet;
private final TreeSet<String> streamIdSet;

/**
* The consumer group name.
Expand Down Expand Up @@ -147,7 +147,7 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T>
*
* @param masterAddress the master address of TubeMQ
* @param topic the topic name
* @param tidSet the topic's filter condition items
* @param streamIdSet the topic's filter condition items
* @param consumerGroup the consumer group name
* @param deserializationSchema the deserialize schema
* @param configuration the configure
Expand All @@ -156,22 +156,22 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T>
public FlinkTubeMQConsumer(
String masterAddress,
String topic,
TreeSet<String> tidSet,
TreeSet<String> streamIdSet,
String consumerGroup,
DeserializationSchema<T> deserializationSchema,
Configuration configuration,
String sessionKey,
Boolean innerFormat) {
checkNotNull(masterAddress, "The master address must not be null.");
checkNotNull(topic, "The topic must not be null.");
checkNotNull(tidSet, "The tid set must not be null.");
checkNotNull(streamIdSet, "The streamId set must not be null.");
checkNotNull(consumerGroup, "The consumer group must not be null.");
checkNotNull(deserializationSchema, "The deserialization schema must not be null.");
checkNotNull(configuration, "The configuration must not be null.");

this.masterAddress = masterAddress;
this.topic = topic;
this.tidSet = tidSet;
this.streamIdSet = streamIdSet;
this.consumerGroup = consumerGroup;
this.deserializationSchema = deserializationSchema;
this.sessionKey = sessionKey;
Expand Down Expand Up @@ -217,7 +217,7 @@ public void open(Configuration parameters) throws Exception {
final int numTasks = getRuntimeContext().getNumberOfParallelSubtasks();
messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig);
messagePullConsumer.subscribe(topic, tidSet);
messagePullConsumer.subscribe(topic, streamIdSet);
messagePullConsumer.completeSubscribe(sessionKey, numTasks, true, currentOffsets);

running = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ public class FlinkTubeMQProducer<T> extends RichSinkFunction<T> implements Check
private final String topic;

/**
* The tubemq consumers use this tid set to filter records reading from server.
* The tubemq consumers use this streamId set to filter records reading from server.
*/
private final TreeSet<String> tidSet;
private final TreeSet<String> streamIdSet;
/**
* The serializer for the records sent to tube.
*/
Expand All @@ -86,12 +86,12 @@ public class FlinkTubeMQProducer<T> extends RichSinkFunction<T> implements Check
public FlinkTubeMQProducer(String topic,
String masterAddress,
SerializationSchema<T> serializationSchema,
TreeSet<String> tidSet,
TreeSet<String> streamIdSet,
Configuration configuration) {
checkNotNull(topic, "The topic must not be null.");
checkNotNull(masterAddress, "The master address must not be null.");
checkNotNull(serializationSchema, "The serialization schema must not be null.");
checkNotNull(tidSet, "The tid set must not be null.");
checkNotNull(streamIdSet, "The streamId set must not be null.");
checkNotNull(configuration, "The configuration must not be null.");

int max_retries = configuration.getInteger(TubeMQOptions.MAX_RETRIES);
Expand All @@ -100,7 +100,7 @@ public FlinkTubeMQProducer(String topic,
this.topic = topic;
this.masterAddress = masterAddress;
this.serializationSchema = serializationSchema;
this.tidSet = tidSet;
this.streamIdSet = streamIdSet;
this.maxRetries = max_retries;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@

import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.BOOTSTRAP_FROM_MAX;
import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.GROUP_NAME;
import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.CONSUME_GROUP;
import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.KEY_FORMAT;
import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.MASTER_RPC;
import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.SESSION_KEY;
Expand Down Expand Up @@ -220,7 +220,7 @@ public Set<ConfigOption<?>> optionalOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
options.add(FORMAT);
options.add(TOPIC);
options.add(GROUP_NAME);
options.add(CONSUME_GROUP);
options.add(STREAMID);
options.add(SESSION_KEY);
options.add(BOOTSTRAP_FROM_MAX);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ public class TubeMQOptions {
.noDefaultValue()
.withDescription("Required TubeMQ master connection string");

public static final ConfigOption<String> GROUP_NAME =
ConfigOptions.key("group.name")
public static final ConfigOption<String> CONSUME_GROUP =
ConfigOptions.key("consume.group")
.stringType()
.noDefaultValue()
.withDescription(
Expand Down Expand Up @@ -138,7 +138,7 @@ public class TubeMQOptions {
.withDescription("The session key for this consumer group at startup.");

public static final ConfigOption<List<String>> STREAMID =
ConfigOptions.key("topic.streamId")
ConfigOptions.key("stream.id")
.stringType()
.asList()
.noDefaultValue()
Expand Down Expand Up @@ -275,7 +275,7 @@ public void accept(List<String> strings) {
}

public static String getConsumerGroup(ReadableConfig tableOptions) {
return tableOptions.getOptional(GROUP_NAME).orElse(null);
return tableOptions.getOptional(CONSUME_GROUP).orElse(null);
}

public static String getSessionKey(ReadableConfig tableOptions) {
Expand Down
Loading

0 comments on commit 0e60e72

Please sign in to comment.