A while ago we were working on a project that had a web service implemented with Spring Boot, which connected to RabbitMQ to manage background tasks.

If you haven’t ever heard about these technologies, Spring Boot is one of the most used frameworks today to create web applications and RabbitMQ is one of the most used messaging brokers. So it is very usual to find projects using this tech stack.

Under this scenario we needed to cover with integration tests the interaction between the Spring Boot application and the RabbitMQ message broker.

Until then, the operations covered by the application’s integration tests were synchronous, such as the queries in the database. But in this case sending and receiving of messages between the Spring Boot application and the RabbitMQ broker were asynchronous.

Despite it is a common scenario, I couldn’t find much information about possible ideas to test this integration. So that is why I decided to write this post: to share with you a possible approach for testing the integration between Spring Boot and RabbitMQ.

Our starting point

Our scenario is easier to understand through the following diagram:

On the one hand, we have our Spring Boot application, which connects to the RabbitMQ broker through a Publisher component in charge of publishing messages in RabbitMQ with information regarding an operation to be executed in background.

On the other hand, the Listener component is in charge of listening to the RabbitMQ broker in order to consume the messages that appear in the queue. This component is responsible for transferring the message information to a Service component, which will execute the necessary logics to carry out the background operation. In addition to Listener, it could also be called Consumer. It is possible that in other similar scenarios it may appear with this nomenclature.

Components to test

In this post I am going to focus only on the Publisher and Listener components and on how to test their integrations with RabbitMQ.

Before writing the tests, and to better understand the context, I share with you the code of the components that we are going to test.

Let’s suppose that the system manages content related to textbooks and the background operation to be performed is about updating the edition of a specific book.

Publisher

@Component
open class UpdateBookEditionQueuePublisher(
        private val rabbitTemplate: RabbitTemplate
) : IUpdateBookEditionQueuePublisher {

    @Throws(EntityUpdateException::class)
    override fun publishMessage(bookTitle: String, newEdition: Int) {
        val message = UpdateBookEditionMessage(bookTitle, newEdition)
        try {
            rabbitTemplate.convertAndSend(
                    UPDATE_BOOK_EDITION_ROUTING_KEY,
                    message
            )
        } catch (e: AmqpException) {
            throw EntityUpdateException(e.message)
        }
    }
}

Listener

@Component
class UpdateBookEditionQueueListener(
        @Autowired private val booksService: IBooksService
) {

    @RabbitListener(
            id = "update-book-edition-queue-listener",
            queuesToDeclare = [
                Queue(
                        UPDATE_BOOK_EDITION_QUEUE_NAME
                )
            ]
    )
    @Throws(EntityUpdateException::class)
    fun onMessageReceived(@Payload message: UpdateBookEditionMessage) {
        booksService.updateBookEdition(
                message.bookTitle, 
                message.newEdition
        )
    }
}

Easy, isn’t it? The code speaks for itself!

Service

About this component is enough to know just the updateBookEdition method of its interface, which is the one invoked from our Listener.

interface IBooksService {
    
    . . .
    
    @Throws(EntityUpdateException::class)
    fun updateBookEdition(bookTitle: String, newEdition: Int)

    . . .

}

Configurations and properties

The only thing missing is to add the configurations of the components and the application properties for Spring Boot to correctly work with RabbitMQ:

@Configuration
class RabbitMQConfiguration {

    @Autowired
    lateinit var rabbitTemplate: RabbitTemplate

    @Bean
    fun jackson2MessageConverter(
            objectMapper: ObjectMapper
    ) = Jackson2JsonMessageConverter(objectMapper)

    @Bean
    fun updateBookEditionQueuePublisher(): IUpdateBookEditionQueuePublisher 
            = UpdateBookEditionQueuePublisher(rabbitTemplate)

    @Bean
    fun updateBookEditionQueue() =
            Queue(UPDATE_BOOK_EDITION_QUEUE_NAME)
}

As we can see, our RabbitMQ configuration is very simple. This is mainly because, due to the simplicity of the problem, we use the Default Exchanger. If you want to know more about the different RabbitMQ exchangers, I suggest you to checkout the documentation.

Finally, the configuration of our application (application.properties):

spring.rabbitmq.host=<your_rabbitmq_host>
spring.rabbitmq.port=<your_rabbitmq_port>
spring.rabbitmq.username=<your_rabbitmq_username>
spring.rabbitmq.password=<your_rabbitmq_password>
spring.rabbitmq.virtual-host=<your_rabbitmq_virtual_host>

Let’s go now with the tests!

