Hace un tiempo nos encontrábamos trabajando en un proyecto que disponía de un servicio web implementado con Spring Boot, que se conectaba a RabbitMQ para gestionar tareas en background.

Para quienes no conozcan estas tecnologías, Spring Boot es uno de los frameworks más utilizados hoy en día para crear aplicaciones web y RabbitMQ es de los brokers de mensajería más usados. Por estas razones es muy habitual encontrarnos proyectos que usan este stack tecnológico.

En este escenario nos surgió la necesidad de cubrir con tests de integración la interacción entre la aplicación Spring Boot y el broker de mensajería RabbitMQ

Hasta el momento, los tests de integración de la aplicación cubrían operaciones síncronas, como las consultas sobre base de datos. Pero en este caso el envío y recepción de mensajes entre la aplicación de Spring Boot y el broker de RabbitMQ eran asíncronos.

Pese a tratarse de un escenario habitual, no encontré mucha información sobre posibles ideas para testear esta integración. Así que por eso me he animado a escribir este post: para compartir con vosotros un posible enfoque a la hora de testear la integración entre Spring Boot y RabbitMQ.

Nuestro punto de partida

Nuestro escenario es más fácil de comprender a través del siguiente diagrama:

springboot-rabbitmq-diagram

Por un lado, tenemos nuestra aplicación Spring Boot, que se conecta al broker de RabbitMQ mediante un componente Publisher encargado de publicar en RabbitMQ mensajes con información relativa a una operación que se desea ejecutar en background.

Por otro lado, el componente Listener es el encargado de escuchar al broker de RabbitMQ para consumir los mensajes que aparecen en la cola. Este componente se encarga de transferir la información del mensaje a otro componente Service, que ejecutará las lógicas necesarias para llevar a cabo la operación en background. Además de Listener, también podría denominarse Consumer. Es posible que en otros escenarios similares pueda aparecer con esta nomenclatura.

Componentes a testear

En este post me voy a centrar únicamente en los componentes Publisher y Listener y en cómo testear sus integraciones con RabbitMQ.

Antes de plantear los tests, y para comprender mejor el contexto, comparto con vosotros el código de los componentes que vamos a testear.

Vamos a suponer que el sistema gestiona contenido relacionado con libros de texto y la operación que se va a realizar en background es la actualización de la edición de un libro concreto.

Publisher

@Component
open class UpdateBookEditionQueuePublisher(
        private val rabbitTemplate: RabbitTemplate
) : IUpdateBookEditionQueuePublisher {

    @Throws(EntityUpdateException::class)
    override fun publishMessage(bookTitle: String, newEdition: Int) {
        val message = UpdateBookEditionMessage(bookTitle, newEdition)
        try {
            rabbitTemplate.convertAndSend(
                    UPDATE_BOOK_EDITION_ROUTING_KEY,
                    message
            )
        } catch (e: AmqpException) {
            throw EntityUpdateException(e.message)
        }
    }
}

Listener

@Component
class UpdateBookEditionQueueListener(
        @Autowired private val booksService: IBooksService
) {

    @RabbitListener(
            id = "update-book-edition-queue-listener",
            queuesToDeclare = [
                Queue(
                        UPDATE_BOOK_EDITION_QUEUE_NAME
                )
            ]
    )
    @Throws(EntityUpdateException::class)
    fun onMessageReceived(@Payload message: UpdateBookEditionMessage) {
        booksService.updateBookEdition(
                message.bookTitle, 
                message.newEdition
        )
    }
}

 ¿Fácil no? ¡El código habla por sí solo!

Service

De este componente es suficiente con conocer únicamente el método updateBookEdition de su interfaz, que es el que se invoca desde nuestro Listener.

interface IBooksService {
    
    . . .
    
    @Throws(EntityUpdateException::class)
    fun updateBookEdition(bookTitle: String, newEdition: Int)

    . . .

}

Configuraciones y propiedades

Solo falta añadir la configuración de los componentes y las propiedades de aplicación necesarias para que Spring Boot funcione correctamente con RabbitMQ:

@Configuration
class RabbitMQConfiguration {

    @Autowired
    lateinit var rabbitTemplate: RabbitTemplate

