Skip to content
Colin Wren
Twitter

Lessons learned testing a Apache Kafka based application with Jest & Node.js

Lessons Learned, JavaScript, Testing, Software Development7 min read

At work recently I’ve moved from building UI automation suites using Selenium (which I’m very comfortable doing) to building automated end-to-end tests for a Kafka & REST API based app, that is the backbone of the entire system and written as a bunch of Java services that act as an adapter between two services.

The set of Java services do the following job:

  • Take Avro serialised messages off a Kafka queue that is populated by an upstream 3rd party system that generates these messages on data being sent to it’s REST API
  • Convert the messages into another format
  • Send the converted message as unserialised JSON to another Kafka queue which will be consumed by another service that populates the data into a REST API used by the frontend
diagram of app I've been working on
Diagram of the type of app I’ve been testing

In order to end-to-end test the system I need to do the following:

  • Send a REST API call to the upstream system to trigger an action (such as create and update a full record and it’s associated entities)
  • Check the serialised message the upstream system is publishing to the Kafka queue the system under test will be consuming
  • Check the unserialised message the system under test publishes to the Kafka queue the downstream system will consumer
  • Check the REST API of the downstream system to ensure that it’s showing the correct state based on the input message

I also have one technical constraint. While the system under test is built in Java there’s an existing library used by my colleagues to work with both the upstream and downstream REST APIs which is written in JavaScript.

Oh and there’s one mental constraint too, I’ve never used or tested a distributed queue system and I’ve never worked with Avro serialisation.

Demo project

To better illustrate the type of system I’ve been testing I’ve built an example application and test suite on my Github: https://gitlab.com/colinfwren/testing-kafka-app-with-jest

The system is composed of three apps:

  • A producer app that takes an API call and publishes Avro serialised messages to a queue
  • An adapter app that takes the Avro serialised messages off the queue and publishes non-serialised JSON messages to another topic
  • A consumer app that takes the JSON messages and outputs the results in a REST API

The demo project contains a test suite to end-to-end test the system and a Makefile to run the entire stack within Docker Compose.

Connecting to Kafka

As I’m using the JavaScript library I decided to use Jest to run the tests. The main difficulty was finding a library to handle getting messages from Kafka and deserialising the Avro messages published by the upstream system.

After a bit of searching and experimenting I found kafka-node to be the better of the vanilla Kafka libraries available for Node. It’s relatively easy to set up and allows the offset on a topic & partition to be adjusted on the fly.

Offsets are really important when testing Kafka queues (especially when Avro schemas are involved) as if you don’t set one up your consumer will start reading from the start of the topic’s messages which could mean you have to process a weeks worth of messages before seeing the message you actually want to see.

I also found kafka-avro which has a really simple set up process, you just point it to the schema registry and once it receives a message the JSON object is available via data.parsed . I struggled a little with the offsets on this one as it only supports auto.offset.reset=latest and (as far as I know) there’s no means to adjust the offset on the fly.

Offsets for queues with Avro serialised messages are important, because if the schema used to serialise the messages changes within the retention period of the queue you could run into issues where you’re trying to deserialise messages with incompatible schemas.

Local development Gotchas

Two things that caught me out when I moved from developing locally to running the tests in the deployment infrastructure were:

  • Kafka partitions messages and each partition will have a different offset so if you’re assuming that the messages will only ever be on one partition during development (like I did) then you’re offsets will not work
  • Kafka has a security protocol which if you don’t include in your consumer config means you won’t be able to get any offsets as the consumer will not connect

Avro Schemas

While kafka-avro will get the schema from the schema registry for you it’s worth noting that it doesn’t not protect against the schema changing or if the schema is invalid.

One issue I came across was that the Java implementation of the Avro schema deserialiser would only read the schemas it needed to read and as such didn’t blow up when the schema had an error in it but the Node library avsc did.

One of the records in the schema set a default on a record to "{}" instead of {} which resulted the avsc.parse() function throwing an error.

Group IDs and Offsets

The offset for each partition is linked to the consumer by the group ID the consumer declares when connecting. If you are using auto.offset.reset and don’t seem to be getting only the latest messages it may be due to you re-using a group ID for a consumer that hasn’t been committing it’s offset back to Kafka.

I found it a good idea to use a UUID appended to the end of my consumer’s group ID to ensure each consumer always got the latest messages.