In order to run integration tests on RabbitMQ, it is necessary to have a test environment with a RabbitMQ broker homologous to the one in the production environment. In our case, by having our RabbitMQ broker dockerized, it was quite easy to replicate it in different environments.

Once we have our test RabbitMQ, we will create a configuration (application.properties) for the test environment:

spring.rabbitmq.host=<your_test_rabbitmq_host>
spring.rabbitmq.port=<your_test_rabbitmq_port>
spring.rabbitmq.username=<your_test_rabbitmq_username>
spring.rabbitmq.password=<your_test_rabbitmq_password>
spring.rabbitmq.virtual-host=<your_test_rabbitmq_virtual_host>

Publisher Integration Test

Now we have everything ready to get down to work and write our first integration test. Let’s start with the Publisher:

@RunWith(SpringRunner::class)
@SpringBootTest
class UpdateBookEditionQueuePublisherIntegrationTest {

    @Autowired
    private lateinit var sut: IUpdateBookEditionQueuePublisher

    @Test
    fun testPublishMessageForHappyPathThenMessagePublishedInQueue() {
        
    }
}

As we can see, we want to test that our Publisher component is capable of successfully publishing a message on the RabbitMQ queue. For this purpose, the first thing we are going to do is declare the sending of a test message to the queue.

. . .

@Test
fun testPublishMessageForHappyPathThenMessagePublishedInQueue() {
    sut.publishMessage(
            TestConstants.FAKE_TITLE, 
            TestConstants.FAKE_EDITION
    )
    // Assert that the message is published in queue
}

. . .

Alright, but once the message is sent, how do we check that it has been correctly queued in the RabbitMQ message queue? The first thing that came to our minds was something like this:

. . .

@Autowired
private lateinit var rabbitTemplate: RabbitTemplate

@Autowired
private lateinit var rabbitAdmin: RabbitAdmin

@Before
fun setUp() {
    rabbitAdmin.purgeQueue(UPDATE_BOOK_EDITION_QUEUE_NAME, true)
}

@After
fun tearDown() {
    rabbitAdmin.purgeQueue(UPDATE_BOOK_EDITION_QUEUE_NAME, true)
}

@Test
fun testPublishMessageForHappyPathThenMessagePublishedInQueue() {
    sut.publishMessage(
            TestConstants.FAKE_TITLE, 
            TestConstants.FAKE_EDITION
    )
    assertTrue(isMessagePublishedInQueue())
}

private fun isMessagePublishedInQueue(): Boolean {
    val queueMessageCount = rabbitTemplate.execute {
        it.queueDeclare(
                UPDATE_BOOK_EDITION_QUEUE_NAME,
                true,
                false, 
                false, 
                null
        )
    }.messageCount

    val queuedMessage = rabbitTemplate
            .receiveAndConvert(
                    UPDATE_BOOK_EDITION_QUEUE_NAME
            ) as UpdateBookEditionMessage

    return queueMessageCount == 1 
            && queuedMessage.bookTitle == TestConstants.FAKE_TITLE 
            && queuedMessage.newEdition == TestConstants.FAKE_EDITION
}

. . .

At first sight we could say that this approach makes sense. First, and to make sure that the message queue is empty before running the test, we purge it both before and after running it. To carry out this operation we use RabbitAdmin.

We are using RabbitTemplate to consult the message queue, obtaining its number of queued messages and the first queued one. This information is enough to determine that only one message has been received and it is the one we sent in the test.

However, we will be lucky if this test appears in green, and if it does, it will be occasionally, since there are two points that we are not taking into account:

  1. The Listener component is active since the test Spring Boot application starts running, so the Listener will try to consume the message once it is queued.
  2. Queuing is an asynchronous operation. It is possible that, by the time we consult the message queue, our message has not been queued yet.

The problem that the listener is continuously active is quite easy to solve, since it is enough to disable the automatic activation of the listeners during the startup of Spring Boot through the following line in our test application.properties file:

spring.rabbitmq.listener.simple.auto-startup=false

To solve the problem of asynchronous queuing, we used a very useful library to write tests related with asynchronous operations, Awaitility. It is a DSL (Domain-Specific Language) that allows you to write expectations on an asynchronous system in a concise and easy to read way.

Using Awaitility, our test finally ended up as follows:

@RunWith(SpringRunner::class)
@SpringBootTest
class UpdateBookEditionQueuePublisherIntegrationTest {

    @Autowired
    private lateinit var sut: IUpdateBookEditionQueuePublisher