    @Bean
    fun jackson2MessageConverter(
            objectMapper: ObjectMapper
    ) = Jackson2JsonMessageConverter(objectMapper)

    @Bean
    fun updateBookEditionQueuePublisher(): IUpdateBookEditionQueuePublisher 
            = UpdateBookEditionQueuePublisher(rabbitTemplate)

    @Bean
    fun updateBookEditionQueue() =
            Queue(UPDATE_BOOK_EDITION_QUEUE_NAME)
}

Como podemos observar nuestra configuración de RabbitMQ es muy sencilla. Esto es en gran parte debido a que, por la simplicidad del problema, usamos el Default Exchanger. Si queréis saber más acerca de los diferentes exchangers de RabbitMQ os animo a que os paséis por su documentación.

Por último, la configuración de nuestra aplicación (application.properties):

spring.rabbitmq.host=<your_rabbitmq_host>
spring.rabbitmq.port=<your_rabbitmq_port>
spring.rabbitmq.username=<your_rabbitmq_username>
spring.rabbitmq.password=<your_rabbitmq_password>
spring.rabbitmq.virtual-host=<your_rabbitmq_virtual_host>

Y ahora sí, ¡vamos con los tests!

Para poder ejecutar tests de integración sobre RabbitMQ es necesario disponer de un entorno de test que cuente con un broker RabbitMQ homólogo al del entorno de producción. En nuestro caso, al disponer de nuestro broker de RabbitMQ dockerizado, fue bastante sencillo replicarlo en diferentes entornos.

Una vez disponemos de nuestro RabbitMQ de test, crearemos una configuración (application.properties) para el entorno de test:

spring.rabbitmq.host=<your_test_rabbitmq_host>
spring.rabbitmq.port=<your_test_rabbitmq_port>
spring.rabbitmq.username=<your_test_rabbitmq_username>
spring.rabbitmq.password=<your_test_rabbitmq_password>
spring.rabbitmq.virtual-host=<your_test_rabbitmq_virtual_host>

Test de Integración del Publisher

Ya tenemos todo listo para ponernos manos a la obra y plantear nuestro primer test de integración. Vamos a comenzar con el Publisher:

@RunWith(SpringRunner::class)
@SpringBootTest
class UpdateBookEditionQueuePublisherIntegrationTest {

    @Autowired
    private lateinit var sut: IUpdateBookEditionQueuePublisher

    @Test
    fun testPublishMessageForHappyPathThenMessagePublishedInQueue() {
        
    }
}

Como podemos ver, queremos probar que nuestro componente Publisher es capaz de publicar un mensaje en la cola de RabbitMQ con éxito. Para ello lo primero que vamos a hacer es declarar el envío de un mensaje de prueba a la cola.

. . .

@Test
fun testPublishMessageForHappyPathThenMessagePublishedInQueue() {
    sut.publishMessage(
            TestConstants.FAKE_TITLE, 
            TestConstants.FAKE_EDITION
    )
    // Assert that the message is published in queue
}

. . .

Claro, pero una vez enviado el mensaje, ¿Cómo probamos que este ha sido encolado correctamente en la cola de mensajes de RabbitMQ? Lo primero que se nos ocurrió a nosotros fue algo así:

. . .

@Autowired
private lateinit var rabbitTemplate: RabbitTemplate

@Autowired
private lateinit var rabbitAdmin: RabbitAdmin

@Before
fun setUp() {
    rabbitAdmin.purgeQueue(UPDATE_BOOK_EDITION_QUEUE_NAME, true)
}

@After
fun tearDown() {
    rabbitAdmin.purgeQueue(UPDATE_BOOK_EDITION_QUEUE_NAME, true)
}

@Test
fun testPublishMessageForHappyPathThenMessagePublishedInQueue() {
    sut.publishMessage(
            TestConstants.FAKE_TITLE, 
            TestConstants.FAKE_EDITION
    )
    assertTrue(isMessagePublishedInQueue())
}

