Client for RabbitMQ listner

import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.dsl.amqp.Amqp; import org.springframework.integration.dsl.support.Transformers; import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice; import org.springframework.retry.backoff.FixedBackOffPolicy; import org.springframework.retry.policy.SimpleRetryPolicy; import org.springframework.retry.support.RetryTemplate; import com.lh.digital.integration.framework.rabbit.Utils; /** * * @author RAVI VARMA YARAkARAJU */ @Configuration public class RabbitInboundFlow { private static final Logger logger = LoggerFactory.getLogger(RabbitInboundFlow.class); @Autowired private RabbitConfig rabbitConfig; @Autowired private ConnectionFactory connectionFactory; @Bean DirectExchange sampleExchange() { return new DirectExchange("horizon.exchange", true, false); } @Bean Binding sampleBinding(DirectExchange sampleExchange, Queue sampleQueue) { return BindingBuilder.bind(rabbitConfig.sampleQueue()).to(sampleExchange).with("event.key"); } @Bean @Primary public RabbitTemplate rabbitTemplate() { System.out.println(" The connection factory value :" + connectionFactory); RabbitTemplate r = new RabbitTemplate(connectionFactory); r.setExchange(sampleExchange().getName()); r.setChannelTransacted(true); r.setRoutingKey("event.key"); return r; } @Bean public SimpleMessageListenerContainer simpleMessageListenerContainer() { SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(); listenerContainer.setConnectionFactory(this.connectionFactory); listenerContainer.setQueues(this.rabbitConfig.sampleQueue()); listenerContainer.setConcurrentConsumers(1); listenerContainer.setExclusive(true); return listenerContainer; } @Bean public IntegrationFlow inboundFlow() { return IntegrationFlows.from(Amqp.inboundAdapter(simpleMessageListenerContainer())) .transform(Transformers.objectToString()) .handle((message) -> { logger.info("Received {}", message.getPayload()); Utils.sleep(3000); Utils.throwExceptionsPercentOfTime(40); logger.info("Processed {}", message.getPayload()); }, c -> c.advice(this.retryAdvice())) .get(); } @Bean public RequestHandlerRetryAdvice retryAdvice() { RequestHandlerRetryAdvice retryAdvice = new RequestHandlerRetryAdvice(); retryAdvice.setRetryTemplate(retryTemplate()); return retryAdvice; } @Bean public RetryTemplate retryTemplate() { RetryTemplate retryTemplate = new RetryTemplate(); SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(); retryPolicy.setMaxAttempts(3); retryTemplate.setRetryPolicy(retryPolicy); FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy(); backOffPolicy.setBackOffPeriod(2000); retryTemplate.setBackOffPolicy(backOffPolicy); return retryTemplate; } } ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { @Autowired private ConnectionFactory rabbitConnectionFactory; @Bean public Queue sampleQueue() { return new Queue("horizon.queue", true, false, false); } } +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Process messages serially with RabbitMQ with a cluster of listeners processing the messages, is to use a flag on a listener with 1 thread on each listener processing the messages.

Exclusive consumer flag ensures that only 1 consumer can read messages from the specific queue, and 1 thread on that consumer ensures that the messages are processed serially.

Be the first to comment

You can use [html][/html], [css][/css], [php][/php] and more to embed the code. Urls are automatically hyperlinked. Line breaks and paragraphs are automatically generated.