    @Autowired
    private lateinit var rabbitTemplate: RabbitTemplate

    @Autowired
    private lateinit var rabbitAdmin: RabbitAdmin

    @Before
    fun setUp() {
        rabbitAdmin.purgeQueue(UPDATE_BOOK_EDITION_QUEUE_NAME, true)
    }

    @After
    fun tearDown() {
        rabbitAdmin.purgeQueue(UPDATE_BOOK_EDITION_QUEUE_NAME, true)
    }

    @Test
    fun testPublishMessageForHappyPathThenMessagePublishedInQueue() {
        sut.publishMessage(
                TestConstants.FAKE_TITLE, 
                TestConstants.FAKE_EDITION
        )
        await().atMost(30, TimeUnit.SECONDS)
                .until(isMessagePublishedInQueue(), `is`(true))
    }

    private fun isMessagePublishedInQueue(): Callable<Boolean> {
        return Callable {
            val queueMessageCount = rabbitTemplate.execute {
                it.queueDeclare(
                        UPDATE_BOOK_EDITION_QUEUE_NAME,
                        true,
                        false,
                        false,
                        null)
            }.messageCount

            val queuedMessage = rabbitTemplate
                    .receiveAndConvert(
                            UPDATE_BOOK_EDITION_QUEUE_NAME
                    ) as UpdateBookEditionMessage

            queueMessageCount == 1
                    && queuedMessage.bookTitle == TestConstants.FAKE_TITLE
                    && queuedMessage.newEdition == TestConstants.FAKE_EDITION
        }
    }
}

As we can see, we finally created an assertion with Awaitility whereby, for a maximum time of 30 seconds, we are checking if the message has been successfully published. Consumed the time, the test would fail.

For this purpose we have had to modify a bit our isMessagePublishedInQueue method, making it return an object of type Callable<Boolean>, necessary for the Awaitility assertion.

Listener Integration Test

Once we were happy with our integration test for the Publisher component, writing an integration test for the Listener component was quite simple, since we had all the necessary tools to do it.

Let’s see how!

@RunWith(SpringRunner::class)
@SpringBootTest
class UpdateBookEditionQueueListenerIntegrationTest {

    @Test
    fun testOnMessageReceivedForHappyPathThenMessageConsumedAndServiceCalled() {
        
    }
}

We want to test that our Listener component is capable of consuming the queued messages from the RabbitMQ queue and that the information of this messages is sent to the corresponding Service.

In order to test that our Listener is capable of consuming the messages, first we must send a test message.

. . .

@Autowired
private lateinit var rabbitTemplate: RabbitTemplate

@Autowired
private lateinit var rabbitAdmin: RabbitAdmin

@Before
fun setUp() {
    rabbitAdmin.purgeQueue(UPDATE_BOOK_EDITION_QUEUE_NAME, true)
}

@After
fun tearDown() {
    rabbitAdmin.purgeQueue(UPDATE_BOOK_EDITION_QUEUE_NAME, true)
}

@Test
fun testOnMessageReceivedForHappyPathThenMessageConsumedAndServiceCalled() {
    sendTestMessageToQueue()
    // TODO
}

private fun sendTestMessageToQueue() {
    rabbitTemplate.convertAndSend(
            UPDATE_BOOK_EDITION_ROUTING_KEY,
            UpdateBookEditionMessage(
                    TestConstants.FAKE_TITLE, 
                    TestConstants.FAKE_EDITION
            )
    )
}

. . .

As we can see, just as we did in the Publisher integration test, we purge the queue with RabbitAdmin both before and after running the test to make sure that the message queue is empty before each test execution.

Then we send the test message with RabbitTemplate.

The next step is to activate the Listener component. Do not forget that the auto startup of this component is deactivated through the property that we previously added in our test application properties file.

. . .

@Autowired
private lateinit var rabbitTemplate: RabbitTemplate

@Autowired
private lateinit var rabbitAdmin: RabbitAdmin

@Autowired
private lateinit var rabbitListenerEndpointRegistry: RabbitListenerEndpointRegistry

@MockBean
private lateinit var booksServiceDouble: IBooksService

@Before
fun setUp() {
    rabbitAdmin.purgeQueue(UPDATE_BOOK_EDITION_QUEUE_NAME, true)
}

@After
fun tearDown() {
    rabbitAdmin.purgeQueue(UPDATE_BOOK_EDITION_QUEUE_NAME, true)
    rabbitListenerEndpointRegistry.stop()
}

