Skip to content

Commit

Permalink
RabbitMQ Mes interface module (WIP)
Browse files Browse the repository at this point in the history
  • Loading branch information
openi40 committed Jan 17, 2024
1 parent fb1735d commit f24d1d1
Show file tree
Hide file tree
Showing 13 changed files with 460 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.openi40.mes.integration.ifaces;

public interface IntegrationProtocolTypes {
public static final String MQTT="MQTT";
public static final String OPCUA="OPCUA";
public static final String REST="REST";
public static final String MQTT = "MQTT";
public static final String RABBITMQ = "RABBITMQ";
public static final String AMQP = "AMQP";
public static final String OPCUA = "OPCUA";
public static final String REST = "REST";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.openi40.mes.io</groupId>
<artifactId>openi40-mes-io</artifactId>
<version>00.41.BETA-SNAPSHOT</version>
</parent>
<artifactId>openi40-mes-rabbitmq</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.openi40.mes.io.rabbitmq.config;

import java.util.ArrayList;
import java.util.List;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConfigurationProperties(value = "com.openi40.mes.io.rabbitmq")
public class GenericRabbitMQChannelConfig {
private List<IntegratedChannelsConfig> channels=new ArrayList<IntegratedChannelsConfig>();
public GenericRabbitMQChannelConfig() {

}
public List<IntegratedChannelsConfig> getChannels() {
return channels;
}
public void setChannels(List<IntegratedChannelsConfig> channels) {
this.channels = channels;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.openi40.mes.io.rabbitmq.config;

import com.openi40.mes.integration.ifaces.ManuallyConfiguredIntegrationHandlerImpl;

public class IntegratedChannelsConfig {
private String channelId = null, integrationHandlerId = ManuallyConfiguredIntegrationHandlerImpl.HANDLER_ID;

public IntegratedChannelsConfig() {

}

public String getChannelId() {
return channelId;
}

public void setChannelId(String channelId) {
this.channelId = channelId;
}

public String getIntegrationHandlerId() {
return integrationHandlerId;
}

public void setIntegrationHandlerId(String integrationHandlerId) {
this.integrationHandlerId = integrationHandlerId;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.openi40.mes.io.rabbitmq.config;

import java.util.Map;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.openi40.mes.integration.ifaces.ConfiguredEndpointInfo;
import com.openi40.mes.io.rabbitmq.input.GenericalRabbitMQInputReceiver;
import com.openi40.mes.io.rabbitmq.input.IAssetsInfoProvider;

@Configuration
public class RabbitMQConfig {
static Logger LOGGER=LoggerFactory.getLogger(RabbitMQConfig.class);
IAssetsInfoProvider assetsInfoProvider = null;
BeanFactory beanFactory = null;

public RabbitMQConfig(@Autowired IAssetsInfoProvider assetsInfoProvider, @Autowired BeanFactory beanFactory) {
this.assetsInfoProvider = assetsInfoProvider;
this.beanFactory = beanFactory;
}

@Bean
public SimpleMessageListenerContainer listenerContainer(final ConnectionFactory connectionFactory) {
LOGGER.info("Start configuring RabbitMQ queues listerer");
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
String[] queueNames = {};
Map<String, ConfiguredEndpointInfo> readAssetsMap = assetsInfoProvider.getReadAssetsMap();
queueNames = new String[readAssetsMap.size()];
Set<String> keys = readAssetsMap.keySet();
LOGGER.info("Listening to queues:"+keys);
queueNames = keys.toArray(new String[0]);
container.setQueueNames(queueNames);
container.setMessageListener(new MessageListener() {

@Override
public void onMessage(Message message) {
GenericalRabbitMQInputReceiver receiver = beanFactory.getBean(GenericalRabbitMQInputReceiver.class);
receiver.onMessage(message);
}
});
LOGGER.info("End configuring RabbitMQ queues listerer");
return container;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package com.openi40.mes.io.rabbitmq.input;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.inject.Singleton;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.openi40.mes.integration.ifaces.ConfiguredEndpointInfo;
import com.openi40.mes.integration.ifaces.IOpenI40IntegratedEndpointsRetriever;
import com.openi40.mes.integration.ifaces.IntegrationHandlerException;
import com.openi40.mes.integration.ifaces.IntegrationProtocolTypes;
import com.openi40.mes.io.rabbitmq.config.GenericRabbitMQChannelConfig;
import com.openi40.mes.io.rabbitmq.config.IntegratedChannelsConfig;

@Singleton
@Service
public class AssetsInfoProviderImpl implements IAssetsInfoProvider {
static Logger LOGGER = LoggerFactory.getLogger(AssetsInfoProviderImpl.class);
GenericRabbitMQChannelConfig channelsConfig = null;
IOpenI40IntegratedEndpointsRetriever configuredEndpointsRetriever = null;
Map<String, ConfiguredEndpointInfo> readmap = new HashMap<String, ConfiguredEndpointInfo>();
Map<String, ConfiguredEndpointInfo> writemap = new HashMap<String, ConfiguredEndpointInfo>();

public AssetsInfoProviderImpl(@Autowired(required = false) GenericRabbitMQChannelConfig channelsConfig,
@Autowired(required = false) IOpenI40IntegratedEndpointsRetriever configuredEndpointsRetriever) {
this.channelsConfig = channelsConfig;
this.configuredEndpointsRetriever = configuredEndpointsRetriever;
}

private synchronized void refreshMaps() {
Map<String, ConfiguredEndpointInfo> rmap = new HashMap<String, ConfiguredEndpointInfo>();
Map<String, ConfiguredEndpointInfo> wmap = new HashMap<String, ConfiguredEndpointInfo>();
if (channelsConfig != null && channelsConfig.getChannels() != null) {
for (IntegratedChannelsConfig channel : channelsConfig.getChannels()) {
if (configuredEndpointsRetriever != null) {
try {
List<ConfiguredEndpointInfo> configured = configuredEndpointsRetriever
.getConfiguredEndpoints(channel.getIntegrationHandlerId(), channel.getChannelId());
if (configured != null) {
for (ConfiguredEndpointInfo configuredEndpointInfo : configured) {
// IntegrationProtocolTypes.
if (configuredEndpointInfo.getEndPointInfo().getProtocolType() != null
&& (configuredEndpointInfo.getEndPointInfo().getProtocolType()
.equalsIgnoreCase(IntegrationProtocolTypes.RABBITMQ)
|| configuredEndpointInfo.getEndPointInfo().getProtocolType()
.equalsIgnoreCase(IntegrationProtocolTypes.AMQP))) {
if (configuredEndpointInfo.getEndPointInfo().getReadUri() != null)
rmap.put(configuredEndpointInfo.getEndPointInfo().getReadUri(),
configuredEndpointInfo);
if (configuredEndpointInfo.getEndPointInfo().getWriteUri() != null)
wmap.put(configuredEndpointInfo.getEndPointInfo().getWriteUri(),
configuredEndpointInfo);
}
}
}
} catch (IntegrationHandlerException e) {
LOGGER.error("Error retrieving endpoints", e);
}
}
}
}
this.writemap = wmap;
this.readmap = rmap;
}

@Override
public Map<String, ConfiguredEndpointInfo> getReadAssetsMap() {
if (readmap == null || readmap.isEmpty()) {
this.refreshMaps();
}
return readmap;
}

@Override
public Map<String, ConfiguredEndpointInfo> getWriteAssetsMap() {
if (writemap == null || writemap.isEmpty())
this.refreshMaps();
return writemap;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.openi40.mes.io.rabbitmq.input;

import java.io.Serializable;

import com.openi40.mes.metamessaging.model.AbstractOI40IOTMetaMessage;
import com.openi40.mes.metamessaging.model.VolatileMessageType;
@VolatileMessageType
public class GenericalRabbitMQInputMessage extends AbstractOI40IOTMetaMessage implements Serializable{
private byte[] payload = null;
private String topic = null;
private String channelId = null;
private String integrationId = null;
private String assetCode = null;
private String rabbitMessageId=null;
public GenericalRabbitMQInputMessage() {
// TODO Auto-generated constructor stub
}
public byte[] getPayload() {
return payload;
}
public void setPayload(byte[] payload) {
this.payload = payload;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getChannelId() {
return channelId;
}
public void setChannelId(String channelId) {
this.channelId = channelId;
}
public String getIntegrationId() {
return integrationId;
}
public void setIntegrationId(String integrationId) {
this.integrationId = integrationId;
}
public String getAssetCode() {
return assetCode;
}
public void setAssetCode(String assetCode) {
this.assetCode = assetCode;
}
public String getRabbitMessageId() {
return rabbitMessageId;
}
public void setRabbitMessageId(String rabbitMessageId) {
this.rabbitMessageId = rabbitMessageId;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.openi40.mes.io.rabbitmq.input;

import java.sql.Timestamp;
import java.util.Map;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Service;

import com.openi40.mes.integration.ifaces.ConfiguredEndpointInfo;
import com.openi40.mes.metamessaging.handlers.IMicroKernel;

@Service
public class GenericalRabbitMQInputReceiver implements MessageListener {
IMicroKernel microKernel = null;
IAssetsInfoProvider assetsInfoProvider = null;

public GenericalRabbitMQInputReceiver(IMicroKernel microKernel, IAssetsInfoProvider assetsInfoProvider) {
this.microKernel = microKernel;
this.assetsInfoProvider = assetsInfoProvider;
}

@Override
public void onMessage(Message message) {
Map<String, ConfiguredEndpointInfo> map = assetsInfoProvider.getReadAssetsMap();
String topic = message.getMessageProperties().getConsumerQueue();
ConfiguredEndpointInfo info = map.get(topic);
String channelId = info != null ? info.getChannelId() : null;
String integrationId = info != null ? info.getHandlerId() : null;
String assetCode = info != null ? info.getAssetCode() : null;
GenericalRabbitMQInputMessage inputMessage = new GenericalRabbitMQInputMessage();
inputMessage.setChannelId(channelId);
inputMessage.setIntegrationId(integrationId);
inputMessage.setTopic(topic);
byte buffer[] = message.getBody();
if (buffer != null) {
inputMessage.setPayload(buffer);
}
String messageId = message.getMessageProperties().getMessageId();
inputMessage.setRabbitMessageId(messageId);
inputMessage.setFrom("rabbitmq::" + channelId + "::" + integrationId + "::topic:" + topic);
inputMessage.setTo(IMicroKernel.MICROKERNEL_ID);
inputMessage.setTimestamp(new Timestamp(System.currentTimeMillis()));
inputMessage.setAssetCode(assetCode);
microKernel.onMessage(inputMessage, null);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.openi40.mes.io.rabbitmq.input;

import java.util.Map;

import com.openi40.mes.integration.ifaces.ConfiguredEndpointInfo;

public interface IAssetsInfoProvider {
public Map<String, ConfiguredEndpointInfo> getReadAssetsMap();
public Map<String, ConfiguredEndpointInfo> getWriteAssetsMap();
}
Loading

0 comments on commit f24d1d1

Please sign in to comment.