Spring integration recipient-list-router issue
I am using a recipient-list-router in my application to send message to different JMS outbound adapters as shown here:
queue -> recipient-list-router -> queue1 -> JMS outbound adapter 1
-> queue2 -> JMS outbound adapter 2
I am facing two issues:
- selector-expression runs initially only, not for each message forwarded
- if any JMS broker is down then message is not getting sent to another JMS broker.
Following is the XML configuration:
<i:recipient-list-router input-channel="result-pack-output-channel" >
<i:recipient channel="result-pack-output-channel-1"
selector-expression="#{utils.isHourInInterval('LN')}"/>
<i:recipient channel="result-pack-output-channel-2"
selector-expression="#{utils.isHourInInterval('NY')}"/>
<i:recipient channel="result-pack-output-channel-3"
selector-expression="#{utils.isHourInInterval('HK')}" />
<i:recipient channel="result-pack-output-channel-4"
selector-expression="#{utils.isHourInInterval('ME')}"/>
</i:recipient-list-router>
1 answer
-
answered 2018-07-12 13:06
Gary Russell
#{...}
expressions are evaluated once, during context initialization. Here, you need runtime expressions. In runtime expressions you reference other beans with@
- so...selector-expression="@utils.isHourInInterval('LN')"
EDIT
I missed your second question - use
ignore-send-failures="true">
.
See also questions close to this topic
-
Target bean is not of type of the persistent entity MongoDB Spring
I am trying to fetch Data from MongoDB using spring's query class. in my database, the structure is like following :
{ "_id" : ObjectId("5c6a9ec2f20e70a77384825d"), "millCropRequired" : { "crop" : "RICE", "cropDailyPrice" : "$19.99", "cropQuantity" : 22.0 }, "farmLocation" : { "type" : "Point", "coordinates" : [ -7.0572973, 48.523117 ] } }
I have 1000 entries like this kind in my database. Now, i want to fetch data by crop, so i write following code in Spring :
private Mill getMillsByCrop(GetNearByMillsRequest getNearByMillsRequest) { // find crop by crop Name Query query = new Query(); query.addCriteria(Criteria.where(Mill.Constants.MILL_CROP_REQUIRED).elemMatch(Criteria.where(MillCropRequired.Constants.CROP).is(getNearByMillsRequest.getCrop()))); List<Mill> nearByMills = millDAO.runQuery(query, Mill.class); return nearByMills.get(0); }
it gives following error : Target bean is not of type of the persistent entity.
Edit : constants MILL_CROP_REQUIRED = "millCropRequired", and CROP = "crop". I have check the following query in mongoDB and works.
db.getCollection('Mill').find({"millCropRequired.crop":"RICE"})
Edit : Mill.Class
public class Mill extends AbstractEntity { private String vendorId; private String name; private String emailAddress; private String countryCode; private String phoneNumber; private String postalAddress; @GeoSpatialIndexed(type = GeoSpatialIndexType.GEO_2DSPHERE) private GeoJsonPoint millLocation; private String ginningCapacity; private String storageCapacity; private MillCropRequired[] millCropRequired; public String getVendorId() { return vendorId; } public void setVendorId(String vendorId) { this.vendorId = vendorId; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getEmailAddress() { return emailAddress; } public void setEmailAddress(String emailAddress) { this.emailAddress = emailAddress; } public String getCountryCode() { return countryCode; } public void setCountryCode(String countryCode) { this.countryCode = countryCode; } public String getPhoneNumber() { return phoneNumber; } public void setPhoneNumber(String phoneNumber) { this.phoneNumber = phoneNumber; } public String getPostalAddress() { return postalAddress; } public void setPostalAddress(String postalAddress) { this.postalAddress = postalAddress; } public GeoJsonPoint getMillLocation() { return millLocation; } public void setMillLocation(GeoJsonPoint millLocation) { this.millLocation = millLocation; } public MillCropRequired[] getMillCropRequired() { return millCropRequired; } public void setMillCropRequired(MillCropRequired[] millCropRequired) { this.millCropRequired = millCropRequired; } public String getGinningCapacity() { return ginningCapacity; } public void setGinningCapacity(String ginningCapacity) { this.ginningCapacity = ginningCapacity; } public MillCropRequired[] getCropRequirnment() { return millCropRequired; } public void setCropRequirnment(MillCropRequired[] millCropRequired) { this.millCropRequired = millCropRequired; } public String getStorageCapacity() { return storageCapacity; } public void setStorageCapacity(String storageCapacity) { this.storageCapacity = storageCapacity; } public static class Constants extends AbstractEntity.Constants { public static final String PHONE_NUMBER = "phoneNumber"; public static final String VENDOR_ID = "vendorId"; public static final String MILL_CROP_REQUIRED = "millCropRequired"; public static final String MILL_LOCATION = "millLocation"; } }
AbstractEntity.class
public class AbstractEntity { @Id private String id; private Long seq; private Long creationTime; private String createdBy; private Long lastUpdated; private String callingUserId; protected EntityState state = EntityState.ACTIVE; public AbstractEntity() { super(); } public EntityState getState() { return state; } public void setState(EntityState state) { this.state = state; } public String getId() { return id; } public void setId(String id) { this.id = id; } public Long getCreationTime() { return creationTime; } public void setCreationTime(Long creationTime) { this.creationTime = creationTime; } public Long getLastUpdated() { return lastUpdated; } public void setLastUpdated(Long lastUpdated) { this.lastUpdated = lastUpdated; } public String getCreatedBy() { return createdBy; } public void setCreatedBy(String createdBy) { this.createdBy = createdBy; } public String getCallingUserId() { return callingUserId; } public void setCallingUserId(String callingUserId) { this.callingUserId = callingUserId; } public Long getSeq() { return seq; } public void setSeq(Long seq) { this.seq = seq; } public static class Constants { public static final String ID = "id"; public static final String SEQ = "seq"; public static final String CREATION_TIME = "creationTime"; public static final String LAST_UPDATED = "lastUpdated"; public static final String CREATED_BY = "createdBy"; public static final String CALLING_USERID = "callingUserId"; public static final String STATE = "state"; } }
-
Spring boot Json schema validation
Whats the best way to validate Json that is sent into a spring boot application, how easy is it to perform schema validaiton?
I currently have it set up with a custom deserializer, but id like to vlaidate the scehma before it enters the request if thats even possible.
One way would be to validate it inside the deserilizer before i being the process of constructng the object.
-
How to set other properties of ObjectError except for defaultMessage?
My dilemma is the following:
I have a custom validator, and inside it I make the following call
.buildConstraintViolationWithTemplate("exampleString") .addConstraintViolation();
In the error handler part of my application I have the method
@ExceptionHandler(WebExchangeBindException.class) @ResponseBody public ResponseEntity<CustomErrorsView> handleWebExchangeBindException(final WebExchangeBindException ex) { ex.getGlobalErrors() .forEach(x-> System.out.println(x.getDefaultMessage())); ); }
How could I set the other properties of the error, such as
code
? every way I tried setting the information from the validator, all it did was set the defaultMessage. -
Enterprise Integration Specialist Certification discontinued?
I am considering to take the Enterprise Integration Specialist Certification exam but was unable to find it in the official Pivotal/Vmware sites. Any idea if this certificate has been discontinued?
Thanks in advance.
-
Map configuration for kafka producer in properties file fails
I'm trying to add a producer configuration to my project, using Spring Integration and Kafka underneath.
Since I didn't find anywhere to put it, I thought I should add it in my application.properties under:
spring.cloud.stream.kafka.bindings.output.producer.configuration
However, I couldn't compile the code using any the following formats:
spring.cloud.stream.kafka.bindings.output.producer.configuration=request.timeout.ms=100000 spring.cloud.stream.kafka.bindings.output.producer.configuration={'request.timeout.ms':'100000'} spring.cloud.stream.kafka.bindings.output.producer.configuration={"request.timeout.ms":"100000"} spring.cloud.stream.kafka.bindings.output.producer.configuration=request.timeout.ms:100000
I'm always getting this error:
Binding to target org.springframework.cloud.stream.binder.kafka.KafkaExtendedBindingProperties@104392ba failed: Property: spring.cloud.stream.kafka.bindings[output].producer.configuration Value: request.timeout.ms:100000 Reason: Failed to convert property value of type 'java.lang.String' to required type 'java.util.Map' for property 'bindings[output].producer.configuration'; nested exception is java.lang.IllegalStateException: Cannot convert value of type 'java.lang.String' to required type 'java.util.Map' for property 'configuration': no matching editors or conversion strategy found
Any idea how to implement this configuration to the producers?
-
Spring integration - Retry to establish connection on exception scenarios
My application communicates to a third party system using spring integration. I send a payload for which I get a response that I parse and use. All good. Please find below the SI xml that I use.
Now I want to application retry to establish connection on exception scenarios where the server I'm trying to connect isn't available or on time outs or if it refuses to connect etc. How can I achieve this using SI xml configuration? Please guide.
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-ip="http://www.springframework.org/schema/integration/ip" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/integration/ip http://www.springframework.org/schema/integration/ip/spring-integration-ip.xsd"> <int:gateway id="gw" service-interface=" com.RxGateway" default-request-channel="objectOut" /> <int:channel id="objectOut" /> <int-ip:tcp-connection-factory id="client" type="client" host="10.236.249.xx" port="9103" single-use="false" so-timeout="50000000" using-nio="false" so-keep-alive="true" serializer="customDSerializer" deserializer="customDSerializer" /> <bean id="customDSerializer" class="com.CustomSerializerDeserializer"> <property name="maxMessageSize" value="4096" /> </bean> <int-ip:tcp-outbound-gateway id="outGateway" request-channel="objectOut" reply-channel="toSA" connection-factory="client" request-timeout="100000" reply-timeout="50000"/> <int:service-activator input-channel="toSA" ref="rxService" method="parseResponse"/> <bean id="rxService" class="com.RxService"/> <int:channel id="toSA" /> <int:channel id="bytesIn" /> </beans>
-
Store the message which is received/sent to the queue using JmsListener
Is there any way to put interceptor in jms listener..requirement is to store the request and response while reading and writing to message queue
@JmsListener(destination = "${ibm.mq.queueName}", containerFactory = "containerFactory") public void readAndProcessMessage(Message<?> message) throws Exception { UnProcessedEvent unProcessedEvent; String eventMessage = message.getPayload().toString(); log.info("Received an event: " + eventMessage); try { String iban = getIban(eventMessage); // Identifying Topic for (Mandate mandate : configProperties.getMandates()) { for (Topic topic : mandate.getTopic()) { if (topic.getAccountNumber().equals(iban)) { publisherService.publishEvent(iban, eventMessage); return; } } } unProcessedEvent = UnProcessedEvent.builder().incomingReqPayload((eventMessage)).reason("No Topic Found") .reasonCode(HttpStatus.BAD_REQUEST.toString()).build(); unprocessedEventRepository.save(unProcessedEvent); } catch (JAXBException e) { log.info("Exception while parsing the event message: " + e.getMessage()); unProcessedEvent = UnProcessedEvent.builder().incomingReqPayload((eventMessage)).reason("Bad Request") .reasonCode(HttpStatus.BAD_REQUEST.toString()).build(); unprocessedEventRepository.save(unProcessedEvent); } }
-
Need help in creating jms template which can connect to IBM websphere MQ via LDAP JNDI lookup?
//I have a standalone JAVA code which can connect to IBMMQ via JNDI lookup, and can send or receive the message. The same thing I want to achieve via spring boot but I am unable to configure the jms template? I am new to spring boot if anyone can help I will appreciate.
//Here is the working JAVA code.
import java.security.Security; import java.util.Hashtable; //Imports for JNDI import javax.jms.Connection; import javax.jms.ConnectionFactory; //Imports for JMS import javax.jms.Destination; import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.annotation.EnableJms; import org.springframework.jms.core.JmsTemplate; import com.ibm.mq.MQException; //Import for MQ Linked Exception import net.bytebuddy.asm.Advice.This; public class Jms11ListenerSample extends Thread implements MessageListener, ExceptionListener{ public void run() { //declaration of objects ConnectionFactory jmsConnFactory = null; Destination jmsDestination = null; MessageConsumer jmsConsumer = null; jmsConnFactory = this.lookupCF(); jmsDestination = this.lookupDestination(); while (running) { try { if (!connected) { System.out.println("connecting to MQ..."); //connect to MQ and start the connection: jmsConnection = jmsConnFactory.createConnection(); jmsConnection.setExceptionListener(this); jmsConnection.start(); jmsSession = jmsConnection.createSession(true, 0); //Session.AUTO_ACKNOWLEDGE/CLIENT_ACKNOWLEDGE/DUPS_OK_ACKNOWLEDGE jmsConsumer = jmsSession.createConsumer(jmsDestination); jmsConsumer.setMessageListener(this); connected = true; } System.out.println("sleeping for " + DEFAULT_RECONNECT_PAUSE/1000 + " seconds ..."); //Wait for onMSG / onExc try {Thread.sleep(DEFAULT_RECONNECT_PAUSE);} catch (Exception exc) {/*ignored*/} } catch (JMSException e) { onException(e); } //end try/catch } //end while if (jmsConnection != null) { try { jmsConnection.close(); //this closes all sub-objects, too! } catch (JMSException e) { System.out.println("JMS Connection could not be closed: " + e); e.printStackTrace(); } } } //END RUN() //This method is called from outside public void onMessage(Message receiveMSG) { try { if (receiveMSG instanceof TextMessage) { System.out.println("Received a message: " + ((TextMessage)receiveMSG).getText()); } else { System.out.println("Received a non-text message"); } jmsSession.commit(); DONE_MSGS++; if (DONE_MSGS >= MAX_MSGS) {running = false;} } catch (JMSException e) { onException(e); } } //This method is called when an exception occurs public void onException(JMSException e) { if (jmsSession != null) {try {jmsSession.rollback();} catch (JMSException exc) {/*ignored*/}} //Print and analyse the error: System.out.println("JMS Exception: " + e); e.printStackTrace(); //The linked exception is vital to errorhandling and cause determination. PRINT IT OUT!!!! MQException le = (MQException)e.getLinkedException(); if (le != null) { int reasoncode = le.reasonCode; System.out.println("LINKED Exception: " + le + " - REASON-CODE: " + reasoncode); if (reasoncode == MQException.MQRC_CONNECTION_BROKEN || reasoncode == MQException.MQRC_Q_MGR_NOT_AVAILABLE) { System.out.println("Will have to reconnect!"); connected=false; if (jmsConnection != null) {try {jmsConnection.close();} catch (JMSException exc) {/*ignored*/}} try {Thread.sleep(DEFAULT_RECONNECT_PAUSE);} catch (Exception exc) {/*ignored*/} } else { running = false; } //The cause is usually empty. In some cases (i.e. SSL) it might be set. PRINT IT OUT!! if (le.getCause() != null) { System.out.println("CAUSE of Linked Exception: " + le.getCause()); } } } //lookupCF() reads the ConnectionFactory from JNDI (LDAP). All threads use the same CF, therefore we do this only once. private ConnectionFactory lookupCF() { //Synchronize so that not multiple threads try to overwrite 'theCF' ! synchronized (this) { if (theCF != null) { return theCF; } InitialContext ctx = null; Hashtable jndiContext = new Hashtable(); jndiContext.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory"); jndiContext.put(Context.SECURITY_AUTHENTICATION, "none"); jndiContext.put(Context.PROVIDER_URL, jndiLDAPserver); try { ctx = new InitialContext(jndiContext); theCF = (ConnectionFactory) ctx.lookup(jndiCFname); if (theCF == null) { System.out.println("FATAL ERROR: CF " + jndiCFname + " not found!"); System.exit(1); //Quit because fatal error! } ctx.close(); } catch (NamingException e) { e.printStackTrace(); System.exit(1); //Quit because fatal error! } catch(ClassCastException e) { e.printStackTrace(); System.exit(1); //Quit because fatal error! } return theCF; } //END synchonized }//END lookupCF() //lookupDestination() reads the Destination from JNDI (FILE). It is needed only once. private Destination lookupDestination() { Destination destination = null; InitialContext ctx = null; Hashtable jndiContext = new Hashtable(); jndiContext.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.fscontext.RefFSContextFactory"); jndiContext.put(Context.PROVIDER_URL, jndiDestinationFile); try { ctx = new InitialContext(jndiContext); destination = (Destination) ctx.lookup(jndiDestinationName); if (destination == null) { System.out.println("FATAL ERROR: Destination " + jndiDestinationName + " not found!"); System.exit(1); //Quit because fatal error } ctx.close(); } catch (NamingException e) { e.printStackTrace(); System.exit(1); //Quit because fatal error } catch(ClassCastException e) { e.printStackTrace(); System.exit(1); //Quit because fatal error } return destination; }//END OF lookupDestination() private static ConnectionFactory theCF = null; //Used for all threads private Session jmsSession = null; private Connection jmsConnection = null; private boolean running = true; private boolean connected = false; private static String jndiLDAPserver = ""; private static String jndiCFname = ""; private static String jndiDestinationFile = ""; private static String jndiDestinationName = ""; private final static int DEFAULT_RECEIVE_TIMEOUT = 500; //Milliseconds to wait for message if queue is empty! private final static int DEFAULT_RECONNECT_PAUSE = 5000; //Milliseconds to wait before next reconnect-try private final static int DEFAULT_NUMBER_OF_THREADS = 1; //Milliseconds to wait before next reconnect-try `enter code here`private final static int MAX_MSGS = 5; private static int DONE_MSGS = 0; public static void main(String[] args) { System.out.println("starting application"); //reading config if (args.length < 4) { System.out.println("Arguments missing."); System.out.println("LDAP-server&context CF-name PathTo.bindings DestinationName"); System.out.println("To force re-enabling SSLv3 enable support the property JMS11Sample.reenableSSLv3 must be <> NULL (check setup files for details)"); System.exit(1); } jndiLDAPserver = args[0]; jndiCFname = args[1]; jndiDestinationFile = args[2]; jndiDestinationName = args[3]; if(System.getProperty("JMS11Sample.reenableSSLv3")!=null) { System.out.println("Force re-enabling SSLv3 support"); String disabledAlgorithms = Security.getProperty("jdk.tls.disabledAlgorithms"); Security.setProperty("jdk.tls.disabledAlgorithms", disabledAlgorithms .replace("SSLv3,", "")); } //Run the demo System.out.println("creating threads..."); Jms11ListenerSample[] myJMSthreads = new Jms11ListenerSample[DEFAULT_NUMBER_OF_THREADS]; for (int i=0; i<DEFAULT_NUMBER_OF_THREADS; i++) { myJMSthreads[i] = new Jms11ListenerSample(); myJMSthreads[i].start(); } System.out.println("joining threads..."); for (int i=0; i<DEFAULT_NUMBER_OF_THREADS; i++) { try {myJMSthreads[i].join(); } catch (InterruptedException e) { } } System.out.println("application ended normally"); } //END MAIN() //END OF CLASS
//This is what I am trying in spring Boot configuration -
@Configuration @EnableJms public class JmsConfig { @Value("${ibm.mq.queueManager}") private String queueManager; @Value("${ibm.mq.port}") private int port; @Value("${spring.jms.jndi-cf-name}") private String jndiCfName; @Value("${spring.jms.jndi-LDAP-Server}") private String jndiLdapServer; @Bean public ConnectionFactory getConnection(){ ConnectionFactory connFactory = null; InitialContext ctx = null; Hashtable jndiContext = new Hashtable(); jndiContext.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory"); jndiContext.put(Context.SECURITY_AUTHENTICATION, "none"); jndiContext.put(Context.PROVIDER_URL, jndiLdapServer); try { ctx = new InitialContext(jndiContext); connFactory = (ConnectionFactory) ctx.lookup(jndiCfName); if (connFactory == null) { System.out.println("FATAL ERROR: CF " + jndiCfName); System.exit(1); } ctx.close(); } catch (NamingException e) { e.printStackTrace(); System.exit(1); } catch(ClassCastException e) { e.printStackTrace(); System.exit(1); } return connFactory; } @Bean public JmsTemplate jmsTemplate(){ return new JmsTemplate(getConnection()); } }
-
Cannot construct instance of java.time.LocalDateTime - Jackson
I have two Spring Boot applications which communicate through JMS Messaging and ActiveMQ.
One app sends to the other app an object which contains a LocalDateTime property. This object is serialized to JSON in order to be sent to the other application.
The problem I'm facing is that Jackson is not able to deserialize the LocalDateTime property when it's trying to map the incoming json to my object. The LocalDateTime property has the following format when it arrives to the "listener app":
"lastSeen":{ "nano":0, "year":2019, "monthValue":4, "dayOfMonth":8, "hour":15, "minute":6, "second":0, "month":"APRIL", "dayOfWeek":"MONDAY", "dayOfYear":98, "chronology":{ "id":"ISO", "calendarType":"iso8601" } }
The exception I'm getting is the following:
org.springframework.jms.support.converter.MessageConversionException: Failed to convert JSON message content; nested exception is com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of java.time.LocalDateTime
I was able to fix this issue temporarily by using the following annotations:
@JsonSerialize(as = LocalDateTimeSerializer.class) @JsonDeserialize(using = LocalDateTimeDeserializer.class, as = LocalDateTime.class) private LocalDateTime lastSeen;
but they belong to jackson datatype jsr310 which is now DEPRECATED.
Is there any way/alternative to deserialize this LocalDateTime property without using the above annotations? Or how do I get this to work using the recommended jackson-modules-java8?
-
logstash Input painfully slow while fetching messages from activemq topic
I have configured JMS input in logstash to subscribe to JMS topic messages and push messages to elastic search.
input { jms { id => "my_first_jms" yaml_file => "D:\softwares\logstash-6.4.0\config\jms-amq.yml" yaml_section => "dev" use_jms_timestamp => true pub_sub => true destination => "mytopic" # threads => 50 } } filter { json{ source => "message" } } output { stdout { codec => json } elasticsearch { hosts => ['http://localhost:9401'] index => "jmsindex" } }
System specs:
RAM: 16 GB Type: 64 bit Processor: Intel i5-4570T CPU @ 2.9 GHz
This is extremely slow. Like 1 message every 3-4 minutes. How should I debug to figure out what is missing?
Note: Before this, I was doing this with @JMSListener in java and that could process 200-300 records per sec easily.
-
javax.jms.TemporaryTopic.delete() block indefinitely if broker is not available
I have configured ActiveMQ broker with failover. Any message which is sent with request mode that encounter timeout we delete temporary topic created to listen reply. Now same time my broker gets unavailable and while delete temporary topic thread gets block indefinitely at javax.jms.TemporaryTopic.delete(). Please help if anyone has experienced to avoid this.
-
JMSTemplate with multiple brokers. Destination resolving exception
I have problem which I am trying solve all day, without success... I have an application which trying to send/receive messages to/from external system A and external system B. A and B it is WLS based external systems.
While my application is coming up - i am reading all configurations and building my applicational JMSProducer and injecting JMSTemlate with predefined destination name in it.
Here is my code:
private JMSProducer initProducer(Conf conf) { DestinationResolver destinationResolver = getDestinationResolver(conf); ConnectionFactory connectionFactory = getConnectionFactory(); String destinationName = conf.getDestinationName(); JmsTemplate jmsTemplate = new JmsTemplate(); jmsTemplate.setConnectionFactory(connectionFactory); jmsTemplate.setDestinationResolver(destinationResolver); jmsTemplate.setDefaultDestinationName(destinationName); return new JMSProducer(jmsTemplate); } public DestinationResolver getDestinationResolver(Conf conf) { JndiDestinationResolver destinationResolver = new JndiDestinationResolver(); destinationResolver.setCache(false); destinationResolver.setJndiTemplate(getJNDITemplate(conf)); return destinationResolver; } private JndiTemplate getJNDITemplate(Conf conf) { JndiTemplate jndiTemplate = new JndiTemplate(); Properties properties = new Properties(); String connectionFactoryClassName = externalSystemConf.getConnectionParam().getConnectionFactory(); properties.setProperty("java.naming.factory.initial", connectionFactoryClassName); properties.setProperty("java.naming.provider.url", getProviderURL(conf.getConnectionParam())); jndiTemplate.setEnvironment(properties); return jndiTemplate; }
Now scenario what happens.
My app is up and running, external system A with 2 queues and external system B with 1 queue also is up and running.
- I am retrieving relevant, already initialized JMSProducer in which I have already injected JMSTemplate with destinationName.
- Sending messages to queues of external system A
- Again retrieving next instance of JMSProducer relevant for system B
- Sending messages to queue of external system B
- At this stage everything is good, all messages delivered to relevant queues in external systems.
- Now I am getting again JMSProducer which relevant for external system A, and trying to send messages to one of the queues. And in this stage I have a problem, DestinationResolutionException is thrown:
Destination [topic2.queueName] not found in JNDI
javax.naming.NameNotFoundException: While trying to lookup 'topic2.queueName' didn't find subcontext 'topic2'. Resolved ""
How it is possible, I have just sent messages to external system A with the same destination and it worked fine. Why it throwing exception when I am sending message to A after I tried to sent it to B?
By the way, If I will try to change cache flag to true when defining destination resolver, it is solving this problem. However in this case I starting to have problem when my external system is going to be restarted. After restart it also have some exception related to destination resolving.
-
spring container jms latency
We are using th spring listener container with teh following configuration.
Though we have set a concurrency of 15,we notice that messages are not immediately handed over to waiting threads as soon as they finish processing the previous request.
Although there being a huge backlog of messages on the topic - and processing threads idle and available;the messages seem to be handed over to the waiting threads after a short lag - and seemingly in batches,rather than a continuous steady flow.
What may be causing this behavior?<jms:listener-container concurrency="15" acknowledge="auto" destination-resolver="appDestinationResolver" destination-type="queue" error-handler="jmsErrorHandler" connection-factory="appCachingConnectionFactory"> <jms:listener selector="${xxxx}='${xxxxvalue}' AND ${xxxxkey2}='${xxxxvalue2}'" destination="${app.jms.queue.in}" ref="appMsgListener" method="handleappResponse" id="app_Listener" /> </jms:listener-container>
-
How to fix message converter miss understanding object type for Jackson parsing
I setting up
MessageConverter
withMappingJackson2MessageConverter
forJmsTemplate
. Where do I expect that I can cast message to my object class but it does not happened!I write
MessageConverter
as follow that it used for myJmsTemplate
sending message method and listener factory (setMessageConverter
) as followprivate MessageConverter messageConverter(){ MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter(); messageConverter.setTargetType(MessageType.TEXT); messageConverter.setTypeIdPropertyName("_type"); return messageConverter; }
and my Object class is
public class RequestMessage<T> implements Serializable { String ticket; T detail; // getter , setter & toString }
but after consuming
RequestMessage<Foo>
with@JmsListener
it can not castFoo
becauseJackson
change it toLinkedHashMap
.I will appropriated any hint or answers.
-
MQRC_UNKNOWN_ALIAS_BASE_Q when connecting with IBM MQ cluster using CCDT and Spring Boot JMSTemplate
I have a Spring Boot app using JMSListener + IBMConnectionFactory + CCDT for connecting an IBM MQ Cluster.
A set the following connection properties: - url pointing to a generated ccdt file - username (password not required, since test environment) - queuemanager name is NOT defined - since it's the cluster's task to decide, and a few google results, including several stackoverflow ones indicate that in my case qmgr must be set to empty string.
When my Spring Boot JMSListener tries to connect to the queue, the following MQRC_UNKNOWN_ALIAS_BASE_Q error occurs:
2019-01-29 11:05:00.329 WARN [thread:DefaultMessageListenerContainer-44][class:org.springframework.jms.listener.DefaultMessageListenerContainer:892] - Setup of JMS message listener invoker failed for destination 'MY.Q.ALIAS' - trying to recover. Cause: JMSWMQ2008: Failed to open MQ queue 'MY.Q.ALIAS'.; nested exception is com.ibm.mq.MQException: JMSCMQ0001: IBM MQ call failed with compcode '2' ('MQCC_FAILED') reason '2082' ('MQRC_UNKNOWN_ALIAS_BASE_Q'). com.ibm.msg.client.jms.DetailedInvalidDestinationException: JMSWMQ2008: Failed to open MQ queue 'MY.Q.ALIAS'. at com.ibm.msg.client.wmq.common.internal.Reason.reasonToException(Reason.java:513) at com.ibm.msg.client.wmq.common.internal.Reason.createException(Reason.java:215)
In the MQ error log I see the following:
01/29/2019 03:08:05 PM - Process(27185.478) User(mqm) Program(amqrmppa) Host(myhost) Installation(Installation1) VRMF(9.0.0.5) QMgr(MyQMGR) AMQ9999: Channel 'MyCHL' to host 'MyIP' ended abnormally. EXPLANATION: The channel program running under process ID 27185 for channel 'MyCHL' ended abnormally. The host name is 'MyIP'; in some cases the host name cannot be determined and so is shown as '????'. ACTION: Look at previous error messages for the channel program in the error logs to determine the cause of the failure. Note that this message can be excluded completely or suppressed by tuning the "ExcludeMessage" or "SuppressMessage" attributes under the "QMErrorLog" stanza in qm.ini. Further information can be found in the System Administration Guide. ----- amqrmrsa.c : 938 -------------------------------------------------------- 01/29/2019 03:15:14 PM - Process(27185.498) User(mqm) Program(amqrmppa) Host(myhost) Installation(Installation1) VRMF(9.0.0.5) QMgr(MyQMGR) AMQ9209: Connection to host 'MyIP' for channel 'MyCHL' closed. EXPLANATION: An error occurred receiving data from 'MyIP' over TCP/IP. The connection to the remote host has unexpectedly terminated. The channel name is 'MyCHL'; in some cases it cannot be determined and so is shown as '????'. ACTION: Tell the systems administrator.
Since the MQ error log contains QMgr(MyQMGR), which MyQMGR value I did not set in the connection properties, I assume the routing seems to be fine: the MQ Cluster figured out a qmgr to use.
The alias exists and points to an existing q. Bot the target q and the alias are added to the cluster via the CLUSTER(clustname) command.
What can be wrong?