private fun isMessagePublishedInQueue(): Boolean {
    val queueMessageCount = rabbitTemplate.execute {
        it.queueDeclare(
                UPDATE_BOOK_EDITION_QUEUE_NAME,
                true,
                false, 
                false, 
                null
        )
    }.messageCount

    val queuedMessage = rabbitTemplate
            .receiveAndConvert(
                    UPDATE_BOOK_EDITION_QUEUE_NAME
            ) as UpdateBookEditionMessage

    return queueMessageCount == 1 
            && queuedMessage.bookTitle == TestConstants.FAKE_TITLE 
            && queuedMessage.newEdition == TestConstants.FAKE_EDITION
}

. . .

A simple vista podríamos decir que este planteamiento tiene sentido. En primer lugar y para asegurarnos de que la cola de mensajes esté vacía antes de ejecutar el test, la purgamos tanto antes de ejecutarlo como después de hacerlo. Para llevar a cabo esta operación usamos RabbitAdmin.

Estamos usando RabbitTemplate para poder consultar la cola de mensajes, obteniendo el número de mensajes encolados y el primer mensaje encolado de la misma. Esta información es suficiente para determinar que solo se ha recibido un mensaje y es el mismo que hemos enviado en el test.

Sin embargo tendremos suerte si este test aparece en verde, y, si lo hace, será en contadas ocasiones, ya que hay dos puntos que no estamos teniendo en cuenta:

  1. El componente Listener está activo desde el momento en que la aplicación Spring Boot de test esté corriendo, de modo que nos exponemos a que el Listener trate de consumir el mensaje una vez este se encole.
  2. El encolado se produce de forma asíncrona. Es posible que para cuando consultemos la cola de mensajes nuestro mensaje no haya sido encolado aún.

El problema de que el listener esté activo continuamente es bastante fácil de resolver, ya que basta con anular la activación automática de los listeners durante el arranque de Spring Boot a través de la siguiente línea en nuestro fichero application.properties de test:

spring.rabbitmq.listener.simple.auto-startup=false

Para resolver el problema del encolamiento asíncrono hicimos uso de una librería muy útil para plantear tests sobre operaciones asíncronas, Awaitility. Se trata de un DSL (Domain-specific language) que permite escribir expectativas sobre un sistema asíncrono de una forma concisa y fácil de leer.

Usando Awaitility, nuestro test finalmente quedó de la siguiente forma:

@RunWith(SpringRunner::class)
@SpringBootTest
class UpdateBookEditionQueuePublisherIntegrationTest {

    @Autowired
    private lateinit var sut: IUpdateBookEditionQueuePublisher

    @Autowired
    private lateinit var rabbitTemplate: RabbitTemplate

    @Autowired
    private lateinit var rabbitAdmin: RabbitAdmin

    @Before
    fun setUp() {
        rabbitAdmin.purgeQueue(UPDATE_BOOK_EDITION_QUEUE_NAME, true)
    }

    @After
    fun tearDown() {
        rabbitAdmin.purgeQueue(UPDATE_BOOK_EDITION_QUEUE_NAME, true)
    }

    @Test
    fun testPublishMessageForHappyPathThenMessagePublishedInQueue() {
        sut.publishMessage(
                TestConstants.FAKE_TITLE, 
                TestConstants.FAKE_EDITION
        )
        await().atMost(30, TimeUnit.SECONDS)
                .until(isMessagePublishedInQueue(), `is`(true))
    }

    private fun isMessagePublishedInQueue(): Callable<Boolean> {
        return Callable {
            val queueMessageCount = rabbitTemplate.execute {
                it.queueDeclare(
                        UPDATE_BOOK_EDITION_QUEUE_NAME,
                        true,
                        false,
                        false,
                        null)
            }.messageCount

            val queuedMessage = rabbitTemplate
                    .receiveAndConvert(
                            UPDATE_BOOK_EDITION_QUEUE_NAME
                    ) as UpdateBookEditionMessage

            queueMessageCount == 1
                    && queuedMessage.bookTitle == TestConstants.FAKE_TITLE
                    && queuedMessage.newEdition == TestConstants.FAKE_EDITION
        }
    }
}

Como se puede ver, finalmente creamos una aserción con Awaitility mediante la cual, durante un máximo de 30 segundos, se comprueba si el mensaje ha sido publicado con éxito. Superado este tiempo, el test fallaría.

