通過Docker啟動Solace,並在Spring Boot通過JMS整合Solace

2023-01-18 09:00:30

1 簡介

Solace是一個強大的實時性的事件驅動訊息佇列。本文將介紹如何在Spring中使用,雖然程式碼使用的是Spring Boot,但並沒有使用相關starter,跟Spring的整合一樣,可通用。JMS是通過的訊息處理框架,可以深入學習一下,不同的MQ在JMS的整合上都是類似的。

2 通過Docker啟動Solace

有兩種方式試用Solace,一種是通過Docker來啟動,另一種是使用Cloud版本,但Cloud版本有試用期限,我們使用Docker來啟動吧。

先下載映象:

$ docker pull solace/solace-pubsub-standard:9.13.0.16

然後通過以下命令啟動:

$ docker run -d -p 8080:8080 -p 55554:55555 -p 8008:8008 -p 1883:1883 -p 8000:8000 -p 5672:5672 -p 9000:9000 -p 2222:2222 --shm-size=2g --env username_admin_globalaccesslevel=admin --env username_admin_password=admin --name=solace solace/solace-pubsub-standard:9.13.0.16

這裡埠改為55554,是因為Mac的原因。

然後便可以存取來登陸管理介面:http://localhost:8080/

使用者名稱密碼為:admin/admin

登陸後可以看到如下介面,Solace按VPN來管理佇列,VPN有點像分組,比如某個業務線使用某個VPN。

我們在default的VPN上建立一個Queue,名為pkslow-queue

其它設定如下:

接著在該Queue上建立Topic:

建立完成後,我們可以直接測試一下:

可以Publish到Topic或Queue,也可以從其中一個Subscribe。

完成以上設定後,我們就可以在Spring Boot中整合了。

3 Spring Boot JMS整合Solace

3.1 傳送訊息

我們是通過JmsTemplate來傳送訊息的,而JmsTemplate需要連線到MQ,就需要一個ConnectionFactory,這個Factory是帶著MQ的一些連線資訊。設定程式碼如下:

@Configuration
public class SolacePubConfig {

    private final SolaceProperties solaceProperties;

    public SolacePubConfig(SolaceProperties solaceProperties) {
        this.solaceProperties = solaceProperties;
    }

    @Bean("connectionFactory")
    public ConnectionFactory connectionFactory() throws Exception {
        Properties env = new Properties();
        env.put(InitialContext.INITIAL_CONTEXT_FACTORY, "com.solacesystems.jndi.SolJNDIInitialContextFactory");
        env.put(InitialContext.PROVIDER_URL, solaceProperties.getBrokerUrl());
        env.put(SupportedProperty.SOLACE_JMS_VPN, solaceProperties.getVpn());
        env.put(InitialContext.SECURITY_PRINCIPAL, solaceProperties.getUsername());
        env.put(InitialContext.SECURITY_CREDENTIALS, solaceProperties.getPassword());
        return SolJmsUtility.createConnectionFactory(env);
    }

    @Bean
    public CachingConnectionFactory cachingConnectionFactory(ConnectionFactory connectionFactory) {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(connectionFactory);
        cachingConnectionFactory.setSessionCacheSize(10);
        return cachingConnectionFactory;
    }

    @Bean
    public JmsTemplate pubJmsTemplate(CachingConnectionFactory cachingConnectionFactory) {
        JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory);
        jmsTemplate.setPubSubDomain(true);
        jmsTemplate.setExplicitQosEnabled(true);
        jmsTemplate.setDeliveryPersistent(true);
        jmsTemplate.setDefaultDestinationName(solaceProperties.getDefaultPubDestinationName());
        return jmsTemplate;
    }

}

生成JmsTemplate後,就可以參照並行送訊息了:

@RestController
@RequestMapping("/solace")
public class SolaceTestController {
    private final JmsTemplate pubJmsTemplate;
    private final SolaceProperties solaceProperties;

    public SolaceTestController(JmsTemplate pubJmsTemplate, SolaceProperties solaceProperties) {
        this.pubJmsTemplate = pubJmsTemplate;
        this.solaceProperties = solaceProperties;
    }

    @GetMapping
    public String send() {
        pubJmsTemplate.send(solaceProperties.getDefaultPubDestinationName(), session -> session.createTextMessage("www.pkslow.com"));
        pubJmsTemplate.send(session -> session.createTextMessage("Larry Deng"));
        return "OK";
    }
}

用到的屬性設定如下:

server.port=8083

pkslow.solace.brokerUrl=smf://127.0.0.1:55554
pkslow.solace.vpn=default
pkslow.solace.username=default
pkslow.solace.password=default
pkslow.solace.defaultPubDestinationName=pkslow-topic
pkslow.solace.defaultSubDestinationName=pkslow-queue
@Configuration
@ConfigurationProperties(prefix = "pkslow.solace")
@Setter
@Getter
public class SolaceProperties {
    private String brokerUrl;
    private String vpn;
    private String username;
    private String password;
    private String defaultPubDestinationName;
    private String defaultSubDestinationName;
}

3.2 接收訊息

我們通過MessageListenerContainer來接收訊息,MessageListenerContainer也需要一個ConnectionFactory,也有MQ的連線資訊。還需要一個MessageListener,用來定義如何處理訊息。我們的設定如下:

@Configuration
@Slf4j
public class SolaceSubConfig {
    private final SolaceProperties solaceProperties;

    public SolaceSubConfig(SolaceProperties solaceProperties) {
        this.solaceProperties = solaceProperties;
    }

    @Bean
    public SingleConnectionFactory singleConnectionFactory(@Qualifier("connectionFactory") ConnectionFactory targetConnectionFactory) {
        return new SingleConnectionFactory(targetConnectionFactory);
    }

    @Bean
    public MessageListener messageListener() {
        return message -> {
            try {
                log.info("Received message " + ((TextMessage) message).getText() + " on destination: " +
                        message.getJMSDestination().toString());
            } catch (JMSException ex) {
                throw new RuntimeException(ex);
            }
        };
    }


    @Bean
    public MessageListenerContainer messageListenerContainer(SingleConnectionFactory singleConnectionFactory, MessageListener messageListener) {
        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setConnectionFactory(singleConnectionFactory);
        container.setDestinationName(solaceProperties.getDefaultSubDestinationName());
        container.setMessageListener(messageListener);

        return container;
    }
}

這裡@Qualifier("connectionFactory") ConnectionFactory targetConnectionFactory複用了在SolacePubConfig建立的物件。

3.3 測試

傳送GET請求就可以觸發傳送了:

GET http://localhost:8083/solace

我發了三次,結果紀錄檔如下:

4 程式碼

程式碼請看GitHub: https://github.com/LarryDpk/pkslow-samples


References:

Docker available image tags

Docker Solace Guide

Spring Solace