This also applied to the system under test. If there’s a means to configure the group ID and offset (such as via an environment variable) this will speed up the time it takes for the messages to go through all parts of the system.

Making assertions against a distributed queue

Probably the hardest part of testing the system I’m testing is that there’s no guarantee that the messages will arrive in a certain amount of time and depending on how the systems been built there could also be no guarantee of message order either.

To tackle this issue I re-used an approach commonly used in UI testing which is to wait for things before carrying out an action on them.

I accomplish this by polling on an array of messages received by the Kafka consumers until the message I need has been added to the array, with a timeout to ensure that the test fails if nothing ever gets received within a sensible timeframe.

1/**
2 * Sleep for the specified number of seconds
3 *
4 * @param {number} seconds - Number of seconds to sleep for
5 * @returns {Promise}
6 */
7const sleep = async (seconds) => {
8 return new Promise(resolve => {
9 setTimeout(resolve, (seconds * 1000));
10 });
11};
12
13/**
14 * Attempt to find the record with the defined value (needle) within the Array of records (haystack)
15 * using the filterFunction provided over the specified number of attempts
16 *
17 * @param {Object[]} haystack - Array of records
18 * @param {String|number} needle - Value used to identify the record
19 * @param {function} filterFunction - Function used to filter out the required record
20 * @param {number} attempts - Number of attempts to find the record
21 * @returns {Promise<*>}
22 */
23const eventualQueueMember = async (haystack, needle, filterFunction, attempts = 5) => {
24 const hits = filterFunction(haystack, needle);
25 if (hits.length > 0) {
26 return hits[0];
27 }
28 if (attempts === 1) {
29 throw new Error(`Failed to get ${needle} in queue`);
30 }
31 await sleep(1);
32 return eventualQueueMember(haystack, needle, filterFunction, attempts - 1);
33};
34
35/**
36 * Find the Patient's admit message that's sent from the Producer App
37 *
38 * @param {Object[]} messages - Array of message object
39 * @param {string} nhsNumber - NHS Number for the patient
40 * @returns {Object[]} - Array of filtered out admit messages for the patient
41 */
42const findOutboundAdmitMessage = (messages, nhsNumber) => {
43 return messages.filter((message) => {
44 const isAdmit = Object.prototype.hasOwnProperty.call(message.body, 'com.colinwren.Admit');
45 if (isAdmit && message.body['com.colinwren.Admit'].nhsNumber === nhsNumber) {
46 return true;
47 }
48 return false;
49 });
50};
51
52const producerAppAdmitMessage = await eventualQueueMember(
53 producerAppOutboundMessages, // This is the array populated by the consumer's message handling function
54 nhsNumber, // value to look for in array of messages
55 findOutboundAdmitMessage, // function used to filter out the messages
56 5 // number of attempts to make before failing
57);
A recursive function to poll an array of messages for a value

I also do this same pattern against the REST API of the downstream system. This way I can get the messages published to Kafka and the REST API output before asserting on the data.

async/await is a must have for this type of work as it makes the code really easy to follow. I’d dread to think of the amount of nesting going on if I had to do this with Promise chains (or god forbid callbacks).

Developing tests locally

Luckily for me the team are already containerising our services so this made setting up a local version of the stack really simple as I could use Docker Compose to build a stack consisting of:

  • A Kafka queue I would publish Avro serialised messages to for the system under test to consume, should I want to check anything without having to do a full end-to-end
  • The system under test configured via environment variables in my compose to point to my local Kafka instance or another instance elsewhere
  • A Kafka queue the system under test would publish the converted JSON messages to
  • A instance of the downstream system that would connect to the local Kafka queue the system under test is publishing to

One issue I encountered was that Kafka’s advertised listeners IP needs to be that of the host machine on its local network in order for Kafka to work properly.

I managed to sort this out by creating a Makefile that I used to bring up the stack which did a little bash command to get the host machine’s IP and pass that as an environment variable to the Docker Compose file.

