Java: Get Messaging with Spring AMQP and RabbitMQ

In this article, you create a RabbitMQ topic exchange and queues, and a Spring AMQP sender and receiver that exchange messages using the Advanced Message Queuing Protocol (AMQP) and log to the console with the Simple Logging Facade for Java (SLF4J).

Prerequisites:

  • Docker
  • Java 8 or later (JDK)
  • Apache Maven

Steps:

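  • Create a new Spring Project Directory
  • Run RabbitMQ with Docker
  • Create a Topic Exchange, Queues and Bindings
  • Create a Spring Application
  • Create a Spring AMQP Sender
  • Create a Spring AMQP Receiver
  • Configure the Logger
  • Run the Spring Application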

Create a new Spring Project Directory

$ mkdir spring-amqp
$ cd spring-amqp

Run RabbitMQ with Docker

To run RabbitMQ with Docker, pull the ‘rabbitmq:3-management’ image, which includes the management console, and run the container. Then check that RabbitMQ is running.

$ docker pull rabbitmq:3-management
$ docker run -d --restart always --hostname localhost --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:3-management
$ docker ps -a
CONTAINER ID   IMAGE                   COMMAND                  CREATED         STATUS         PORTS                                                                                        NAMES
6b4ddf858402   rabbitmq:3-management   "docker-entrypoint.s…"   9 minutes ago   Up 9 minutes   4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, 15671/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp   rabbitmq

If RabbitMQ is running, open the management console in a browser at http://localhost:15672 and log in with guest/guest. Then enable the tracing plugin and turn tracing on.

$ docker exec -ti rabbitmq sh -c "rabbitmq-plugins enable rabbitmq_tracing"
Applying plugin configuration to rabbit@localhost...
The following plugins have been enabled:
rabbitmq_tracing
started 1 plugins.
$ docker exec -ti rabbitmq sh -c "rabbitmqctl trace_on"
Starting tracing for vhost "/" ...
Trace enabled for vhost /

In the management console, create a trace via Admin > Tracing > Add a new trace: enter the name ‘trace1’ and click ‘Add trace’.
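Besides the browser, the management plugin also serves an HTTP API on the same port, which is handy for checking from code that the broker is up. The following is a minimal sketch using only the JDK; the class and method names are made up for illustration, and it assumes the default guest/guest login and the port mapping used above.

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

class ManagementCheck {

    // Builds the value of an HTTP Basic "Authorization" header.
    static String basicAuthHeader(String user, String password) {
        String credentials = user + ":" + password;
        return "Basic " + Base64.getEncoder()
                .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
    }

    // Sends GET <baseUrl>/api/overview and returns the HTTP status code.
    static int overviewStatus(String baseUrl, String user, String password) throws IOException {
        HttpURLConnection conn =
                (HttpURLConnection) URI.create(baseUrl + "/api/overview").toURL().openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(2000);
        conn.setReadTimeout(2000);
        conn.setRequestProperty("Authorization", basicAuthHeader(user, password));
        try {
            return conn.getResponseCode();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) {
        try {
            // Assumption: the container from the docker run command above is listening on 15672.
            int status = overviewStatus("http://localhost:15672", "guest", "guest");
            System.out.println("Management API responded with HTTP " + status);
        } catch (IOException e) {
            System.out.println("Management API not reachable: " + e.getMessage());
        }
    }
}
```

A 200 status suggests the broker and the management plugin are running; a 401 usually points to wrong credentials.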

Create a Topic Exchange, Queues and Bindings

You can create a RabbitMQ exchange, queues, and the bindings between them manually in the management console, but you can also describe them in a ‘rabbit.definitions.json’ file and import that file.

$ vi rabbit.definitions.json

Define the following exchange, queues, and bindings. Each binding tells the exchange to forward a message to a queue when the message’s routing key matches the binding’s ‘routing_key’ property.

{
  "rabbit_version": "3.7.4",
  "vhosts": [
    {
      "name": "/"
    }
  ],
  "queues": [
    {
      "name": "q1",
      "vhost": "/",
      "durable": true,
      "auto_delete": false,
      "arguments": {}
    },
    {
      "name": "q2",
      "vhost": "/",
      "durable": true,
      "auto_delete": false,
      "arguments": {}
    }
  ],
  "exchanges": [
    {
      "name": "spring-amqp-exchange",
      "vhost": "/",
      "type": "topic",
      "durable": true,
      "auto_delete": false,
      "internal": false,
      "arguments": {}
    }
  ],
  "bindings": [
    {
      "source": "spring-amqp-exchange",
      "vhost": "/",
      "destination": "q1",
      "destination_type": "queue",
      "routing_key": "q1",
      "arguments": {}
    },
    {
      "source": "spring-amqp-exchange",
      "vhost": "/",
      "destination": "q2",
      "destination_type": "queue",
      "routing_key": "q2",
      "arguments": {}
    }
  ]
}

In the RabbitMQ management console, go to Overview > Import definitions > Choose file… > select the ‘rabbit.definitions.json’ file > Open > click ‘Upload broker definitions’.
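The bindings in this article use the literal routing keys ‘q1’ and ‘q2’, but a topic exchange also accepts patterns: routing keys are split into words on ‘.’, ‘*’ stands for exactly one word, and ‘#’ stands for zero or more words. The sketch below imitates that matching rule in plain Java to make it concrete. It is an illustration, not RabbitMQ’s implementation, and the ‘orders.…’ keys in the usage example are hypothetical.

```java
class TopicMatch {

    // Returns true if the routing key matches the binding pattern
    // under AMQP topic rules ('*' = one word, '#' = zero or more words).
    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more of the remaining words
            for (int n = j; n <= k.length; n++) {
                if (matches(p, i + 1, k, n)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word, key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return matches(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("q1", "q1"));                     // literal key, as in the bindings above
        System.out.println(matches("orders.*", "orders.created"));   // hypothetical pattern
        System.out.println(matches("orders.#", "orders.created.eu")); // hypothetical pattern
    }
}
```

With the definitions above, a message published with routing key ‘q1’ is therefore routed to queue q1 only, because the binding for q2 requires the key ‘q2’.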

Create a Spring Application

Create the Spring directory structure.

$ mkdir -p src/main/java/hello
$ mkdir -p src/main/resources
$ mkdir -p src/test/java/hello

Create the Apache Maven ‘pom.xml’ file in the root directory of the project.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <packaging>jar</packaging>
  <groupId>org.springframework</groupId>
  <artifactId>spring-amqp</artifactId>
  <version>0.1.0</version>
  <properties>
    <java.version>1.8</java.version>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>
  <dependencies>
    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
      <version>2.0.1.RELEASE</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-test -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <version>2.0.1.RELEASE</version>
      <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.jayway.jsonpath/json-path -->
    <dependency>
      <groupId>com.jayway.jsonpath</groupId>
      <artifactId>json-path</artifactId>
      <version>2.4.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
      <version>2.0.1.RELEASE</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.springframework.amqp/spring-rabbit -->
    <dependency>
      <groupId>org.springframework.amqp</groupId>
      <artifactId>spring-rabbit</artifactId>
      <version>2.0.3.RELEASE</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-security -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-security</artifactId>
      <version>2.0.1.RELEASE</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
        <version>2.0.1.RELEASE</version>
        <executions>
          <execution>
            <goals>
              <goal>repackage</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <executable>true</executable>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <reporting>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-report-plugin</artifactId>
        <version>2.21.0</version>
      </plugin>
    </plugins>
  </reporting>
  <repositories>
    <repository>
      <id>spring-releases</id>
      <url>https://repo.spring.io/libs-release</url>
    </repository>
  </repositories>
  <pluginRepositories>
    <pluginRepository>
      <id>spring-releases</id>
      <url>https://repo.spring.io/libs-release</url>
    </pluginRepository>
  </pluginRepositories>
</project>

Create the main Spring application class ‘hello/Application.java’ in ‘src/main/java’. The main() method starts the Spring application. The application declares the queue, the topic exchange, and the binding of the queue to the exchange by routing key. It then registers a message listener container that hands messages from the queue to the Receiver.

package hello;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class Application {

    private static final Logger logger =
            LoggerFactory.getLogger(Application.class);

    static final String topicExchangeName = "spring-amqp-exchange";
    static final String queueName = "q1";
    static final String routingKey = "q1";

    @Bean
    Queue queue() {
        // Durable, so the declaration matches the q1 queue from rabbit.definitions.json.
        // Redeclaring an existing queue with a different durable flag is rejected by RabbitMQ.
        return new Queue(queueName, true);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(topicExchangeName);
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        // Route messages with routing key "q1" from the exchange to the queue.
        return BindingBuilder.bind(queue).to(exchange).with(routingKey);
    }

    @Bean
    SimpleMessageListenerContainer container(
            ConnectionFactory connectionFactory,
            MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container =
                new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        container.setMessageListener(listenerAdapter);
        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        // Delegate each incoming message to Receiver.receiveMessage(...).
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }

    public static void main(String[] args)
            throws InterruptedException {
        // Start the application, then close the context once run() returns.
        SpringApplication.run(Application.class, args).close();
    }
}
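The listenerAdapter bean above is given a delegate object and the method name "receiveMessage" as a plain string, so the method is looked up by name at runtime. To show the idea, here is a small sketch of such reflective delegation using only the JDK. It is a simplified stand-in and not Spring’s actual MessageListenerAdapter code; ReflectiveDelegate and DemoReceiver are hypothetical names.

```java
import java.lang.reflect.Method;

class ReflectiveDelegate {

    private final Object delegate;
    private final String methodName;

    ReflectiveDelegate(Object delegate, String methodName) {
        this.delegate = delegate;
        this.methodName = methodName;
    }

    // Finds a public method with the configured name and a single String
    // parameter on the delegate, then invokes it with the payload.
    void onMessage(String payload) {
        try {
            Method method = delegate.getClass().getMethod(methodName, String.class);
            method.invoke(delegate, payload);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                    "Cannot call " + methodName + " on " + delegate.getClass().getName(), e);
        }
    }
}

// Hypothetical delegate with the same method shape as the Receiver in this article.
class DemoReceiver {

    String last;

    public void receiveMessage(String message) {
        last = message;
        System.out.println("Received <" + message + ">");
    }
}

class ReflectiveDelegateDemo {
    public static void main(String[] args) {
        new ReflectiveDelegate(new DemoReceiver(), "receiveMessage").onMessage("Hello World!");
    }
}
```

One consequence of passing the method name as a string is that a typo in it is not caught by the compiler and only shows up when a message arrives.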

Create a Spring AMQP Sender

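Create the sender class ‘hello/Sender.java’ in ‘src/main/java’. The Sender implements CommandLineRunner, so Spring Boot calls its run() method once the application context has started, before SpringApplication.run() returns in main(). It sends ‘Hello World!’ to the topic exchange with routing key ‘q1’.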

package hello;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class Sender implements CommandLineRunner {

    private static final Logger logger =
            LoggerFactory.getLogger(Sender.class);

    private final RabbitTemplate rabbitTemplate;

    public Sender(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @Override
    public void run(String... args) throws Exception {
        logger.debug("Sending message...");
        rabbitTemplate.convertAndSend(Application.topicExchangeName, Application.routingKey, "Hello World!");
    }
}

Create a Spring AMQP Receiver

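Create the receiver class ‘hello/Receiver.java’ in ‘src/main/java’. The Receiver is a plain Spring component that logs each message it is given. The MessageListenerAdapter defined in Application.java wraps it and calls its receiveMessage() method for every message that arrives on the queue.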

package hello;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
public class Receiver {

    private static final Logger logger =
            LoggerFactory.getLogger(Receiver.class);

    public void receiveMessage(String message) {
        logger.debug("Received <" + message + ">");
    }
}
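Because main() closes the application context as soon as SpringApplication.run() returns, the sender and the shutdown can race with message delivery. If you want the application to wait until the message has actually arrived, one common option is a CountDownLatch that the receiver counts down and the sender waits on. The sketch below shows that pattern with plain threads; LatchReceiver and LatchDemo are hypothetical names, and the extra thread merely stands in for the listener container.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchReceiver {

    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile String lastMessage;

    // Called by the listener thread when a message arrives.
    public void receiveMessage(String message) {
        lastMessage = message;
        latch.countDown();
    }

    // Blocks until a message has arrived or the timeout expires.
    // Returns true if a message arrived in time.
    boolean awaitMessage(long timeoutMillis) {
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    String lastMessage() {
        return lastMessage;
    }
}

class LatchDemo {
    public static void main(String[] args) {
        LatchReceiver receiver = new LatchReceiver();

        // Stand-in for the listener container delivering a message on its own thread.
        new Thread(() -> receiver.receiveMessage("Hello World!")).start();

        // Stand-in for the sender waiting before the application shuts down.
        if (receiver.awaitMessage(5000)) {
            System.out.println("Received <" + receiver.lastMessage() + ">");
        } else {
            System.out.println("Timed out waiting for the message");
        }
    }
}
```

In the Spring application, the equivalent would be to inject the Receiver into the Sender and wait on the latch after convertAndSend(); treat that wiring as a suggestion rather than part of the code in this article.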

Configure the Logger

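To configure the Logger to log messages to the console, create a new file ‘src/main/resources/application.yaml’ (create the ‘src/main/resources’ directory first if it does not exist yet). Maven packages files from this directory into the jar, and Spring Boot reads the file from the classpath at startup.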

logging:
  level:
    root: ERROR
    org:
      springframework: ERROR
    hello: DEBUG
  pattern:
    console: '%d{yyyy-MMM-dd HH:mm:ss.SSS} %-5level [%thread] %logger{15} - %msg%n'
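In the console pattern, %d{…} prints the timestamp in the given date format, %-5level prints the level left-aligned and padded to five characters, [%thread] prints the thread name, %logger{15} prints the logger name (shortened toward 15 characters where needed), %msg prints the message, and %n ends the line. To get a feel for the resulting layout, the sketch below approximates it with plain JDK formatting. It is not how the logging framework renders the line, it does not shorten logger names, and the timestamp and thread name in the usage example are made-up values.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

class LogLineSketch {

    // Same date pattern as in the %d{...} part of the console pattern.
    private static final DateTimeFormatter TIMESTAMP =
            DateTimeFormatter.ofPattern("yyyy-MMM-dd HH:mm:ss.SSS", Locale.ENGLISH);

    // Builds one line in the shape "<timestamp> <level> [<thread>] <logger> - <message>".
    static String format(LocalDateTime time, String level, String thread,
                         String logger, String message) {
        return String.format("%s %-5s [%s] %s - %s",
                TIMESTAMP.format(time), level, thread, logger, message);
    }

    public static void main(String[] args) {
        // Hypothetical timestamp and thread name, for illustration only.
        LocalDateTime time = LocalDateTime.of(2018, 4, 20, 9, 5, 3, 7_000_000);
        System.out.println(format(time, "DEBUG", "main", "hello.Sender", "Sending message..."));
    }
}
```

Because the root and org.springframework levels are set to ERROR and only the hello package is set to DEBUG, the console stays limited to the messages logged by the Sender and Receiver plus any errors.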

Run the Spring Application

$ mvn clean package
$ java -jar target/spring-amqp-0.1.0.jar

Done! Don’t forget to add Docker, Apache Maven, Spring AMQP, AMQP, RabbitMQ with a topic exchange and SLF4J logging to your resume!

Cloud Native Developer Advocate @IBMDeveloper for Cloud Native, Containers, Kubernetes, Security and DevOps. Dutch NYer, dad, humanist with empathy for paradox.