import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.amqp.Amqp;
import org.springframework.integration.dsl.support.Transformers;
import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import com.lh.digital.integration.framework.rabbit.Utils;
@Configuration
public class RabbitInboundFlow {
private static final Logger logger = LoggerFactory.getLogger(RabbitInboundFlow.class);
@Autowired
private RabbitConfig rabbitConfig;
@Autowired
private ConnectionFactory connectionFactory;
@Bean
DirectExchange sampleExchange() {
return new DirectExchange("horizon.exchange", true, false);
}
@Bean
Binding sampleBinding(DirectExchange sampleExchange, Queue sampleQueue) {
return BindingBuilder.bind(rabbitConfig.sampleQueue()).to(sampleExchange).with("event.key");
}
@Bean
@Primary
public RabbitTemplate rabbitTemplate() {
System.out.println(" The connection factory value :" + connectionFactory);
RabbitTemplate r = new RabbitTemplate(connectionFactory);
r.setExchange(sampleExchange().getName());
r.setChannelTransacted(true);
r.setRoutingKey("event.key");
return r;
}
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {
SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
listenerContainer.setConnectionFactory(this.connectionFactory);
listenerContainer.setQueues(this.rabbitConfig.sampleQueue());
listenerContainer.setConcurrentConsumers(1);
listenerContainer.setExclusive(true);
return listenerContainer;
}
@Bean
public IntegrationFlow inboundFlow() {
return IntegrationFlows.from(Amqp.inboundAdapter(simpleMessageListenerContainer()))
.transform(Transformers.objectToString())
.handle((message) -> {
logger.info("Received {}", message.getPayload());
Utils.sleep(3000);
Utils.throwExceptionsPercentOfTime(40);
logger.info("Processed {}", message.getPayload());
}, c -> c.advice(this.retryAdvice()))
.get();
}
@Bean
public RequestHandlerRetryAdvice retryAdvice() {
RequestHandlerRetryAdvice retryAdvice = new RequestHandlerRetryAdvice();
retryAdvice.setRetryTemplate(retryTemplate());
return retryAdvice;
}
@Bean
public RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
retryTemplate.setRetryPolicy(retryPolicy);
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(2000);
retryTemplate.setBackOffPolicy(backOffPolicy);
return retryTemplate;
}
}
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that declares the RabbitMQ queue named {@code horizon.queue}.
 */
@Configuration
public class RabbitConfig {

    // Injected by Spring; not referenced by any code in this class as shown.
    @Autowired
    private ConnectionFactory rabbitConnectionFactory;

    /**
     * Declares the sample queue: durable, non-exclusive, not auto-deleted.
     *
     * @return the queue definition
     */
    @Bean
    public Queue sampleQueue() {
        // Named flags make the positional constructor arguments readable.
        final boolean durable = true;
        final boolean exclusive = false;
        final boolean autoDelete = false;
        return new Queue("horizon.queue", durable, exclusive, autoDelete);
    }
}
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
One way to process messages serially with RabbitMQ, when a cluster of listeners is consuming the messages, is to set the exclusive consumer flag on each listener and give each listener a single thread for processing the messages.
Exclusive consumer flag ensures that only 1 consumer can read messages from the specific queue, and 1 thread on that consumer ensures that the messages are processed serially.
Exclusive consumer flag ensures that only 1 consumer can read messages from the specific queue, and 1 thread on that consumer ensures that the messages are processed serially.
Be the first to comment
You can use [html][/html], [css][/css], [php][/php] and more to embed the code. Urls are automatically hyperlinked. Line breaks and paragraphs are automatically generated.