Para este fin hemos tenido que modificar un poco nuestro método isMessagePublishedInQueue, haciendo que retorne un objeto de tipo Callable<Boolean>, necesario para la aserción de Awaitility.

Test de Integración del Listener

Una vez felices con nuestro test de integración para el componente Publisher, plantear un test de integración el componente Listener fue bastante sencillo, ya que disponíamos de las herramientas necesarias para hacerlo. 

¡Vamos a ver cómo!

@RunWith(SpringRunner::class)
@SpringBootTest
class UpdateBookEditionQueueListenerIntegrationTest {

    @Test
    fun testOnMessageReceivedForHappyPathThenMessageConsumedAndServiceCalled() {
        
    }
}

En este test queremos probar que nuestro componente Listener es capaz de consumir los mensajes que se encolan en la cola de RabbitMQ y que la información de estos mensajes se envía al Service correspondiente.

Para poder probar que nuestro Listener es capaz de consumir los mensajes, primero debemos enviar un mensaje de prueba.

. . .

@Autowired
private lateinit var rabbitTemplate: RabbitTemplate

@Autowired
private lateinit var rabbitAdmin: RabbitAdmin

@Before
fun setUp() {
    rabbitAdmin.purgeQueue(UPDATE_BOOK_EDITION_QUEUE_NAME, true)
}

@After
fun tearDown() {
    rabbitAdmin.purgeQueue(UPDATE_BOOK_EDITION_QUEUE_NAME, true)
}

@Test
fun testOnMessageReceivedForHappyPathThenMessageConsumedAndServiceCalled() {
    sendTestMessageToQueue()
    // TODO
}

private fun sendTestMessageToQueue() {
    rabbitTemplate.convertAndSend(
            UPDATE_BOOK_EDITION_ROUTING_KEY,
            UpdateBookEditionMessage(
                    TestConstants.FAKE_TITLE, 
                    TestConstants.FAKE_EDITION
            )
    )
}

. . .

Como podemos observar, al igual que hicimos con en el test de integración del Publisher, purgamos la cola con RabbitAdmin tanto antes como después de ejecutar el test para asegurarnos de que la cola de mensajes esté vacía antes de cada ejecución del test.

A continuación enviamos el mensaje de prueba con RabbitTemplate.

El siguiente paso es activar el componente Listener. No olvidemos que el auto arranque de este componente está desactivado mediante la propiedad que añadimos anteriormente en nuestro fichero application properties de test.

. . .

@Autowired
private lateinit var rabbitTemplate: RabbitTemplate

@Autowired
private lateinit var rabbitAdmin: RabbitAdmin

@Autowired
private lateinit var rabbitListenerEndpointRegistry: RabbitListenerEndpointRegistry

@MockBean
private lateinit var booksServiceDouble: IBooksService

@Before
fun setUp() {
    rabbitAdmin.purgeQueue(UPDATE_BOOK_EDITION_QUEUE_NAME, true)
}

@After
fun tearDown() {
    rabbitAdmin.purgeQueue(UPDATE_BOOK_EDITION_QUEUE_NAME, true)
    rabbitListenerEndpointRegistry.stop()
}

@Test
fun testOnMessageReceivedForHappyPathThenMessageConsumedAndServiceCalled() {
    sendTestMessageToQueue()
    startRabbitListener()
    Mockito.verify(booksServiceDouble, Mockito.times(1))
            .updateBookEdition(
                    TestConstants.FAKE_TITLE,
                    TestConstants.FAKE_EDITION
            )
}

private fun sendTestMessageToQueue() {
    rabbitTemplate.convertAndSend(
            UPDATE_BOOK_EDITION_ROUTING_KEY,
            UpdateBookEditionMessage(
                    TestConstants.FAKE_TITLE, 
                    TestConstants.FAKE_EDITION
            )
    )
}
    
private fun startRabbitListener() {
    rabbitListenerEndpointRegistry.getListenerContainer(
            "update-book-edition-queue-listener"
    ).start()
}

. . .