@Test
fun testOnMessageReceivedForHappyPathThenMessageConsumedAndServiceCalled() {
    sendTestMessageToQueue()
    startRabbitListener()
    Mockito.verify(booksServiceDouble, Mockito.times(1))
            .updateBookEdition(
                    TestConstants.FAKE_TITLE,
                    TestConstants.FAKE_EDITION
            )
}

private fun sendTestMessageToQueue() {
    rabbitTemplate.convertAndSend(
            UPDATE_BOOK_EDITION_ROUTING_KEY,
            UpdateBookEditionMessage(
                    TestConstants.FAKE_TITLE, 
                    TestConstants.FAKE_EDITION
            )
    )
}
    
private fun startRabbitListener() {
    rabbitListenerEndpointRegistry.getListenerContainer(
            "update-book-edition-queue-listener"
    ).start()
}

. . .

To activate the Listener component, we use RabbitListenerEndpointRegistry. We use the getListenerContainer method to obtain the specific Listener we are testing, through an identifier that we pass as a parameter. It is important to note that the identifier we specify must be the same as the one that appears inside the @RabbitListener annotation in our Listener component class.

Once the Listener starts, it checks if there are messages in the queue and, if so, it proceeds to consume them. In order to validate that our Listener works correctly, we check that it receives the test message and calls the Service passing the consumed message’s information.

After the test has been run, we deactivate the listener again through the RabbitListenerEndpointRegistry in the tearDown method, in order to maintain idempotence in the test executions, just as we do when by purging the queue.

It seems that everything makes sense, but again we need to resort to our friend Awaitility, to avoid issues related to the different asynchronous events that occur in the message queue.

Finally, we check that the queue is empty once the message has been consumed.

@RunWith(SpringRunner::class)
@SpringBootTest
class UpdateBookEditionQueueListenerIntegrationTest {

    @Autowired
    private lateinit var rabbitTemplate: RabbitTemplate

    @Autowired
    private lateinit var rabbitAdmin: RabbitAdmin

    @Autowired
    private lateinit var rabbitListenerEndpointRegistry: RabbitListenerEndpointRegistry

    @MockBean
    private lateinit var booksServiceDouble: IBooksService

    @Before
    fun setUp() {
        rabbitAdmin.purgeQueue(UPDATE_BOOK_EDITION_QUEUE_NAME, true)
    }

    @After
    fun tearDown() {
        rabbitAdmin.purgeQueue(UPDATE_BOOK_EDITION_QUEUE_NAME, true)
        rabbitListenerEndpointRegistry.stop()
    }

    @Test
    fun testOnMessageReceivedForHappyPathThenMessageConsumedAndServiceCalled {
        sendTestMessageToQueue()
        awaitForFilledQueue()
        startRabbitListener()
        awaitForEmptyQueue()
        Mockito.verify(booksServiceDouble, Mockito.times(1))
                .updateBookEdition(
                        TestConstants.FAKE_TITLE,
                        TestConstants.FAKE_EDITION
                )
    }

    private fun sendTestMessageToQueue() {
        rabbitTemplate.convertAndSend(
                UPDATE_BOOK_EDITION_ROUTING_KEY,
                UpdateBookEditionMessage(
                        TestConstants.FAKE_TITLE,
                        TestConstants.FAKE_EDITION
                )
        )
    }

    private fun awaitForFilledQueue() {
        await().atMost(30, TimeUnit.SECONDS)
                .until(isQueueEmpty(), CoreMatchers.`is`(false))
    }

    private fun startRabbitListener() {
        rabbitListenerEndpointRegistry.getListenerContainer(
                "update-book-edition-queue-listener"
        ).start()
    }

    private fun awaitForEmptyQueue() {
        await().atMost(30, TimeUnit.SECONDS)
                .until(isQueueEmpty(), CoreMatchers.`is`(true))
    }

    private fun isQueueEmpty(): Callable<Boolean> {
        return Callable {
            val queueMessageCount = rabbitTemplate.execute {
                it.queueDeclare(
                        UPDATE_BOOK_EDITION_QUEUE_NAME,
                        true,
                        false,
                        false,
                        null)
            }.messageCount

            queueMessageCount == 0
        }
    }
}

And that’s all! This is how our integration test for the Listener component ends up.

Conclusion

In this post I have shared with you our experience writing integration tests for RabbitMQ and Spring Boot. I hope you have found the usage of the Awaitility library for testing asynchronous scenarios interesting.

Surely there are other ways to solve this problem or improve this solution, so do not hesitate to share your experiences in the comments section.

Furthermore, I will be happy to answer any questions you may have!