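Sometimes we want to listen for the deletion of a key, or for other key events, and run our own business logic in response. Redis provides this through keyspace notifications, which are delivered over pub/sub.
Reference (official docs): https://redis.io/topics/notifications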
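1. Testing with the Redis server and client
According to the official docs, event notifications are disabled by default. To enable them, set notify-keyspace-events in redis.conf, or change it with the CONFIG SET command (which only affects the running process and is lost on restart).
The docs list the following event types (each letter is shorthand for one class of events):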
K  Keyspace events, published with __keyspace@<db>__ prefix.
E  Keyevent events, published with __keyevent@<db>__ prefix.
g  Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ...
$  String commands
l  List commands
s  Set commands
h  Hash commands
z  Sorted set commands
t  Stream commands
d  Module key type events
x  Expired events (events generated every time a key expires)
e  Evicted events (events generated when a key is evicted for maxmemory)
m  Key miss events (events generated when a key that doesn't exist is accessed)
A  Alias for "g$lshztxed", so that the "AKE" string means all the events except "m".
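At least one of K or E must be present, otherwise no events are delivered. To be notified of every event class except key-miss events (m), set the value to 'KEA'.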
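The flag rules above can be checked mechanically before a value is written to redis.conf. The following is only a sketch: NotifyFlags and its methods are hypothetical names, not part of Redis or of any client library, and the alias expansion is copied from the table above.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical helper for reasoning about notify-keyspace-events values. */
final class NotifyFlags {
    // Expansion of the "A" alias, taken from the flag table above.
    private static final String ALL_ALIAS = "g$lshztxed";

    /** Expands "A" and returns the distinct flags in first-seen order. */
    static Set<Character> expand(String flags) {
        Set<Character> out = new LinkedHashSet<>();
        for (char c : flags.toCharArray()) {
            if (c == 'A') {
                for (char a : ALL_ALIAS.toCharArray()) {
                    out.add(a);
                }
            } else {
                out.add(c);
            }
        }
        return out;
    }

    /** True if K or E is present together with at least one event class. */
    static boolean deliversEvents(String flags) {
        Set<Character> set = expand(flags);
        boolean hasChannel = set.contains('K') || set.contains('E');
        boolean hasEventClass = false;
        for (char c : set) {
            if (c != 'K' && c != 'E') {
                hasEventClass = true;
            }
        }
        return hasChannel && hasEventClass;
    }

    /** True if expired events are published on __keyevent channels. */
    static boolean deliversExpiredKeyevents(String flags) {
        Set<Character> set = expand(flags);
        return set.contains('E') && set.contains('x');
    }

    public static void main(String[] args) {
        System.out.println(deliversEvents("Ex"));            // true
        System.out.println(deliversEvents("x"));             // false: neither K nor E
        System.out.println(deliversExpiredKeyevents("KEA")); // true
    }
}
```

A check like this is mainly useful for catching a value such as "x" or "Kx" when the application actually subscribes to __keyevent channels.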
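1. Testing the expired-event listener
(1) Edit redis.conf. The relevant section is shown below with its default value; change the last line to notify-keyspace-events Ex so that expired events are published on the keyevent channels.
# notify-keyspace-events Ex
#
# By default all notifications are disabled because most users don't need
# this feature and the feature has some overhead. Note that if you don't
# specify at least one of K or E, no events will be delivered.
#
notify-keyspace-events ""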
(2) After the server starts, check the configuration
127.0.0.1:6379> config get notify-keyspace-events
1) "notify-keyspace-events"
2) "xE"
(3) Start a client that listens for expired events in all databases
127.0.0.1:6379> psubscribe __keyevent@*__:expired
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "__keyevent@*__:expired"
3) (integer) 1
1) "pmessage"
2) "__keyevent@*__:expired"
3) "__keyevent@0__:expired"
4) "test"
1) "pmessage"
2) "__keyevent@*__:expired"
3) "__keyevent@0__:expired"
4) "mykey"
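In __keyevent@*__:expired, the * matches any database. You can replace it with a specific database index (0-15 with the default of 16 databases) or keep the wildcard.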
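Because the channel names follow a fixed layout, they can be built rather than typed by hand. A minimal sketch, assuming a hypothetical KeyspaceChannels helper (the class and method names are illustrative only):

```java
/** Hypothetical helper that builds keyspace-notification channel names. */
final class KeyspaceChannels {

    /** Keyevent channel for one database, e.g. __keyevent@0__:expired. */
    static String keyevent(int db, String event) {
        return "__keyevent@" + db + "__:" + event;
    }

    /** Keyspace channel for one key in one database, e.g. __keyspace@0__:mykey. */
    static String keyspace(int db, String key) {
        return "__keyspace@" + db + "__:" + key;
    }

    /** Pattern that matches the given event in every database. */
    static String keyeventAllDbs(String event) {
        return "__keyevent@*__:" + event;
    }

    public static void main(String[] args) {
        System.out.println(keyevent(0, "expired"));    // __keyevent@0__:expired
        System.out.println(keyeventAllDbs("expired")); // __keyevent@*__:expired
    }
}
```

A name for a single database contains no wildcard, so it can be used with SUBSCRIBE; the all-databases form is a pattern and needs PSUBSCRIBE, as in the session above.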
2. Listening for all events
1. Change the configuration so that all events are published
127.0.0.1:6379> config set notify-keyspace-events KEA
OK
127.0.0.1:6379> config get notify-keyspace-events
1) "notify-keyspace-events"
2) "AKE"
2. Subscribe from a client (psubscribe accepts a variable number of patterns, so several patterns can be subscribed in a single call)
127.0.0.1:6379> psubscribe '__key*__:*'
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "__key*__:*"
3) (integer) 1
3. Start another client and modify some data
127.0.0.1:6379> set key1 value1
OK
127.0.0.1:6379> expire key1 9000
(integer) 1
127.0.0.1:6379> del key1
(integer) 1
4. Check the console of the subscribed client
127.0.0.1:6379> psubscribe '__key*__:*'
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "__key*__:*"
3) (integer) 1
1) "pmessage"
2) "__key*__:*"
3) "__keyspace@0__:key1"
4) "set"
1) "pmessage"
2) "__key*__:*"
3) "__keyevent@0__:set"
4) "key1"
1) "pmessage"
2) "__key*__:*"
3) "__keyspace@0__:key1"
4) "expire"
1) "pmessage"
2) "__key*__:*"
3) "__keyevent@0__:expire"
4) "key1"
1) "pmessage"
2) "__key*__:*"
3) "__keyspace@0__:key1"
4) "del"
1) "pmessage"
2) "__key*__:*"
3) "__keyevent@0__:del"
4) "key1"
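After each operation the subscriber receives a pmessage carrying three pieces of information: the pattern that was subscribed, the channel the event was actually published on, and the payload. On a __keyspace channel the key is in the channel name and the payload is the event; on a __keyevent channel the event is in the channel name and the payload is the key.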
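Since the key and the event swap places between the two channel families, it can help to normalise both into one shape. The sketch below assumes a hypothetical KeyNotification class (not from any library) that takes the channel and payload fields of a pmessage:

```java
/** Hypothetical value class: one keyspace/keyevent notification in normalised form. */
final class KeyNotification {
    final int db;
    final String key;
    final String event;

    private KeyNotification(int db, String key, String event) {
        this.db = db;
        this.key = key;
        this.event = event;
    }

    /**
     * Parses the channel and payload of a pmessage.
     * __keyspace@<db>__:<key> carries the event as payload;
     * __keyevent@<db>__:<event> carries the key as payload.
     */
    static KeyNotification parse(String channel, String payload) {
        int at = channel.indexOf('@');
        int end = channel.indexOf("__:", at);
        if (!channel.startsWith("__key") || at < 0 || end < 0) {
            throw new IllegalArgumentException("not a notification channel: " + channel);
        }
        String kind = channel.substring(2, at);               // "keyspace" or "keyevent"
        int db = Integer.parseInt(channel.substring(at + 1, end));
        String suffix = channel.substring(end + 3);           // text after "__:"
        if ("keyspace".equals(kind)) {
            return new KeyNotification(db, suffix, payload);
        }
        if ("keyevent".equals(kind)) {
            return new KeyNotification(db, payload, suffix);
        }
        throw new IllegalArgumentException("unknown channel kind: " + kind);
    }

    public static void main(String[] args) {
        KeyNotification n = parse("__keyevent@0__:del", "key1");
        System.out.println(n.db + " " + n.key + " " + n.event); // 0 key1 del
    }
}
```

With the KEA setting every operation arrives twice, once per channel family, so a consumer that only needs one view would normally subscribe to just one of them.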
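One caveat: if the subscribing service crashes, or the client goes offline for any reason, it will not receive the missed events when it reconnects. Events that occur while a subscriber is offline are not stored anywhere.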
2. Listening for events in a Spring Boot project
1. Write the listener (a RedisMessageListenerContainer bean must exist in the application context, since it is injected through the constructor)
package com.xm.ggn.config.redis;

import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {

    public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {
        // The message body is the name of the expired key
        String expireKey = message.toString();
        System.out.println("Key finally expired");
        System.out.println("key is:" + expireKey);
    }
}
2. A look at the source
(1) org.springframework.data.redis.listener.KeyExpirationEventMessageListener
package org.springframework.data.redis.listener;

import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.core.RedisKeyExpiredEvent;
import org.springframework.lang.Nullable;

public class KeyExpirationEventMessageListener extends KeyspaceEventMessageListener implements ApplicationEventPublisherAware {
    private static final Topic KEYEVENT_EXPIRED_TOPIC = new PatternTopic("__keyevent@*__:expired");
    @Nullable
    private ApplicationEventPublisher publisher;

    public KeyExpirationEventMessageListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }

    protected void doRegister(RedisMessageListenerContainer listenerContainer) {
        listenerContainer.addMessageListener(this, KEYEVENT_EXPIRED_TOPIC);
    }

    protected void doHandleMessage(Message message) {
        this.publishEvent(new RedisKeyExpiredEvent(message.getBody()));
    }

    protected void publishEvent(RedisKeyExpiredEvent event) {
        if (this.publisher != null) {
            this.publisher.publishEvent(event);
        }
    }

    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.publisher = applicationEventPublisher;
    }
}
As you can see, it republishes each expiration as a RedisKeyExpiredEvent through Spring's application event mechanism.
(2) org.springframework.data.redis.listener.KeyspaceEventMessageListener
package org.springframework.data.redis.listener;

import java.util.Properties;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

public abstract class KeyspaceEventMessageListener implements MessageListener, InitializingBean, DisposableBean {
    private static final Topic TOPIC_ALL_KEYEVENTS = new PatternTopic("__keyevent@*");
    private final RedisMessageListenerContainer listenerContainer;
    private String keyspaceNotificationsConfigParameter = "EA";

    public KeyspaceEventMessageListener(RedisMessageListenerContainer listenerContainer) {
        Assert.notNull(listenerContainer, "RedisMessageListenerContainer to run in must not be null!");
        this.listenerContainer = listenerContainer;
    }

    public void onMessage(Message message, @Nullable byte[] pattern) {
        if (message != null && !ObjectUtils.isEmpty(message.getChannel()) && !ObjectUtils.isEmpty(message.getBody())) {
            this.doHandleMessage(message);
        }
    }

    protected abstract void doHandleMessage(Message var1);

    public void init() {
        if (StringUtils.hasText(this.keyspaceNotificationsConfigParameter)) {
            RedisConnection connection = this.listenerContainer.getConnectionFactory().getConnection();
            try {
                Properties config = connection.getConfig("notify-keyspace-events");
                if (!StringUtils.hasText(config.getProperty("notify-keyspace-events"))) {
                    connection.setConfig("notify-keyspace-events", this.keyspaceNotificationsConfigParameter);
                }
            } finally {
                connection.close();
            }
        }
        this.doRegister(this.listenerContainer);
    }

    protected void doRegister(RedisMessageListenerContainer container) {
        this.listenerContainer.addMessageListener(this, TOPIC_ALL_KEYEVENTS);
    }

    public void destroy() throws Exception {
        this.listenerContainer.removeMessageListener(this);
    }

    public void setKeyspaceNotificationsConfigParameter(String keyspaceNotificationsConfigParameter) {
        this.keyspaceNotificationsConfigParameter = keyspaceNotificationsConfigParameter;
    }

    public void afterPropertiesSet() throws Exception {
        this.init();
    }
}
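As you can see, on startup init() changes the Redis notification configuration (it sets notify-keyspace-events to "EA", but only when the current value is empty) and then registers the listener, which in the subclass above means the expired-event topic.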
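3. Implementing a similar mechanism ourselves
This version is built on netty and Spring's event mechanism.
1. com.xm.ggn.test.springevent.RedisConnection
On startup it uses netty to open a connection and sends the subscribe command.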
package com.xm.ggn.test.springevent;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
@Slf4j
public class RedisConnection {

    private static final String HOST = System.getProperty("host", "192.168.145.139");
    private static final int PORT = Integer.parseInt(System.getProperty("port", "6379"));

    @PostConstruct
    public void init() {
        try {
            Bootstrap b = new Bootstrap();
            EventLoopGroup group = new NioEventLoopGroup();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            p.addLast(new RedisClientHandler());
                        }
                    });

            // Start the connection attempt.
            Channel ch = b.connect(HOST, PORT).sync().channel();
            // The handler converts this plain-text command to RESP before it is sent
            String sendCmd = "psubscribe __key*__:*";
            ch.writeAndFlush(sendCmd);
            log.info("Redis connection established and subscribe command sent");
        } catch (Exception e) {
            log.error("redis handler error", e);
        }
    }
}
2. RedisClientHandler
package com.xm.ggn.test.springevent;

import com.xm.ggn.utils.system.SpringBootUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.CharsetUtil;
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class RedisClientHandler extends ChannelDuplexHandler {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        // Convert the outgoing command to RESP; pass the promise on so the caller's future completes
        msg = rehandleRequest(msg);
        ctx.writeAndFlush(Unpooled.copiedBuffer(msg.toString(), CharsetUtil.UTF_8), promise);
    }

    /**
     * Re-encodes the command in the format RESP expects.
     * set foo bar
     * becomes
     * *3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n
     */
    private String rehandleRequest(Object msg) {
        String result = msg.toString().trim();
        String[] params = result.split(" ");
        List<String> allParam = new ArrayList<>();
        Arrays.stream(params).forEach(s -> {
            // Each argument is prefixed with $length\r\n and followed by \r\n
            allParam.add("$" + s.length() + "\r\n" + s + "\r\n");
        });
        allParam.add(0, "*" + allParam.size() + "\r\n");
        StringBuilder stringBuilder = new StringBuilder();
        allParam.forEach(p -> {
            stringBuilder.append(p);
        });
        return stringBuilder.toString();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf byteBuf = (ByteBuf) msg;
        String result;
        try {
            byte[] bytes = new byte[byteBuf.readableBytes()];
            byteBuf.readBytes(bytes);
            result = new String(bytes, CharsetUtil.UTF_8);
        } finally {
            // This handler consumes the message, so release the buffer to avoid a leak
            byteBuf.release();
        }
        // Convert the received data
        result = rehandleResponse(result).toString();
        try {
            // After receiving a message, publish it through Spring's event mechanism
            SpringBootUtils.applicationContext.publishEvent(new RedisEvent(this, result));
        } catch (Exception exception) {
            exception.printStackTrace();
        }
    }

    /**
     * Converts a RESP reply into a plain value.
     */
    private Object rehandleResponse(String result) {
        // Status reply, e.g. "+OK\r\n"
        if (result.startsWith("+")) {
            return result.substring(1, result.length() - 2);
        }
        // Error reply: the first byte is "-". For example `flushallE` returns `-ERR unknown command 'flushallE'\r\n`
        if (result.startsWith("-")) {
            return result.substring(1, result.length() - 2);
        }
        // Integer reply: the first byte is ":". For example `llen mylist` returns `:3\r\n`
        if (result.startsWith(":")) {
            return result.substring(1, result.length() - 2);
        }
        // Bulk reply: the first byte is "$". For example `get foo` returns `$3\r\nbar\r\n`
        if (result.startsWith("$")) {
            result = StringUtils.substringAfter(result, "\r\n");
            return StringUtils.substringBeforeLast(result, "\r\n");
        }
        // Multi bulk reply: the first byte is "*". For example: *2\r\n$3\r\nfoo\r\n$4\r\nname\r\n
        if (result.startsWith("*")) {
            result = StringUtils.substringAfter(result, "\r\n");
            // Note: \d matches a single digit, so length prefixes of 10 or more (e.g. $10) are not split off
            String[] split = result.split("\\$\\d\r\n");
            List<String> collect = Arrays.stream(split).filter(tmpStr -> StringUtils.isNotBlank(tmpStr)).collect(Collectors.toList());
            List<String> resultList = new ArrayList<>(collect.size());
            collect.forEach(str1 -> {
                resultList.add(StringUtils.substringBeforeLast(str1, "\r\n"));
            });
            return resultList;
        }
        return "unknown result";
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        System.err.print("exceptionCaught: ");
        cause.printStackTrace(System.err);
        ctx.close();
    }
}
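The request encoding in rehandleRequest splits on single spaces and uses String.length(), which is fine for the ASCII psubscribe command sent here. RESP length prefixes count bytes, though, so a more general encoder would take the arguments separately and measure their encoded form. A sketch of that idea, where RespEncoder is a hypothetical name and not a netty or Redis class:

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical encoder: turns command arguments into a RESP array of bulk strings. */
final class RespEncoder {
    private static final byte[] CRLF = {'\r', '\n'};

    /** Encodes e.g. ("SET", "foo", "bar") as *3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n. */
    static byte[] encode(String... args) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeHeader(out, "*" + args.length);
        for (String arg : args) {
            byte[] bytes = arg.getBytes(StandardCharsets.UTF_8);
            writeHeader(out, "$" + bytes.length);   // length in bytes, not chars
            out.write(bytes, 0, bytes.length);
            out.write(CRLF, 0, CRLF.length);
        }
        return out.toByteArray();
    }

    private static void writeHeader(ByteArrayOutputStream out, String header) {
        byte[] b = header.getBytes(StandardCharsets.US_ASCII);
        out.write(b, 0, b.length);
        out.write(CRLF, 0, CRLF.length);
    }

    public static void main(String[] args) {
        String wire = new String(encode("psubscribe", "__key*__:*"), StandardCharsets.UTF_8);
        // Show the control characters explicitly
        System.out.println(wire.replace("\r\n", "\\r\\n"));
    }
}
```

Taking the arguments as a String array also lets a value contain spaces, which the split-based version cannot express.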
3. RedisEvent
package com.xm.ggn.test.springevent;

import org.springframework.context.ApplicationEvent;

public class RedisEvent extends ApplicationEvent {

    private static final long serialVersionUID = -9184671635725233773L;

    private Object msg;

    public RedisEvent(Object source, final String msg) {
        super(source);
        this.msg = msg;
    }

    public Object getMsg() {
        return msg;
    }

    public void setMsg(Object msg) {
        this.msg = msg;
    }
}
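4. RedisEventListener: an event listener that simply prints the message
package com.xm.ggn.test.springevent;

import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

@Component
public class RedisEventListener implements ApplicationListener<RedisEvent> {

    @Override
    public void onApplicationEvent(RedisEvent applicationEvent) {
        // handle event
        System.out.println("Event received, message: " + applicationEvent.getMsg());
    }
}
5. Test results:
(1) Run a few operations from a Redis client
127.0.0.1:6379> set mykey myvalue
OK
127.0.0.1:6379> del mykey
(integer) 1
(2) The console prints the following:
Event received, message: [pmessage $10__key*__:*$18__keyevent@0__:set, mykey]
Event received, message: [pmessage $10__key*__:*$18__keyevent@0__:del, mykey]
The $10 and $18 fragments are RESP length prefixes that were left in the output: the split pattern \$\d\r\n in RedisClientHandler only matches single-digit lengths, so the pattern and channel fields are not separated.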
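One way to avoid leftover fragments such as $10 and $18 in the output above is to let the length prefixes drive the parsing instead of splitting on a regular expression. The sketch below uses a hypothetical RespArrayParser class and assumes the buffer holds exactly one complete array reply made of bulk strings and integers; null replies, nested arrays and replies split across TCP reads are deliberately not handled.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical parser for one complete RESP array of bulk strings and integers. */
final class RespArrayParser {

    static List<String> parse(byte[] data) {
        int[] pos = {0};                              // read cursor shared with readLine
        String header = readLine(data, pos);
        if (!header.startsWith("*")) {
            throw new IllegalArgumentException("not a RESP array: " + header);
        }
        int count = Integer.parseInt(header.substring(1));
        List<String> items = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            String line = readLine(data, pos);
            if (line.startsWith(":")) {               // integer element, e.g. the subscription count
                items.add(line.substring(1));
            } else if (line.startsWith("$")) {        // bulk string: read exactly <len> bytes
                int len = Integer.parseInt(line.substring(1));
                items.add(new String(data, pos[0], len, StandardCharsets.UTF_8));
                pos[0] += len + 2;                    // skip the payload and its trailing CRLF
            } else {
                throw new IllegalArgumentException("unsupported element: " + line);
            }
        }
        return items;
    }

    /** Reads up to the next CRLF and advances the cursor past it. */
    private static String readLine(byte[] data, int[] pos) {
        int start = pos[0];
        int i = start;
        while (i + 1 < data.length && !(data[i] == '\r' && data[i + 1] == '\n')) {
            i++;
        }
        if (i + 1 >= data.length) {
            throw new IllegalArgumentException("incomplete frame");
        }
        pos[0] = i + 2;
        return new String(data, start, i - start, StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        String frame = "*4\r\n$8\r\npmessage\r\n$10\r\n__key*__:*\r\n"
                + "$18\r\n__keyevent@0__:set\r\n$5\r\nmykey\r\n";
        System.out.println(parse(frame.getBytes(StandardCharsets.UTF_8)));
        // [pmessage, __key*__:*, __keyevent@0__:set, mykey]
    }
}
```

In a real netty pipeline several replies can arrive in one read, or one reply can span several reads, so a frame decoder would be needed in front of a parser like this.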