Para activar el componente Listener, hacemos uso de RabbitListenerEndpointRegistry. Usamos el método getListenerContainer para obtener el Listener concreto que estamos probando, a través de un identificador que pasamos por parámetro. Es importante tener en cuenta que el identificador que especificamos debe coincidir con el que se aparece dentro de la anotación @RabbitListener en la clase de nuestro componente Listener.

Una vez el Listener arranca, éste comprueba si hay mensajes en la cola y, en caso afirmativo, procede a consumirlos. Para validar que nuestro Listener funciona correctamente comprobamos que recibe el mensaje de prueba y, a continuación, llama al Service correspondiente pasando la información de dicho mensaje.

Una vez ejecutado el test volvemos a desactivar el listener a través del RabbitListenerEndpointRegistry en el método tearDown, para mantener la idempotencia en las ejecuciones de los tests, al igual que hacemos al purgar la cola.

Parece que todo tiene sentido, pero nuevamente necesitamos recurrir a nuestro amigo Awaitility, para evitar problemas relacionados con los diferentes eventos asíncronos que ocurren en la cola de mensajes.

Por último, también comprobamos que la cola queda vacía una vez consumido el mensaje.

@RunWith(SpringRunner::class)
@SpringBootTest
class UpdateBookEditionQueueListenerIntegrationTest {

    @Autowired
    private lateinit var rabbitTemplate: RabbitTemplate

    @Autowired
    private lateinit var rabbitAdmin: RabbitAdmin

    @Autowired
    private lateinit var rabbitListenerEndpointRegistry: RabbitListenerEndpointRegistry

    @MockBean
    private lateinit var booksServiceDouble: IBooksService

    @Before
    fun setUp() {
        rabbitAdmin.purgeQueue(UPDATE_BOOK_EDITION_QUEUE_NAME, true)
    }

    @After
    fun tearDown() {
        rabbitAdmin.purgeQueue(UPDATE_BOOK_EDITION_QUEUE_NAME, true)
        rabbitListenerEndpointRegistry.stop()
    }

    @Test
    fun testOnMessageReceivedForHappyPathThenMessageConsumedAndServiceCalled {
        sendTestMessageToQueue()
        awaitForFilledQueue()
        startRabbitListener()
        awaitForEmptyQueue()
        Mockito.verify(booksServiceDouble, Mockito.times(1))
                .updateBookEdition(
                        TestConstants.FAKE_TITLE,
                        TestConstants.FAKE_EDITION
                )
    }

    private fun sendTestMessageToQueue() {
        rabbitTemplate.convertAndSend(
                UPDATE_BOOK_EDITION_ROUTING_KEY,
                UpdateBookEditionMessage(
                        TestConstants.FAKE_TITLE,
                        TestConstants.FAKE_EDITION
                )
        )
    }

    private fun awaitForFilledQueue() {
        await().atMost(30, TimeUnit.SECONDS)
                .until(isQueueEmpty(), CoreMatchers.`is`(false))
    }

    private fun startRabbitListener() {
        rabbitListenerEndpointRegistry.getListenerContainer(
                "update-book-edition-queue-listener"
        ).start()
    }

    private fun awaitForEmptyQueue() {
        await().atMost(30, TimeUnit.SECONDS)
                .until(isQueueEmpty(), CoreMatchers.`is`(true))
    }

    private fun isQueueEmpty(): Callable<Boolean> {
        return Callable {
            val queueMessageCount = rabbitTemplate.execute {
                it.queueDeclare(
                        UPDATE_BOOK_EDITION_QUEUE_NAME,
                        true,
                        false,
                        false,
                        null)
            }.messageCount

            queueMessageCount == 0
        }
    }
}

¡Y eso es todo! Así es como quedó nuestro test de integración para el componente Listener.

Conclusión

En este post he compartido con vosotros nuestra experiencia escribiendo tests de integración para RabbitMQ y Spring Boot. Espero que os haya resultado interesante el uso de la librería Awaitility para plantear tests en escenarios de asincronía.

Seguro que existen otras formas de resolver este problema o mejorar esta solución, por lo que no dudéis en compartir vuestras experiencias en la sección de comentarios.

Además, ¡estaré encantado de resolver cualquier duda que podáis tener!