Skip to content

Commit

Permalink
[INLONG-9972][Sort] Pulsar connector support authentication when conn…
Browse files Browse the repository at this point in the history
…ecting to Pulsar cluster (#9973)
  • Loading branch information
EMsnap authored Apr 15, 2024
1 parent bf8d1f7 commit b1de06b
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ public ExtractNode createExtractNode(StreamNode streamNodeInfo) {
startupMode.getValue(),
primaryKey,
pulsarSource.getSubscription(),
scanStartupSubStartOffset);
scanStartupSubStartOffset,
"",
"");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,22 @@ public class PulsarExtractNode extends ExtractNode implements InlongMetric, Meta
@JsonProperty("scanStartupSubStartOffset")
private String scanStartupSubStartOffset;

/**
* pulsar client auth plugin class name
* e.g. org.apache.pulsar.client.impl.auth.AuthenticationToken
*/
@JsonProperty("clientAuthPluginClassName")
private String clientAuthPluginClassName;

/**
* pulsar client auth params
* e.g. token:{tokenString}
* the tokenString should be compatible with the clientAuthPluginClassName see also in:
* <a href="https://pulsar.apache.org/docs/next/security-jwt/"> pulsar auth </a>
*/
@JsonProperty("clientAuthParams")
private String clientAuthParams;

@JsonCreator
public PulsarExtractNode(@JsonProperty("id") String id,
@JsonProperty("name") String name,
Expand All @@ -86,7 +102,10 @@ public PulsarExtractNode(@JsonProperty("id") String id,
@Nonnull @JsonProperty("scanStartupMode") String scanStartupMode,
@JsonProperty("primaryKey") String primaryKey,
@JsonProperty("scanStartupSubName") String scanStartupSubName,
@JsonProperty("scanStartupSubStartOffset") String scanStartupSubStartOffset) {
@JsonProperty("scanStartupSubStartOffset") String scanStartupSubStartOffset,
@JsonProperty("clientAuthPluginClassName") String clientAuthPluginClassName,
@JsonProperty("clientAuthParams") String clientAuthParams) {

super(id, name, fields, watermarkField, properties);
this.topic = Preconditions.checkNotNull(topic, "pulsar topic is null.");
this.serviceUrl = Preconditions.checkNotNull(serviceUrl, "pulsar serviceUrl is null.");
Expand All @@ -97,6 +116,9 @@ public PulsarExtractNode(@JsonProperty("id") String id,
this.primaryKey = primaryKey;
this.scanStartupSubName = scanStartupSubName;
this.scanStartupSubStartOffset = scanStartupSubStartOffset;
this.clientAuthPluginClassName = clientAuthPluginClassName;
this.clientAuthParams = clientAuthParams;

}

/**
Expand All @@ -107,23 +129,28 @@ public PulsarExtractNode(@JsonProperty("id") String id,
@Override
public Map<String, String> tableOptions() {
Map<String, String> options = super.tableOptions();
if (StringUtils.isEmpty(this.primaryKey)) {
if (StringUtils.isBlank(this.primaryKey)) {
options.put("connector", "pulsar-inlong");
options.putAll(format.generateOptions(false));
} else {
options.put("connector", "upsert-pulsar-inlong");
options.putAll(format.generateOptions(true));
}
if (adminUrl != null) {
if (StringUtils.isNotBlank(adminUrl)) {
options.put("admin-url", adminUrl);
}
options.put("service-url", serviceUrl);
options.put("topic", topic);
options.put("scan.startup.mode", scanStartupMode);
if (scanStartupSubName != null) {
if (StringUtils.isNotBlank(scanStartupSubName)) {
options.put("scan.startup.sub-name", scanStartupSubName);
options.put("scan.startup.sub-start-offset", scanStartupSubStartOffset);
}
if (StringUtils.isNotBlank(clientAuthPluginClassName)
&& StringUtils.isNotBlank(clientAuthParams)) {
options.put("pulsar.client.authPluginClassName", clientAuthPluginClassName);
options.put("pulsar.client.authParams", clientAuthParams);
}
return options;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public Node getTestObject() {
"earliest",
null,
"subscription",
"earliest");
"earliest",
"org.apache.pulsar.client.impl.auth.AuthenticationToken",
"token auth params");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ public PulsarExtractNode buildPulsarExtractNode() {
"earliest",
null,
"test",
"earliest");
"earliest",
"org.apache.pulsar.client.impl.auth.AuthenticationToken",
"token auth params");
}

private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
Expand Down
5 changes: 5 additions & 0 deletions inlong-sort/sort-dist/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,11 @@
<artifactId>flink-sql-avro</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>audit-sdk</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
</profiles>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,4 +429,18 @@ public final class Constants {
.withDescription(
"Inner format");

public static final ConfigOption<String> PULSAR_CLIENT_AUTH_PLUGIN_CLASSNAME =
ConfigOptions.key("pulsar.client.authPluginClassName")
.stringType()
.noDefaultValue()
.withDescription(
"pulsar client auth plugin class name");

public static final ConfigOption<String> PULSAR_AUTH_PARAMS =
ConfigOptions.key("pulsar.client.authParams")
.stringType()
.noDefaultValue()
.withDescription(
"pulsar client auth params");

}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@
import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
import static org.apache.inlong.sort.base.Constants.PULSAR_AUTH_PARAMS;
import static org.apache.inlong.sort.base.Constants.PULSAR_CLIENT_AUTH_PLUGIN_CLASSNAME;

/**
* Copy from io.streamnative.connectors:pulsar-flink-connector_2.11:1.13.6.1-rc9
Expand Down Expand Up @@ -333,6 +335,8 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(INLONG_METRIC);
options.add(INLONG_AUDIT);
options.add(AUDIT_KEYS);
options.add(PULSAR_AUTH_PARAMS);
options.add(PULSAR_CLIENT_AUTH_PLUGIN_CLASSNAME);

return options;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.LinkedHashMap;
Expand All @@ -55,6 +57,8 @@ public class PulsarTableSource implements ScanTableSource, SupportsReadingMetada
// Format attributes
// --------------------------------------------------------------------------------------------

private static final Logger LOG = LoggerFactory.getLogger(PulsarTableSource.class);

private static final String FORMAT_METADATA_PREFIX = "value.";

private final PulsarTableDeserializationSchemaFactory deserializationSchemaFactory;
Expand Down Expand Up @@ -111,6 +115,7 @@ public ChangelogMode getChangelogMode() {
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
PulsarDeserializationSchema<RowData> deserializationSchema =
deserializationSchemaFactory.createPulsarDeserialization(context);
LOG.info("pulsar source init with properties: {}", properties);
PulsarSource<RowData> source =
PulsarSource.builder()
.setTopics(topics)
Expand Down

0 comments on commit b1de06b

Please sign in to comment.