Redis监听事件

  • A+
所属分类:Java redis

  有时候我们希望监听某个key的删除或者其他事件,来做一些自己的业务操作。redis 的pub/sub 提供了这个能力。

  参考官网:https://redis.io/topics/notifications

1. redis 服务端和客户端测试

  redis 官网说了,默认的话事件通知是关闭的。如果需要开启可以修改redis.conf 文件中 notify-keyspace-events 配置。 或者用CONFIG SET 命令修改(只针对当前进程有效,重启失效)。

官网提供的事件类型如下:(每个字母代表一个事件类型的缩写)

Redis监听事件

K     Keyspace events, published with __keyspace@<db>__ prefix.
E     Keyevent events, published with __keyevent@<db>__ prefix.
g     Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ...
$     String commands
l     List commands
s     Set commands
h     Hash commands
z     Sorted set commands
t     Stream commands
d     Module key type events
x     Expired events (events generated every time a key expires)
e     Evicted events (events generated when a key is evicted for maxmemory)
m     Key miss events (events generated when a key that doesn't exist is accessed)
A     Alias for "g$lshztxed", so that the "AKE" string means all the events except "m".

Redis监听事件

  K或者E至少有一个存在。如果需要监听所有的事件可以订阅 'KEA' 事件。 

1. 测试监听失效事件

(1) 修改redis.conf

Redis监听事件

#
notify-keyspace-events Ex
#
#  By default all notifications are disabled because most users don't need
#  this feature and the feature has some overhead. Note that if you don't#  specify at least one of K or E, no events will be delivered.
# notify-keyspace-events ""

Redis监听事件

(2) 服务器启动后查看配置

127.0.0.1:6379> config get notify-keyspace-events1) "notify-keyspace-events"
2) "xE"

(3) 启动一客户端监听所有数据库的失效事件

Redis监听事件

127.0.0.1:6379> psubscribe __keyevent@*__:expired
Reading messages... (press Ctrl-C to quit)1) "psubscribe"
2) "__keyevent@*__:expired"
3) (integer) 1
1) "pmessage"
2) "__keyevent@*__:expired"
3) "__keyevent@0__:expired"
4) "test"
1) "pmessage"
2) "__keyevent@*__:expired"
3) "__keyevent@0__:expired"
4) "mykey"

Redis监听事件

  __keyevent@*__:expired    中的* 代表任意库,可以指定0-15库中的任意一个库, 也可以用通配符。

2. 监听所有事件

1. 修改配置通知所有事件

127.0.0.1:6379> config set notify-keyspace-events KEA
OK127.0.0.1:6379> config get notify-keyspace-events1) "notify-keyspace-events"
2) "AKE"

2.  客户端进行监听(psubsribe 后面的参数是可变数组,可以一次就监听多个事件)

127.0.0.1:6379> psubscribe '__key*__:*'Reading messages... (press Ctrl-C to quit)1) "psubscribe"
2) "__key*__:*"
3) (integer) 1

3. 重启一个客户端进行操作数据

Redis监听事件

127.0.0.1:6379> set key1 value1
OK127.0.0.1:6379> expire key1 9000(integer) 1
127.0.0.1:6379> del key1
(integer) 1

Redis监听事件

4. 查看上面监测的控制台

Redis监听事件

127.0.0.1:6379> psubscribe '__key*__:*'Reading messages... (press Ctrl-C to quit)1) "psubscribe"
2) "__key*__:*"
3) (integer) 1
1) "pmessage"
2) "__key*__:*"
3) "__keyspace@0__:key1"
4) "set"
1) "pmessage"
2) "__key*__:*"
3) "__keyevent@0__:set"
4) "key1"
1) "pmessage"
2) "__key*__:*"
3) "__keyspace@0__:key1"
4) "expire"
1) "pmessage"
2) "__key*__:*"
3) "__keyevent@0__:expire"
4) "key1"
1) "pmessage"
2) "__key*__:*"
3) "__keyspace@0__:key1"
4) "del"
1) "pmessage"
2) "__key*__:*"
3) "__keyevent@0__:del"
4) "key1"

Redis监听事件

  可以看到每次操作之后,订阅者可以收到消息的相关信息:注册的事件类型、发生的事件类型、操作的key 名称。

 

  这里需要注意,如果注册了并且服务宕机了,或者某种原因客户端下线了。这时候再次上线不会收到, 也就是下线期间的事件不会进行记录。

2. Springboot 项目监听事件

1. 编写listener

Redis监听事件

package com.xm.ggn.config.redis;import org.springframework.data.redis.connection.Message;import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;import org.springframework.data.redis.listener.RedisMessageListenerContainer;import org.springframework.stereotype.Component;

@Componentpublic class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {    public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {        super(listenerContainer);
    }

    @Override    public void onMessage(Message message, byte[] pattern) {        // 获取过期的key
        String expireKey = message.toString();
        System.out.println("终于失效了");
        System.out.println("key is:" + expireKey);
    }

}

Redis监听事件

2. 源码查看

(1) org.springframework.data.redis.listener.KeyExpirationEventMessageListener 

Redis监听事件

