Kafka to Redis with Kafka-Connect: Sending Redis Key Events -1

Yiğit İrez
Oct 5, 2023 · 11 min read

Let’s build a system where every Kafka message is transferred to Redis, with minor custom modifications, and where any listening app we create can be notified of key creation and expiry on the Redis side. Sounds fun, right?

This is the first part of the process, where we focus on everything up to Redis. The next part will take care of processing the events and further modifying the Redis keys accordingly.


Goals:

  • Moving Kafka messages to Redis for some topics using kafka-connect
  • Transforming Redis key values via custom changes (we need the topic name as the value instead of the message value).
  • Exposing Redis key events so we can be notified of set and expire operations.

What We Need

  • Docker and docker-compose to run the compose file for various elements.
  • A Java IDE to create jars because nothing standard fits us and we have to make custom things.

Here is the system we aim to have at the end of this article.

the final flow

What happens here is that we have a bunch of topics fed by a script that pushes messages with the same body (since we will be overriding it anyway). kafka-connect captures those messages, hands them to the redis-sink to be transformed, and sends them on their way to Redis. Redis will then publish events about what happens to the keys it holds, for anyone listening.

You’ll need the following to be running locally or somewhere you have access to;

  • Kafka with two brokers (Why two specifically you might ask, why not?)
  • Redis
  • Zookeeper (since the brokers need to coordinate with each other for various reasons which are outside of the scope of this article. More info, here)
  • kafka-ui (to create topics, test messages and manage various options via a web user interface)
  • kafka-connect
  • kafka-rest (because we can’t be bothered to write 2 lines of code and want to send messages via REST using bash)

Have a look at the compose below, which will run all of this for us. Feel free to change the versions of the images. Create the compose file in a folder, name it docker-compose.yml (surprise), and create two more dirs under it, named;

  • custom_transformer_code
  • kafka-connect-plugins

Also create three more blank files under the root folder, named;

  • kafka-message-generator.sh
  • redis.conf
  • redis-sink-config.json
version: '3'
services:
  redis-cache:
    image: redis:6.2-alpine
    restart: always
    ports:
      - '6379:6379'
    command: redis-server /etc/redis/redis.conf --save 20 1
    volumes:
      - redis-cache:/data
      - ./redis.conf:/etc/redis/redis.conf

  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.1
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka1:
    image: confluentinc/cp-kafka:7.0.1
    container_name: kafka1
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_DELETE_TOPICS: "true"
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"

  kafka2:
    image: confluentinc/cp-kafka:7.0.1
    container_name: kafka2
    depends_on:
      - zookeeper
    ports:
      - "9093:9092"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_DELETE_TOPICS: "true"
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: Local Kafka
      KAFKA_CLUSTERS_0_BOOTSTRAP_SERVERS: kafka1:9092,kafka2:9092
      KAFKA_CLUSTERS_0_AUTO_OFFSET_RESET: earliest

  kafka-connect:
    image: confluentinc/cp-kafka-connect:7.0.1
    container_name: kafka-connect
    depends_on:
      - kafka1
      - kafka2
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka1:9092,kafka2:9092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: kafka-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: kafka-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: kafka-connect-status
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
    volumes:
      - ./kafka-connect-plugins:/usr/share/confluent-hub-components

  kafka-rest-proxy:
    image: confluentinc/cp-kafka-rest:7.0.1
    container_name: kafka-rest-proxy
    depends_on:
      - kafka1
      - kafka2
    ports:
      - "8082:8082"
    environment:
      KAFKA_REST_BOOTSTRAP_SERVERS: "kafka1:9092,kafka2:9092"
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"

volumes:
  redis-cache:
    driver: local

Let’s run this and go over what is happening, along with the configurations we need to make. Some things will fail at this point, and that’s fine as long as the failures are not due to port conflicts. We can re-run docker-compose up -d to get the failing parts moving as we progress through the article.

Redis

If you look at the command part, it starts redis-server with our redis.conf like so;

command: redis-server /etc/redis/redis.conf --save 20 1

The save part has to do with Redis persistence. --save 20 1 means that Redis will save a snapshot of the dataset to disk if at least 1 key has changed and 20 seconds have passed since the last save operation.

If we don’t run Redis like this, it will ignore the redis.conf file we have and run with default settings, which are not good enough because we want key events so we can get notified about stuff happening to our kvs.

Redis Keyspace/Keyevent Notifications

Redis allows monitoring key-value changes via pub/sub channels using keyspace/keyevent notifications. There are two types of notifications, namely keyspace notifications and keyevent notifications. We want keyevent notifications, since Redis will send a notification like the following on set, for example;

"pmessage","__key*__:*","__keyevent@0__:set","this:will:be:yourkey"

To get notified of set and expire operations from Redis, we will need to set up our redis.conf like below. You can omit loglevel if you prefer, which leaves Redis at its default log level.

notify-keyspace-events AE
loglevel debug

The AE means that all key events will be sent as notifications in keyevent format (redis docs).

For our purposes, we just need key events for n (new keys) and x (expiry), and one would assume Enx would work, but it does not, so we are using AE. The full list of flags is below; you can experiment by entering your Redis container and running redis-cli with the commands that follow the list.

K   Keyspace events, published with __keyspace@<db>__ prefix.
E   Keyevent events, published with __keyevent@<db>__ prefix.
g   Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ...
$   String commands
l   List commands
s   Set commands
h   Hash commands
z   Sorted set commands
t   Stream commands
d   Module key type events
x   Expired events (events generated every time a key expires)
e   Evicted events (events generated when a key is evicted for maxmemory)
m   Key miss events (events generated when a key that doesn't exist is accessed)
n   New key events (Note: not included in the 'A' class)
A   Alias for "g$lshztxed", so that the "AKE" string means all the events except "m" and "n".

You can test out different notification types via the same container’s redis-cli: run the commands below to set a notification type and psubscribe (subscribe with a pattern), then create/modify/delete a key from a different terminal or a Redis UI. You should see an event pop up.

config set notify-keyspace-events KEA
psubscribe '__key*__:*'

"pmessage","__key*__:*","__keyevent@0__:set","this:will:be:yourkey"

Don’t forget to set notifications back to AE once you are done though.

Normally, if you wanted to subscribe to, for example, keyevent notifications for set operations on Redis db 0, you would use the first pattern below. The second pattern subscribes to keyspace and keyevent notifications across all Redis databases for all published events, which is overdoing it but serves our purposes.

__keyevent@0__:set
__key*__:*

Setting Up Kafka Connect

Once we run the compose file, kafka-ui and the kafka brokers should be running fine. We need to set a few things that kafka-connect uses so it plays nice with everything else.

Open kafka-ui at localhost:8080 and create the following topics with cleanup policy=compact and 1 partition. These topics are referenced in the compose file. kafka-connect theoretically auto-creates them, but it failed on my local setup because topic auto-creation is off. It can also be beneficial to create them manually to adhere to company standards.

CONNECT_CONFIG_STORAGE_TOPIC: kafka-connect-configs   ## keeps connector configuration
CONNECT_OFFSET_STORAGE_TOPIC: kafka-connect-offsets   ## keeps connector offset information
CONNECT_STATUS_STORAGE_TOPIC: kafka-connect-status    ## keeps connector statuses

Create the following topics as well. These will be our data topics, and every setting we make will reference them: my-data-topic, second-topic, third-topic, forth-topic. Feel free to change the names, and don’t forget to modify the redis-sink-config.json if you do.
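If you would rather create these topics from the command line than through kafka-ui, here is a rough sketch using the kafka-topics tool that ships inside the broker containers (single replica to match the compose defaults; adjust the lists if you renamed anything):

# internal kafka-connect topics, compacted, single partition
for t in kafka-connect-configs kafka-connect-offsets kafka-connect-status; do
  docker exec kafka1 kafka-topics --bootstrap-server kafka1:9092 \
    --create --topic "$t" --partitions 1 --replication-factor 1 \
    --config cleanup.policy=compact
done

# data topics for our messages
for t in my-data-topic second-topic third-topic forth-topic; do
  docker exec kafka1 kafka-topics --bootstrap-server kafka1:9092 \
    --create --topic "$t" --partitions 1 --replication-factor 1
done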

By now you should have this folder in the same dir as your compose file. We will use it to hold the redis-sink connector along with the custom transformer we will create in a moment.

./kafka-connect-plugins:/usr/share/confluent-hub-components

Redis-sink Setup

Let’s fill out the redis-sink-config.json file we had sitting around.

{
  "name": "redis-sink-connector",
  "config": {
    "connector.class": "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector",
    "tasks.max": "1",
    "topics": "my-data-topic,second-topic,third-topic,forth-topic",
    "input.data.format": "STRING",
    "redis.hosts": "redis-cache:6379",
    "redis.db": "1",
    "transforms": "topicToKeyTransformer",
    "transforms.topicToKeyTransformer.type": "com.wowmything.kafka.connect.TopicToKeyTransformer",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}

We will be using the standard Redis Sink connector and do a manual installation, which essentially means:

  • Download the zip linked in the manual installation section (also here),
  • Extract its contents under kafka-connect-plugins -> redis-sink-name (a command-line sketch follows this list).
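Since everything else lives next to the compose file anyway, a rough command-line sketch of that manual installation; the zip URL is a placeholder, so grab the real link from the manual installation page above:

# download the redis sink zip and extract it into the mounted plugin folder
curl -L -o redis-sink.zip '<zip-url-from-the-manual-installation-page>'
unzip redis-sink.zip -d ./kafka-connect-plugins/

# kafka-connect scans the plugin path on startup, so restart it to pick the connector up
docker-compose restart kafka-connect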

As you may have noticed, we also need a custom transformer, which will replace the value of our new key with the incoming Kafka topic name. It is referenced in the config: "transforms.topicToKeyTransformer.type": "com.wowmything.kafka.connect.TopicToKeyTransformer".

We need to create a jar file to house our transformer though. Let’s create the code first, below;

package com.wowmything.kafka.connect;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.transforms.Transformation;

import java.util.Map;
import java.util.logging.Logger;

public class TopicToKeyTransformer<S extends ConnectRecord<S>> implements Transformation<S> {

    private static final Logger logger = Logger.getLogger(TopicToKeyTransformer.class.getName());

    @Override
    public S apply(S record) {
        // Extract the topic name from the record
        String topic = record.topic();

        logger.info("TopicToKeyTransformer Processing record from topic: " + topic);

        // Modify the record key because we can
        String newKey = "THIS:IS:REDIS:" + record.key().toString();

        /*
         * newRecord parameters, for reference:
         *   String topic, Integer kafkaPartition,
         *   Schema keySchema, Object key,
         *   Schema valueSchema, Object value,
         *   Long timestamp (plus an overload that also takes Headers)
         */
        return record.newRecord(
                record.topic(),
                record.kafkaPartition(),
                Schema.STRING_SCHEMA,
                newKey,
                record.valueSchema(),
                topic,
                record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void close() {
        // No resources to close
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed, but the method must be implemented
    }
}

What happens here is that kafka-connect hands the grabbed record to the redis-sink, which passes it through our custom transformer before converting it to a Redis key-value pair. We modify the key itself to get a prefix (it would otherwise be the exact key coming from the Kafka message) and change the value to the topic the message came from. Just before we return the record, I added the meaning of the expected fields as a comment, for reference.

We then package this into a jar. I used IntelliJ IDEA’s artifact configuration for this, but as long as the code gets packaged into a jar, that’s good enough.

We then add our custom transformer jar under the same kafka-connect-plugins folder, similar to the redis-sink.
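If you would rather build from the command line instead of the IDE, here is a minimal sketch, assuming a Maven project around the class above with org.apache.kafka:connect-api as a provided dependency and topic-to-key-transformer.jar as the artifact name (both are my assumptions, not part of the original setup):

# package the transformer (assumes a pom.xml as described above)
mvn -q clean package

# copy the jar into the folder mounted as the kafka-connect plugin path
cp target/topic-to-key-transformer.jar ./kafka-connect-plugins/

# restart kafka-connect so it picks up the new class
docker-compose restart kafka-connect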

Now that that’s done, let’s set up the redis-sink in kafka-connect.

We need to add the sink (a connector of type sink for kafka-connect) that references our redis-sink-config.json, using the command below. If kafka-connect or Redis is still down, running docker-compose up -d again should get them working at this point; you need a running kafka-connect to be able to add connectors.

curl -X POST -H "Content-Type: application/json" --data @redis-sink-config.json http://localhost:8083/connectors

A successful response echoes the connector configuration back:

{
  "name": "redis-sink-connector",
  "config": {
    "connector.class": "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector",
    "tasks.max": "1",
    "topics": "my-data-topic,second-topic,third-topic,forth-topic",
    "input.data.format": "STRING",
    "redis.hosts": "redis-cache:6379",
    "redis.db": "1",
    "transforms": "topicToKeyTransformer",
    "transforms.topicToKeyTransformer.type": "com.wowmything.kafka.connect.TopicToKeyTransformer",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "name": "redis-sink-connector"
  },
  "tasks": [],
  "type": "sink"
}

We can use other commands to list and delete existing sinks (connectors), along with modifying the root or individual connector log levels (posted as a gist since Medium hated this for some reason).
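For reference, here is a rough sketch of those commands against the standard kafka-connect REST API, using the connector name from our config:

# list all connectors
curl -s http://localhost:8083/connectors

# check the status of our sink and its tasks
curl -s http://localhost:8083/connectors/redis-sink-connector/status

# delete the sink
curl -s -X DELETE http://localhost:8083/connectors/redis-sink-connector

# list the current loggers and their levels
curl -s http://localhost:8083/admin/loggers

# change the root log level
curl -s -X PUT -H "Content-Type: application/json" \
  http://localhost:8083/admin/loggers/root -d '{"level": "DEBUG"}'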

Testing

At this point, you should have things running and your topics ready. One component we will use in testing is the kafka-rest endpoint to send messages. You can test it out via the below command.

curl http://localhost:8082/v3/clusters

If that works fine, it means everything is ready for our script. Create a script named kafka-message-generator.sh and paste the following.

#!/bin/bash

KAFKA_REST_URL="http://localhost:8082"

# random UUID generation
generate_random_uuid() {
  tr -dc 'a-f0-9' < /dev/urandom | head -c 32
}

# send messages to a Kafka topic
send_messages_to_topic() {
  local topic="$1"
  local keys=("${@:2}")

  # Loop to send messages
  for key in "${keys[@]}"; do
    # Send message to the Kafka topic
    curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
      -d "{\"records\": [{\"key\": \"$key\",\"value\": {\"type\": \"JSON\",\"data\": {\"sometypefield\": \"somevaluefield\"}}}]}" \
      "$KAFKA_REST_URL/topics/$topic"
  done
}

# topics to send messages to
topics=("my-data-topic" "second-topic" "third-topic") #"forth-topic"

# number of messages to send
total_messages=10

# random keys to reuse across topics
keys=()
for ((i=0; i<total_messages; i++)); do
  keys+=("$(generate_random_uuid)")
done

# loop to send messages to different topics
for topic in "${topics[@]}"; do
  echo "Sending messages to topic: $topic"

  # Send messages using the same keys
  send_messages_to_topic "$topic" "${keys[@]}"
done

echo "Finished sending messages."

This will create total_messages messages per topic, with the keys being reused: each key is sent to my-data-topic first, then the same key is used to send a message to second-topic, and so on.

Let’s test this out.

skipping the second topic since it will be exactly the same

At this point, we could monitor the key events Redis sends by setting up a listener (psubscribe, like above) on the Redis container using redis-cli. In the next part we will consume these events in a Go application, because of the following.
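As a quick sketch of such a listener: note that our sink writes to Redis db 1 (redis.db in the config) and the notification channel name carries the db index, so a wildcard pattern is the easiest option. The container name depends on your compose project, so adjust it:

# attach to the redis container and listen for set/expired events on any db
docker exec -it <redis-cache-container> redis-cli
psubscribe '__keyevent@*__:set' '__keyevent@*__:expired'

# after running kafka-message-generator.sh you should see something like:
# "pmessage","__keyevent@*__:set","__keyevent@1__:set","THIS:IS:REDIS:<some-key>"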

Issues

A sample key transferred from Kafka to Redis looks something like the one below. This might look great because everything (even our custom transformer) worked, but we might have additional needs (like we actually do), such as setting a TTL for each key. Neither kafka-connect nor the redis-sink can do that, because the sink uses MSET to send keys to Redis, and MSET has no way to set a TTL. Here is why. We will need to write an app (part 2) to cover this problem.

a key created via kafka connect
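You can also confirm the missing TTL yourself from the Redis container’s redis-cli; the actual key name depends on the generated UUID, so this is just a sketch:

# list the keys our sink created in db 1
redis-cli -n 1 --scan --pattern 'THIS:IS:REDIS:*'

# -1 means no expiry is set on the key
redis-cli -n 1 ttl 'THIS:IS:REDIS:<one-of-the-keys-above>'

# the value is the topic name, courtesy of our custom transformer
redis-cli -n 1 get 'THIS:IS:REDIS:<one-of-the-keys-above>'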

Troubleshooting

  • The redis-sink cannot process messages with null keys, and the connector will shut down after trying a couple of times. You will need to fix the offending message and resume the connector using the command below:
curl -X PUT http://localhost:8083/connectors/{redis-sink-connector}/resume
  • If no Redis keys are being generated, you might want to watch for connector errors by monitoring the kafka-connect logs. You can change the log levels of the loggers below to better investigate the issue:
curl -s -X PUT -H "Content-Type:application/json" http://localhost:8083/admin/loggers/com.github.jcustenborder.kafka.connect.redis.RedisConnectorConfig -d '{"level": "DEBUG"}'  

redis-sink loggers:
com.github.jcustenborder.kafka.connect.redis.RedisConnectorConfig
com.github.jcustenborder.kafka.connect.redis.RedisSessionFactoryImpl
com.github.jcustenborder.kafka.connect.redis.RedisSessionFactoryImpl$RedisSessionImpl
com.github.jcustenborder.kafka.connect.redis.RedisSinkConnectorConfig
com.github.jcustenborder.kafka.connect.redis.RedisSinkTask
com.github.jcustenborder.kafka.connect.redis.SinkOperation
com.github.jcustenborder.kafka.connect.utils.VersionUtil
com.github.jcustenborder.kafka.connect.utils.jackson.ConfigDefSerializationModule
com.github.jcustenborder.kafka.connect.utils.jackson.HeaderSerializationModule
com.github.jcustenborder.kafka.connect.utils.jackson.SchemaSerializationModule
com.github.jcustenborder.kafka.connect.utils.jackson.StructSerializationModule
  • Some changes to Redis or connector configuration (such as updating connector plugin jars) may require a kafka-connect/Redis restart, which takes a second or two at most. If you use config set .. to change the Redis configuration, the change is effective immediately and no restart is needed, but redis.conf changes do require one. A quick verification sketch follows below.
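A quick way to verify what is actually in effect on both sides (a sanity-check sketch, not tied to any one failure):

# inside the redis container: show the keyspace-notification flags currently active
redis-cli config get notify-keyspace-events

# from the host: confirm the connector and its task are RUNNING
curl -s http://localhost:8083/connectors/redis-sink-connector/status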

In our next article we will see how we can use the pub/sub channel data Redis provides and take action according to incoming events.

Thanks for reading.


Yiğit İrez

Let’s talk devops, automation and architectures, everyday, all day long. https://www.linkedin.com/in/yigitirez/