Handling underlying MQTT and STOMP connections with Spring Integration adapters
We have some Spring Integration flows that handle messages arriving via MQTT or STOMP. For this we use the adapters MqttPahoMessageDrivenChannelAdapter and StompInboundChannelAdapter.
With MQTT, we observed that if any endpoint in the flow throws an exception, the adapter closes the connection and stops receiving messages. Likewise, if we restart the broker, the connection to it is not re-established.
To handle exceptions, we set the adapter's error channel name to "errorChannel", the channel Spring handles by default. Our intention is only to log the exception without closing the underlying connection. Is this the correct way to handle exceptions across the whole flow?
For the reconnection problem, we use a different approach for each transport protocol.
- For MQTT, we set automaticReconnect on the connection options to true:

    var clientFactory = new DefaultMqttPahoClientFactory();
    clientFactory.getConnectionOptions().setAutomaticReconnect(true);
    var adapter = new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883",
            MqttAsyncClient.generateClientId(), clientFactory, "/topic/mytopic");
    adapter.setErrorChannelName("errorChannel");
- For STOMP, we set the TaskScheduler from the context on the ReactorNettyTcpStompClient:

    var stompClient = new ReactorNettyTcpStompClient(host, port);
    stompClient.setTaskScheduler(taskScheduler);
    var stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient);
    var adapter = new StompInboundChannelAdapter(stompSessionManager, "/queue/myQueue");
    adapter.setErrorChannelName("errorChannel");
Is this the best way to handle this problem?
Correct answer
Yes, the errorChannel option is a good way to keep an exception from being thrown back into the MQTT client. It does not have to be the global errorChannel, which may be used in many different places. setAutomaticReconnect(true) is indeed recommended for inbound channel adapters.
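To illustrate why an error channel keeps the failure away from the client, here is a minimal plain-Java sketch. It is not Spring Integration code: InboundAdapterModel and ErrorChannelDemo are hypothetical names, and the model only assumes that the adapter calls the flow from inside the client's message callback, as the behaviour described in the question suggests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for an inbound adapter; not a Spring class. */
class InboundAdapterModel {
    private final Consumer<String> flow;        // downstream endpoints
    private Consumer<Throwable> errorChannel;   // null means no error channel is configured

    InboundAdapterModel(Consumer<String> flow) {
        this.flow = flow;
    }

    void setErrorChannel(Consumer<Throwable> errorChannel) {
        this.errorChannel = errorChannel;
    }

    /** Invoked once per arriving message, from the client's callback. */
    void messageArrived(String payload) {
        try {
            flow.accept(payload);
        } catch (RuntimeException e) {
            if (errorChannel == null) {
                throw e;                 // the failure reaches the client callback
            }
            errorChannel.accept(e);      // the failure is consumed here instead
        }
    }
}

class ErrorChannelDemo {
    /** Sends three payloads through a flow that rejects "bad"; returns what was logged. */
    static List<String> run() {
        List<String> logged = new ArrayList<>();
        InboundAdapterModel adapter = new InboundAdapterModel(payload -> {
            if (payload.equals("bad")) {
                throw new IllegalStateException("cannot process " + payload);
            }
        });
        adapter.setErrorChannel(e -> logged.add(e.getMessage()));
        for (String payload : List.of("a", "bad", "b")) {
            adapter.messageArrived(payload);
        }
        return logged;
    }
}
```

In this model the message after the failing one is still delivered, because the exception never leaves messageArrived. A dedicated error channel per adapter works the same way as the global one; only the consumer differs.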
The TaskScheduler of ReactorNettyTcpStompClient is not for reconnection. See its Javadocs. I think the reconnection logic is simply not used in ReactorNettyTcpStompClient:
    public CompletableFuture<StompSession> connectAsync(@Nullable StompHeaders connectHeaders, StompSessionHandler handler) {
        ConnectionHandlingStompSession session = createSession(connectHeaders, handler);
        this.tcpClient.connectAsync(session);
        return session.getSession();
    }
Reconnection happens through another variant, which the method above does not call:
CompletableFuture<Void> connectAsync(TcpConnectionHandler<P> connectionHandler, ReconnectStrategy reconnectStrategy);
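As a rough picture of what a reconnect strategy adds over a single connect call, the plain-Java sketch below retries a connection attempt until it succeeds or the strategy gives up. It does not call the Spring API: ReconnectLoopModel, connectWithRetry and the delayForAttempt function are hypothetical stand-ins, and a null delay is assumed here to mean "stop retrying".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.IntFunction;

/** Hypothetical model of a reconnect loop; not part of Spring. */
class ReconnectLoopModel {
    /**
     * Calls connectAttempt until it returns true or the strategy returns null.
     * Returns the delays (in ms) that would be waited between attempts.
     */
    static List<Long> connectWithRetry(BooleanSupplier connectAttempt,
                                       IntFunction<Long> delayForAttempt) {
        List<Long> delays = new ArrayList<>();
        int failedAttempts = 0;
        while (!connectAttempt.getAsBoolean()) {
            failedAttempts++;
            Long delay = delayForAttempt.apply(failedAttempts);
            if (delay == null) {
                break;              // the strategy gave up
            }
            delays.add(delay);      // real code would schedule the next attempt after this delay
        }
        return delays;
    }
}
```

For example, a fixed-interval strategy is just `attempt -> 5000L`, and a bounded one could be `attempt -> attempt <= 3 ? 1000L : null`. The single-argument connect path shown in the answer has no such loop, which is consistent with the broker restart not being recovered.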