1#!/usr/bin/make
2
3run:
4 HOST_IP=$(shell ifconfig en0 | grep inet | grep -v inet6 | awk '{print $$2}') docker-compose -f confluent.yml up -d
5
6# In your docker-compose file you can use something like:
7# version: '3'
8# services:
9# zookeeper:
10# image: confluentinc/cp-zookeeper:5.1.0
11# environment:
12# - ZOOKEEPER_CLIENT_PORT=2181
13# ports:
14# - 2181:2181
15#
16# kafka:
17# image: confluentinc/cp-kafka:5.1.0
18# environment:
19# - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
20# - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${HOST_IP}:9092
21# - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
22# depends_on:
23# - zookeeper
24# ports:
25# - 9092:9092
Using a sub-shell in Make to get the host’s IP and passing that to Docker Compose

Useful tools for working with Kafka

There are a few useful tools for working with Kafka that Confluent provide for free. These can be installed on a Mac via Homebrew using the following command brew install confluent-oss .

Tools for seeing what’s being published to a queue

If you need to see if certain messages made it onto the queue then you can use kafka-console-consumer and kafka-avro-console-consumer for this.

These tools will connect to the queue as a consumer and show the messages as they are published (unless you specify to play from the start). You can then pipe this into a file if you want to save them locally.

To consume non-serialised messages from a queue you can use the following command:

1kafka-console-consumer --topic [TOPIC TO CONSUME] --bootstrap-server [KAFKA HOST]

To consume serialised messages from a queue you can use the following command:

1kafka-avro-console-consumer --value-deserializer io.confluent.kafka.serializers.KafkaAvroDeserializer --key-deserializer org.apache.kafka.common.serialization.StringDeserializer --formatter io.confluent.kafka.formatter.AvroMessageFormatter --property print.key=true --property schema.registry.url=http://[SCHEMA REGISTRY HOST] --topic [TOPIC TO CONSUME] --bootstrap-server [KAFKA HOST]

Tools for publishing messages to a queue

If you need to add certain messages onto a queue in order for the consuming system to do something based on the payload you can use kafka-console-producer and kafka-avro-console-producer for this.

These tools will connect to the queue as a producer and give you a prompt to type or paste messages into. One thing to look out for is that you cannot recall previous messages like you would in Terminal so you’ll need to edit and paste the messages from somewhere else.

To produce non-serialised messages you can use the following command:

1kafka-console-producer --topic=[TOPIC TO PUBLISH TO] --broker-list[KAFKA HOST]

To produce serialised messages you can use this command:

1kafka-avro-console-producer --broker-list [KAFKA HOST] --topic [TOPIC TO PUBLISH TO] --property schema.registry.url=[SCHEMA REGISTRY HOST] --property value.schema=[SCHEMA FOR MESSAGES AS JSON]

Tools for playing back messages from one queue into another

Sometimes you’ll need to play messages from one queue into another queue to simulate usage of the system or simply to have a separate topic you can manipulate for testing something.

kakfa-mirror-maker is the tool you need for doing this, but it requires some set up to work. This involves creating config files for the consumer and producer within which the consumer can declare its group ID and offset so you can control how many messages to get.

1# Example contents of consumer.config
2bootstrap-servers=[KAFKA HOST]
3exclude.internal.topics=true
4group.id=mirror-maker
5auto.offset.reset=latest
6
7# Example contents of producer.config
8bootstrap-servers=[KAFKA HOST]
9acks=-1
10batch.size=100
11client.id=mirror_maker_producer
12retries=2147483647
13max.in.flight.requests.per.connection=1
14
15# Command to set off mirroring
16kafka-mirror-maker --consumer.config consumer.config --producer.config producer.config --whitelist="[TOPIC NAME TO MIRROR]"
Example usage of kafka-mirror-maker

Summary

While the learning curve was originally steep due to my lack of experience working with distributed queues, I’ve learned a lot and working with the technology has made me a stronger technical tester.

Being able to see what messages are being sent between the many different services in a system is a really useful way of integration and end-to-end testing the system as a whole and in isolation.

Confluent have done a great job in providing tools for working with Kafka and Avro which allows for quick onboarding and analysis when consuming from a 3rd party upstream system.

Working an asynchronous system has really made me think more about the types of tests I’ve written in the past where I’ve assumed things would ‘just happen’ and how I should have been writing my test code more defensively, especially when it comes to API calls.

I’ve created a demo project ( https://gitlab.com/colinfwren/testing-kafka-app-with-jest ) for checking out how to end-to-end test a system using kafka in the way the system I’m working with does.