Kafka to Redis with Kafka-Connect: Capturing Redis Key Events -2

Yiğit İrez
Oct 10, 2023

In part 1 of this series, we got everything up and running, and Redis was sending key events. However, kafka-connect could not create keys with a TTL, so we will process Redis key events and add a TTL to the new keys using a small Go app.

Goals:

  • Read Redis events and process them by adding a TTL to the new keys

What We Need

  • Any IDE to write and compile Go in.
  • Relevant containers and setup in Part 1 finished.

All we have to do in this part is the below bit. By the end of this article, you will have an app that you can customize to process events as you need.

We did say Redis is publishing events but what exactly happens, you might ask. Well, thanks for asking.

How It Works

If you remember from the previous article, Redis essentially publishes a message to a channel every time an operation takes place (enabled by the notify-keyspace-events configuration in redis.conf). The effect is the same as if Redis had run the PUBLISH command itself.

PUBLISH __keyevent@1__:set your:key 
# publish a message to keyevent channel for subscribers listening to changes
# on db1 for set operation and here is the key

And the clients we will create will read this by psubscribing to that channel as such:

psubscribe __keyevent@1__:*
# subscribe to key event channel for messages on db1 for all operations
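
The channel names above follow a fixed shape: __keyevent@, the db number, __:, then the operation. As a small sketch of that naming scheme, the helpers below build a channel name and a subscription pattern for any db. The function names keyEventChannel and keyEventPattern are made up for this illustration and are not part of Redis or any client library.

```go
package main

import "fmt"

// keyEventChannel returns the channel Redis publishes to when operation op
// touches a key in database db, for example "__keyevent@1__:set".
// Hypothetical helper, named for this sketch only.
func keyEventChannel(db int, op string) string {
	return fmt.Sprintf("__keyevent@%d__:%s", db, op)
}

// keyEventPattern returns a PSUBSCRIBE pattern that matches every key event
// channel of database db, for example "__keyevent@1__:*".
func keyEventPattern(db int) string {
	return keyEventChannel(db, "*")
}
```

Calling keyEventPattern(1) gives the same pattern used in the psubscribe line above, so switching to another db only means changing the number.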

Redis keeps track of the subscribers to each channel and delivers messages with at-most-once semantics, which is Redis' way of saying: if you miss the message, I don't care, and you lose the event.

While this looks horrifying at first, it makes up for it with real-time notification of events. If you need something more persistent, Redis streams may be the thing, but that's another topic for another time.

The Code

One thing I particularly like about this is the simplicity of using a Redis pubsub channel for what it was made for.

Let's start by getting a Redis client up. Depending on your Redis instance, you might have different settings:

client = redis.NewClient(
	&redis.Options{
		Addr: "localhost:6379",
		DB:   1,
	})

Then, we launch a goroutine to do our thing so we don't block whatever else may be happening. Below is the general outline of what we will be doing.

go func() {
	for {
		// subscribe to Redis keyspace events with the specified pattern

		// listen to pubsub channel and start workers
		for event := range pubsub.Channel() {
			// extract event
			// if the event is a set or expire, proceed; otherwise continue

			// if it's a set event, add ttl to the key

			// if it's an expire, save somewhere for future notification or processing
		}
	}
}()

The first step is the subscription. Since we only care about key events for db1, we use __keyevent@1__:*. We also wait for confirmation of our subscription using the Receive func. If you check the value it returns, you will see the subscription operation we just did. In the end, we return the pubsub object for the subscription we created in Redis. Create a func that fits your architecture style and does basically the below. The pubsub you return will replace this comment from the general outline above: // subscribe to Redis keyspace events with the specified pattern

// PSubscribeToEvents subscribes to redis keyspace events on db1
func (c ConnectorRepoImpl) PSubscribeToEvents() (*redis.PubSub, error) {
	// this specific key event is important because we only care about keyevents and only in the 1st db
	pubsub := c.redisClient.PSubscribe("__keyevent@1__:*")

	// we need to wait for confirmation that subscription is created
	_, err := pubsub.Receive()
	if err != nil {
		return nil, err
	}
	return pubsub, nil
}

Once you have the pubsub channel set up, we start listening by looping over the channel, as described in the outline at the line: for event := range pubsub.Channel().

At this point, if we receive an event, we can extract the operation from event.Channel and the key from event.Payload. Since the event channel is something like __keyevent@1__:set, we trim the prefix and get set, which is the actual event operation. The event payload contains the key that the event was triggered for.

func extractEventMeta(event *redis.Message) (operation string, redisKey string) {
	return strings.TrimPrefix(event.Channel, "__keyevent@1__:"), event.Payload
}
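
extractEventMeta hardcodes the db1 prefix, which is fine for this setup. If you ever subscribe to more than one db, a slightly more general parser may help. The sketch below uses only the standard library and takes the channel string directly; parseKeyEventChannel is a hypothetical name, not something from go-redis.

```go
package main

import (
	"strconv"
	"strings"
)

// parseKeyEventChannel splits a channel such as "__keyevent@1__:set" into the
// database number and the operation. ok is false when the channel does not
// have the key event shape. Hypothetical helper for illustration.
func parseKeyEventChannel(channel string) (db int, op string, ok bool) {
	const prefix = "__keyevent@"
	if !strings.HasPrefix(channel, prefix) {
		return 0, "", false
	}
	// After trimming the prefix we expect "<db>__:<operation>".
	rest := strings.TrimPrefix(channel, prefix)
	parts := strings.SplitN(rest, "__:", 2)
	if len(parts) != 2 || parts[1] == "" {
		return 0, "", false
	}
	n, err := strconv.Atoi(parts[0])
	if err != nil || n < 0 {
		return 0, "", false
	}
	return n, parts[1], true
}
```

With a *redis.Message in hand, you would pass event.Channel to it and still read the key from event.Payload.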

We now know what the event operation is and what the key is so we can go ahead and set the key TTL if it is a set operation. This is standard Redis key manipulation. We can use the Persist func if we want to remove the TTL at a later time.

c.redisClient.Expire(key, 10*time.Second)

One thing to watch for with set: it will trigger twice for every key creation, once for the key creation itself and once more when the key's value is being added. The two events happen milliseconds apart, though, so it is not a big deal.

Similarly, we can watch for the expire operation and handle our business logic as we see fit.

A lot of the non-essential code was removed to keep this article to the point. I hope it gives you an idea of how Redis pubsub channels can be used.

Thanks for reading.