package org.springframework.data.redis.listener;import org.springframework.context.ApplicationEventPublisher;import org.springframework.context.ApplicationEventPublisherAware;import org.springframework.data.redis.connection.Message;import org.springframework.data.redis.core.RedisKeyExpiredEvent;import org.springframework.lang.Nullable;public class KeyExpirationEventMessageListener extends KeyspaceEventMessageListener implements ApplicationEventPublisherAware {    private static final Topic KEYEVENT_EXPIRED_TOPIC = new PatternTopic("__keyevent@*__:expired");
    @Nullable    private ApplicationEventPublisher publisher;    public KeyExpirationEventMessageListener(RedisMessageListenerContainer listenerContainer) {        super(listenerContainer);
    }    protected void doRegister(RedisMessageListenerContainer listenerContainer) {
        listenerContainer.addMessageListener(this, KEYEVENT_EXPIRED_TOPIC);
    }    protected void doHandleMessage(Message message) {        this.publishEvent(new RedisKeyExpiredEvent(message.getBody()));
    }    protected void publishEvent(RedisKeyExpiredEvent event) {        if (this.publisher != null) {            this.publisher.publishEvent(event);
        }

    }    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {        this.publisher = applicationEventPublisher;
    }
}

Redis监听事件

  可以看到是借助于Spring的事件机制来完成的。

(2) org.springframework.data.redis.listener.KeyspaceEventMessageListener

Redis监听事件

package org.springframework.data.redis.listener;import java.util.Properties;import org.springframework.beans.factory.DisposableBean;import org.springframework.beans.factory.InitializingBean;import org.springframework.data.redis.connection.Message;import org.springframework.data.redis.connection.MessageListener;import org.springframework.data.redis.connection.RedisConnection;import org.springframework.lang.Nullable;import org.springframework.util.Assert;import org.springframework.util.ObjectUtils;import org.springframework.util.StringUtils;public abstract class KeyspaceEventMessageListener implements MessageListener, InitializingBean, DisposableBean {    private static final Topic TOPIC_ALL_KEYEVENTS = new PatternTopic("__keyevent@*");    private final RedisMessageListenerContainer listenerContainer;    private String keyspaceNotificationsConfigParameter = "EA";    public KeyspaceEventMessageListener(RedisMessageListenerContainer listenerContainer) {
        Assert.notNull(listenerContainer, "RedisMessageListenerContainer to run in must not be null!");        this.listenerContainer = listenerContainer;
    }    public void onMessage(Message message, @Nullable byte[] pattern) {        if (message != null && !ObjectUtils.isEmpty(message.getChannel()) && !ObjectUtils.isEmpty(message.getBody())) {            this.doHandleMessage(message);
        }
    }    protected abstract void doHandleMessage(Message var1);    public void init() {        if (StringUtils.hasText(this.keyspaceNotificationsConfigParameter)) {
            RedisConnection connection = this.listenerContainer.getConnectionFactory().getConnection();            try {
                Properties config = connection.getConfig("notify-keyspace-events");                if (!StringUtils.hasText(config.getProperty("notify-keyspace-events"))) {
                    connection.setConfig("notify-keyspace-events", this.keyspaceNotificationsConfigParameter);
                }
            } finally {
                connection.close();
            }
        }        this.doRegister(this.listenerContainer);
    }    protected void doRegister(RedisMessageListenerContainer container) {        this.listenerContainer.addMessageListener(this, TOPIC_ALL_KEYEVENTS);
    }    public void destroy() throws Exception {        this.listenerContainer.removeMessageListener(this);
    }    public void setKeyspaceNotificationsConfigParameter(String keyspaceNotificationsConfigParameter) {        this.keyspaceNotificationsConfigParameter = keyspaceNotificationsConfigParameter;
    }    public void afterPropertiesSet() throws Exception {        this.init();
    }
}

Redis监听事件

  可以看到启动后悔修改redis 的事件机制,同时注册监听过期事件。

3. 自己实现类似于Spring的机制

  借助于netty 和 spring 的事件机制实现。

1. com.xm.ggn.test.springevent.RedisConnection

  启动后利用netty 建立一个链接并且发送订阅事件

Redis监听事件

package com.xm.ggn.test.springevent;import io.netty.bootstrap.Bootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;

@Component
@Slf4jpublic class RedisConnection {    private static final String HOST = System.getProperty("host", "192.168.145.139");    private static final int PORT = Integer.parseInt(System.getProperty("port", "6379"));

    @PostConstruct    public void init() {        try {
            Bootstrap b = new Bootstrap();
            EventLoopGroup group = new NioEventLoopGroup();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            p.addLast(new RedisClientHandler());
                        }
                    });            // Start the connection attempt.
            Channel ch = b.connect(HOST, PORT).sync().channel();
            String sendCmd = "psubscribe __key*__:*";
            ch.writeAndFlush(sendCmd);
            log.info("已经建立redis 连接, 并且发送订阅事件命令");
        } catch (Exception e) {
            log.error("redis handler error", e);
        }
    }

}

Redis监听事件

2.  RedisClientHandler

Redis监听事件

