
SpringBoot RabbitMQ Example


RabbitMQ is a widely used AMQP broker. In this article, we will learn how to integrate RabbitMQ with Spring Boot by developing a message producer and consumer example app. We will build a simple notification system and test it with a CommandLineRunner. The producer publishes a message to an exchange with a routing key (a topic exchange in the configuration below, although a direct exchange works too) and the consumer consumes this message asynchronously.

RabbitMQ is an AMQP (Advanced Message Queuing Protocol) broker. AMQP is a wire-level protocol, which makes it different from JMS (Java Message Service), a Java API. My previous articles cover Spring Boot JMS integration.

Installing RabbitMQ on Windows

Let us start by installing RabbitMQ on our local system. Below are the steps.
  • First, download and install the Erlang build that matches your OS (32-bit or 64-bit Windows) from https://www.erlang.org/downloads. The Erlang version that I have is OTP 22.0.
  • Next, download the Windows installer from https://www.rabbitmq.com/install-windows.html and follow the Windows installation instructions.
  • Once the installation is done, you can find the installation directory at C:\Program Files\RabbitMQ Server.
  • To enable the management console on Windows, go to the sbin directory and execute the command below:

    C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.17\sbin>rabbitmq-plugins.bat enable rabbitmq_management


After this, the RabbitMQ management console can be accessed at http://localhost:15672/ with the default username/password guest/guest.

Below are some of the useful RabbitMQ commands.

rabbitmqctl.bat stop
rabbitmqctl.bat status
rabbitmq-service.bat start


RabbitMQ Essentials

RabbitMQ is a message broker that implements the Advanced Message Queuing Protocol (AMQP). It promotes loose coupling and scalability between the applications that exchange messages.

In any messaging system, there are 3 components involved: a producer, a consumer, and a queue or topic. The producer puts messages into the queue and the consumer consumes them asynchronously from the queue or topic. AMQP adds one more component, the exchange, which sits between the producer and the queue.

For a single exchange and queue, the process is very simple. The producer publishes a message to the exchange and the exchange sends the message to the queue and the consumer consumes the message from the queue.
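The single-queue flow described above can be sketched in plain Java. This is only an in-memory stand-in, assuming a BlockingQueue in place of the broker's queue and a thread in place of the consumer application; the class name SingleQueueFlow is hypothetical and nothing here talks to RabbitMQ.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

/** In-memory stand-in for the producer -> exchange -> queue -> consumer flow (no RabbitMQ involved). */
final class SingleQueueFlow {

    /** Publishes every message, lets a consumer thread drain the queue, and returns what it received. */
    static List<String> run(List<String> messages) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> received = new CopyOnWriteArrayList<>();

        // Consumer: runs on its own thread and takes exactly as many messages as will be published.
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < messages.size(); i++) {
                    received.add(queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // Producer: hands each message to the "exchange", which here simply forwards it to the only queue.
        for (String message : messages) {
            queue.add(message);
        }

        try {
            consumer.join(); // wait until the consumer has processed everything
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the consumer", e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("order created", "order shipped")));
    }
}
```

The producer never waits for the consumer to finish a message; it only places messages on the queue, which is the decoupling a broker provides.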

In a more complex system, we will have multiple queues and multiple consumers. In that case, the producer sends a message to the exchange with a routing key. Each queue is bound to the exchange with a binding key, and the exchange delivers the message to the queues whose binding matches the routing key, according to the rules of the exchange type.
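To make the role of the routing key and binding key concrete, here is a toy in-memory model that uses exact matching, as a direct exchange does. The class ToyDirectExchange and the queue and key names in it are hypothetical, chosen only for illustration; this is not the RabbitMQ client API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of an exchange with bindings; illustrative only, not the RabbitMQ client API. */
final class ToyDirectExchange {

    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new LinkedHashMap<>();

    /** Declares the queue if needed and binds it to this exchange with the given binding key. */
    void bind(String queueName, String bindingKey) {
        queues.computeIfAbsent(queueName, k -> new ArrayList<>());
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queueName);
    }

    /** Delivers the message to every queue whose binding key equals the routing key. */
    void publish(String routingKey, String message) {
        for (String queueName : bindings.getOrDefault(routingKey, List.of())) {
            queues.get(queueName).add(message);
        }
    }

    /** Returns the messages currently held by the named queue (empty if the queue is unknown). */
    List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, List.of());
    }

    public static void main(String[] args) {
        ToyDirectExchange exchange = new ToyDirectExchange();
        exchange.bind("email.queue", "notify.email");
        exchange.bind("sms.queue", "notify.sms");
        exchange.bind("audit.queue", "notify.email");

        exchange.publish("notify.email", "Welcome!");

        System.out.println("email.queue = " + exchange.messagesIn("email.queue"));
        System.out.println("sms.queue   = " + exchange.messagesIn("sms.queue"));
        System.out.println("audit.queue = " + exchange.messagesIn("audit.queue"));
    }
}
```

In this sketch a message whose routing key matches no binding is simply dropped, and two queues bound with the same key each receive their own copy.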

There are different types of exchange.
  • Direct Exchange - It routes a message to the queues whose binding key is equal to the routing key.
  • Fanout Exchange - It ignores the routing key and sends the message to all the queues bound to it.
  • Topic Exchange - It routes messages to multiple queues by a partial matching of the routing key. The binding key is a pattern of dot-separated words in which * matches exactly one word and # matches zero or more words.
  • Headers Exchange - It uses message headers instead of the routing key.
  • Default (Nameless) Exchange - It routes the message to the queue whose name exactly matches the routing key.
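The partial matching used by a topic exchange is the least obvious of these rules, so here is a minimal sketch of it in plain Java. It assumes keys are dot-separated words, with * standing for exactly one word and # for zero or more words. The class name TopicPatternMatcher is hypothetical, and this is not how the broker implements matching internally.

```java
/** Sketch of topic-exchange pattern matching: '*' = exactly one word, '#' = zero or more words. */
final class TopicPatternMatcher {

    /** Returns true if the routing key satisfies the binding key pattern. */
    static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            // Pattern exhausted: it is a match only if every word was consumed too.
            return w == words.length;
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb zero words (move past it) or one more word (stay on '#').
            return match(pattern, p + 1, words, w)
                    || (w < words.length && match(pattern, p, words, w + 1));
        }
        if (w == words.length) {
            return false; // pattern still expects a word but none are left
        }
        if (pattern[p].equals("*") || pattern[p].equals(words[w])) {
            return match(pattern, p + 1, words, w + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("devglan.*", "devglan.routingkey"));
        System.out.println(matches("devglan.*", "devglan.a.b"));
        System.out.println(matches("devglan.#", "devglan.a.b"));
    }
}
```

A binding key without wildcards behaves like a direct exchange binding, which is why the example in this article works with either exchange type.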


Spring Boot RabbitMQ Project Setup

Head over to https://start.spring.io and generate a sample Spring Boot project with the spring-boot-starter-amqp dependency.

Below is the project structure.



Spring Boot RabbitMQ Configuration

A few beans need to be configured in Spring Boot to integrate RabbitMQ with it.

Queue - A queue can be durable or non-durable. A durable queue survives a broker restart. The queue() bean below passes false for the durable flag, so it creates a non-durable queue.

Exchange and Binding - The exchange() method creates the exchange, and the binding() method binds the queue and the exchange together with the routing key, defining the behavior that occurs when RabbitTemplate publishes to the exchange.

We are using TopicExchange here, but a direct exchange can also be used, depending on the requirement. A topic exchange routes messages to multiple queues by matching the routing key against binding key patterns, whereas a direct exchange routes messages to a queue only when the routing key is equal to the binding key.

The bean defined in the listenerAdapter() method is registered as a message listener in the container defined in container(). It will listen for messages on the "devglan.queue" queue. Because the RabbitMqListener class is a POJO, it needs to be wrapped in the MessageListenerAdapter, where you specify it to invoke listen().

We can also directly use the annotation @RabbitListener in the RabbitMqListener class.

By default, RabbitTemplate uses org.springframework.amqp.support.converter.SimpleMessageConverter, which handles String, byte[] and Serializable payloads and turns a Serializable object into a byte[] with Java serialization. Hence, we configure Jackson2JsonMessageConverter to send the message in JSON format.
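The difference between the two payload formats can be seen without Spring at all. The sketch below serializes a small object with plain Java serialization and, for comparison, builds the equivalent JSON text by hand. The Note type and the PayloadFormats class are hypothetical, and the hand-written JSON (no escaping) only stands in for what a JSON converter would produce.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Compares a Java-serialized payload with a JSON payload; illustrative only. */
final class PayloadFormats {

    /** Hypothetical payload type used only for this comparison. */
    static final class Note implements Serializable {
        private static final long serialVersionUID = 1L;
        final String type;
        final String msg;

        Note(String type, String msg) {
            this.type = type;
            this.msg = msg;
        }
    }

    /** Java serialization: a binary format that only Java consumers can read. */
    static byte[] javaSerialized(Note note) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(note);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Hand-written JSON without escaping, standing in for a JSON converter's output. */
    static byte[] json(Note note) {
        String text = "{\"notificationType\":\"" + note.type + "\",\"msg\":\"" + note.msg + "\"}";
        return text.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Note note = new Note("email", "Welcome");
        System.out.println("Java serialization: " + javaSerialized(note).length + " bytes (binary)");
        System.out.println("JSON: " + new String(json(note), StandardCharsets.UTF_8));
    }
}
```

The JSON payload is readable text that a consumer written in any language can parse, which is the main reason to prefer it for messages crossing application boundaries.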

AMQPConfig.java
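import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;

@Configuration
public class AMQPConfig {

    @Autowired
    private RabbitMQProperties rabbitMQProperties;

    // Non-durable queue (second argument is the durable flag).
    @Bean
    Queue queue() {
        return new Queue(rabbitMQProperties.getQueueName(), false);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(rabbitMQProperties.getExchangeName());
    }

    // Alternative: a direct exchange. The binding() parameter type must then change to DirectExchange.
    /*@Bean
    DirectExchange exchange() {
        return new DirectExchange(rabbitMQProperties.getExchangeName());
    }*/

    // Binds the queue to the exchange with the configured routing key.
    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(rabbitMQProperties.getRoutingKey());
    }

    // Listener container that delivers messages from the queue to the adapter.
    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                             MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(rabbitMQProperties.getQueueName());
        container.setMessageListener(listenerAdapter);
        return container;
    }

    @Bean
    public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
        return new MappingJackson2MessageConverter();
    }

    // RabbitTemplate that publishes payloads as JSON.
    @Bean
    public RabbitTemplate amqpTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter());
        return rabbitTemplate;
    }

    @Bean
    public Jackson2JsonMessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    // Wraps the POJO listener and invokes its listen() method for each message.
    @Bean
    MessageListenerAdapter listenerAdapter(RabbitMqListener listener) {
        return new MessageListenerAdapter(listener, "listen");
    }

}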

Below is our application.properties. Spring Boot defaults spring.rabbitmq.host to localhost and spring.rabbitmq.port to 5672, so there is no need to specify those properties here.

rabbitmq.queueName=devglan.queue
rabbitmq.exchangeName=devglan-exchange
rabbitmq.routingKey=devglan.routingkey

RabbitMQ Message Producer

In the producer class, we have injected the RabbitTemplate bean that we defined in our config class. The convertAndSend() method publishes the message to the exchange with the specified routing key.

The message will be published in a JSON format as we have set the bean Jackson2JsonMessageConverter as message converter in our RabbitTemplate bean definition.

AMQPProducer.java
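import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class AMQPProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private RabbitMQProperties rabbitMQProperties;

    // Publishes the notification to the configured exchange with the configured routing key.
    public void sendMessage(Notification msg) {
        System.out.println("Send msg = " + msg.toString());
        rabbitTemplate.convertAndSend(rabbitMQProperties.getExchangeName(), rabbitMQProperties.getRoutingKey(), msg);
    }
}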

Notification.java
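import java.io.Serializable;

public class Notification implements Serializable {

    private String notificationType;
    private String msg;

    public Notification() {
    }

    // Jackson needs the getters to serialize these private fields to JSON.
    public String getNotificationType() { return notificationType; }
    public void setNotificationType(String notificationType) { this.notificationType = notificationType; }
    public String getMsg() { return msg; }
    public void setMsg(String msg) { this.msg = msg; }
}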

RabbitMQ Message Consumer

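The consumer below is a plain POJO whose listen() method is invoked through the MessageListenerAdapter configured in AMQPConfig. It receives the JSON payload as a byte[] and parses it with Gson, so Gson must be on the classpath. Alternatively, enabling the commented-out @RabbitListener annotation registers the method as a consumer directly.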

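import java.nio.charset.StandardCharsets;

import com.google.gson.Gson;
import org.springframework.stereotype.Component;

@Component
public class RabbitMqListener {

    //@RabbitListener(queues="${rabbitmq.queueName}")
    public void listen(byte[] message) {
        // The payload is JSON text, so decode it explicitly as UTF-8 before parsing.
        String msg = new String(message, StandardCharsets.UTF_8);
        Notification not = new Gson().fromJson(msg, Notification.class);
        System.out.println("Received a new notification...");
        System.out.println(not.toString());
    }
}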

Spring Boot Configuration Properties

Below is the helper class that reads our custom rabbitmq.* properties from application.properties.

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConfigurationProperties(prefix = "rabbitmq")
public class RabbitMQProperties {

    private String queueName;
    private String exchangeName;
    private String routingKey;

    public String getQueueName() {
        return queueName;
    }

    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }

    public String getExchangeName() {
        return exchangeName;
    }

    public void setExchangeName(String exchangeName) {
        this.exchangeName = exchangeName;
    }

    public String getRoutingKey() {
        return routingKey;
    }

    public void setRoutingKey(String routingKey) {
        this.routingKey = routingKey;
    }
}
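To show what the prefix-based binding amounts to, here is a rough plain-Java sketch that reads the same three keys with java.util.Properties. It is only an approximation of the idea. Spring's binder does considerably more (relaxed naming, type conversion, validation), and the class names PrefixedProperties and RabbitSettings are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Rough approximation of prefix-based property binding; Spring's real binder does much more. */
final class PrefixedProperties {

    /** Plain holder mirroring the three fields of RabbitMQProperties. */
    static final class RabbitSettings {
        final String queueName;
        final String exchangeName;
        final String routingKey;

        RabbitSettings(String queueName, String exchangeName, String routingKey) {
            this.queueName = queueName;
            this.exchangeName = exchangeName;
            this.routingKey = routingKey;
        }
    }

    /** Parses properties text and copies the "<prefix>.<field>" entries into a settings object. */
    static RabbitSettings bind(String propertiesText, String prefix) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new RabbitSettings(
                props.getProperty(prefix + ".queueName"),
                props.getProperty(prefix + ".exchangeName"),
                props.getProperty(prefix + ".routingKey"));
    }

    public static void main(String[] args) {
        String text = "rabbitmq.queueName=devglan.queue\n"
                + "rabbitmq.exchangeName=devglan-exchange\n"
                + "rabbitmq.routingKey=devglan.routingkey\n";
        RabbitSettings settings = bind(text, "rabbitmq");
        System.out.println(settings.queueName + " / " + settings.exchangeName + " / " + settings.routingKey);
    }
}
```

Keeping the names in one typed class means the queue, exchange and routing key are defined once and shared by the configuration, the producer and the listener.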

Testing Message Producer and Consumer

We use a CommandLineRunner implementation to test our asynchronous messaging system. It invokes the sendMessage() method of the producer class on startup. Running SpringBootRabbitmqApplication.java produces the logs below.



You can also use the management console to produce and consume the messages.


Conclusion

In this article, we learned how to integrate RabbitMQ with Spring Boot and developed a message producer and consumer example app.

