Skip to content

Commit

Permalink
[INLONG-9968][Manager] Support pulsar multi cluster when creating pul…
Browse files Browse the repository at this point in the history
…sar consumption groups (#9970)
  • Loading branch information
fuweng11 authored Apr 10, 2024
1 parent dba5900 commit bff9c2c
Showing 1 changed file with 21 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,24 +127,28 @@ private void createPulsarSubscription(InlongConsumeEntity entity) {
"mq resource cannot empty for groupId=" + groupId);

String clusterTag = groupEntity.getInlongClusterTag();
ClusterInfo clusterInfo = clusterService.getOne(clusterTag, null, ClusterType.PULSAR);
PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
try {
InlongPulsarDTO pulsarDTO = InlongPulsarDTO.getFromJson(groupEntity.getExtParams());
String tenant = pulsarDTO.getPulsarTenant();
if (StringUtils.isBlank(tenant)) {
tenant = pulsarCluster.getPulsarTenant();
List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR);
for (ClusterInfo clusterInfo : clusterInfos) {
PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
try {
InlongPulsarDTO pulsarDTO = InlongPulsarDTO.getFromJson(groupEntity.getExtParams());
String tenant = pulsarDTO.getPulsarTenant();
if (StringUtils.isBlank(tenant)) {
tenant = pulsarCluster.getPulsarTenant();
}
PulsarTopicInfo topicMessage = new PulsarTopicInfo();
topicMessage.setPulsarTenant(tenant);
topicMessage.setNamespace(mqResource);

List<String> topics = Arrays.asList(entity.getTopic().split(InlongConstants.COMMA));

this.createPulsarSubscription(pulsarCluster, entity.getConsumerGroup(), topicMessage, topics);
} catch (Exception e) {
log.error("create pulsar topic failed", e);
throw new WorkflowListenerException(
"failed to create pulsar topic for groupId=" + groupId + ", reason: "
+ e.getMessage());
}
PulsarTopicInfo topicMessage = new PulsarTopicInfo();
topicMessage.setPulsarTenant(tenant);
topicMessage.setNamespace(mqResource);

List<String> topics = Arrays.asList(entity.getTopic().split(InlongConstants.COMMA));
this.createPulsarSubscription(pulsarCluster, entity.getConsumerGroup(), topicMessage, topics);
} catch (Exception e) {
log.error("create pulsar topic failed", e);
throw new WorkflowListenerException("failed to create pulsar topic for groupId=" + groupId + ", reason: "
+ e.getMessage());
}
}

Expand Down

0 comments on commit bff9c2c

Please sign in to comment.