package com.xm.ggn.test.springevent;import com.xm.ggn.utils.system.SpringBootUtils;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelDuplexHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelPromise;import io.netty.util.CharsetUtil;import org.apache.commons.lang3.StringUtils;import java.util.ArrayList;import java.util.Arrays;import java.util.List;import java.util.stream.Collectors;public class RedisClientHandler extends ChannelDuplexHandler {

    @Override    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {        // 转换发出去的数据格式
        msg = rehandleRequest(msg);
        ctx.writeAndFlush(Unpooled.copiedBuffer(msg.toString(), CharsetUtil.UTF_8));
    }    /**
     * 重新处理消息,处理为 RESP 认可的数据
     * set foo bar
     * 对应下面数据
     * *3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n     */
    private String rehandleRequest(Object msg) {
        String result = msg.toString().trim();
        String[] params = result.split(" ");
        List<String> allParam = new ArrayList<>();
        Arrays.stream(params).forEach(s -> {
            allParam.add("$" + s.length() + "\r\n" + s + "\r\n"); // 参数前$length\r\n, 参数后增加 \r\n        });
        allParam.add(0, "*" + allParam.size() + "\r\n");
        StringBuilder stringBuilder = new StringBuilder();
        allParam.forEach(p -> {
            stringBuilder.append(p);
        });        return stringBuilder.toString();
    }

    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf byteBuf = (ByteBuf) msg;        byte[] bytes = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(bytes);
        String result = new String(bytes);// 转换接受到的数据格式
        result = rehandleResponse(result).toString();        try {            // 接收到消息之后用Spring 的事件机制发送消息
            SpringBootUtils.applicationContext.publishEvent(new RedisEvent(this, result));
        } catch (Exception exception) {
            exception.printStackTrace();
        }
    }    /**
     * 重新处理响应消息     */
    private Object rehandleResponse(String result) {        // 状态恢复 - “+OK\r\n”
        if (result.startsWith("+")) {            return result.substring(1, result.length() - 2);
        }        // 错误回复(error reply)的第一个字节是 "-"。例如 `flushallE` 返回的 `-ERR unknown command 'flushallE'\r\n`
        if (result.startsWith("-")) {            return result.substring(1, result.length() - 2);
        }        // 整数回复(integer reply)的第一个字节是 ":"。 例如 `llen mylist` 查看list 大小返回的 `:3\r\n`
        if (result.startsWith(":")) {            return result.substring(1, result.length() - 2);
        }        // 批量回复(bulk reply)的第一个字节是 "$", 例如:  `get foo` 返回的结果为 `$3\r\nbar\r\n`
        if (result.startsWith("$")) {
            result = StringUtils.substringAfter(result, "\r\n");            return StringUtils.substringBeforeLast(result, "\r\n");
        }        // 多条批量回复(multi bulk reply)的第一个字节是 "*", 例如: *2\r\n$3\r\nfoo\r\n$4\r\nname\r\n
        if (result.startsWith("*")) {
            result = StringUtils.substringAfter(result, "\r\n");
            String[] split = result.split("\\$\\d\r\n");
            List<String> collect = Arrays.stream(split).filter(tmpStr -> StringUtils.isNotBlank(tmpStr)).collect(Collectors.toList());
            List<String> resultList = new ArrayList<>(collect.size());
            collect.forEach(str1 -> {
                resultList.add(StringUtils.substringBeforeLast(str1, "\r\n"));
            });            return resultList;
        }        return "unknow result";
    }

    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        System.err.print("exceptionCaught: ");
        cause.printStackTrace(System.err);
        ctx.close();
    }

}

Redis监听事件

3.  RedisEvent

Redis监听事件

package com.xm.ggn.test.springevent;import org.springframework.context.ApplicationEvent;public class RedisEvent extends ApplicationEvent {    private static final long serialVersionUID = -9184671635725233773L;    private Object msg;    public RedisEvent(Object source, final String msg) {        super(source);        this.msg = msg;
    }    public Object getMsg() {        return msg;
    }    public void setMsg(Object msg) {        this.msg = msg;
    }
}

Redis监听事件

4. RedisEventListener 事件监听器,用于直接打印消息

Redis监听事件

package com.xm.ggn.test.springevent;import org.springframework.context.ApplicationListener;import org.springframework.stereotype.Component;

@Componentpublic class RedisEventListener implements ApplicationListener<RedisEvent> {

    @Override    public void onApplicationEvent(RedisEvent applicationEvent) {        // handle event
        System.out.println("收到事件,消息为:" + applicationEvent.getMsg());
    }
}

Redis监听事件

5. 测试结果:

(1)在redis 客户进行一系列操作

127.0.0.1:6379> set mykey myvalue
OK127.0.0.1:6379> del mykey
(integer) 1

(2) 控制台打印如下:

Redis监听事件

收到事件,消息为:[pmessage
$10__key*__:*$18__keyevent@0__:set, mykey]
收到事件,消息为:[pmessage
$10__key*__:*$18__keyevent@0__:del, mykey]

Redis监听事件

 

【当你用心写完每一篇博客之后,你会发现它比你用代码实现功能更有成就感!】

发表评论

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen: