创建redis集群
创建redis配置文件 [root@letsdev redis-cluster-test]# ll 总用量 17328 drwxr-xr-x 4 root root 30 6月 2 17:30 6381 drwxr-xr-x 4 root root 30 6月 2 17:30 6382 drwxr-xr-x 4 root root 30 6月 16 11:41 6383 drwxr-xr-x 4 root root 30 6月 2 17:30 6384 drwxr-xr-x 4 root root 30 6月 2 17:30 6385 drwxr-xr-x 4 root root 30 6月 2 17:30 6386 [root@letsdev redis-cluster-test]# cat 6381/ conf/ data/ [root@letsdev redis-cluster-test]# cat 6381/conf/redis.conf notify-keyspace-events "Ex" bind 0.0.0.0 port 6381 cluster-enabled yes cluster-config-file nodes.conf cluster-node-timeout 5000 appendonly yes 启动redis docker实例 可以加-v /home/redis-cluster-test/6381/data:/data 将数据目录映射出来 docker run -d --name redis-6381 --net host -v /home/redis-cluster-test/6381/conf/redis.conf:/usr/local/etc/redis/redis.conf redis:4.0.11 redis-server /usr/local/etc/redis/redis.conf docker run -d --name redis-6382 --net host -v /home/redis-cluster-test/6382/conf/redis.conf:/usr/local/etc/redis/redis.conf redis:4.0.11 redis-server /usr/local/etc/redis/redis.conf docker run -d --name redis-6383 --net host -v /home/redis-cluster-test/6383/conf/redis.conf:/usr/local/etc/redis/redis.conf redis:4.0.11 redis-server /usr/local/etc/redis/redis.conf docker run -d --name redis-6384 --net host -v /home/redis-cluster-test/6384/conf/redis.conf:/usr/local/etc/redis/redis.conf redis:4.0.11 redis-server /usr/local/etc/redis/redis.conf docker run -d --name redis-6385 --net host -v /home/redis-cluster-test/6385/conf/redis.conf:/usr/local/etc/redis/redis.conf redis:4.0.11 redis-server /usr/local/etc/redis/redis.conf docker run -d --name redis-6386 --net host -v /home/redis-cluster-test/6386/conf/redis.conf:/usr/local/etc/redis/redis.conf redis:4.0.11 redis-server /usr/local/etc/redis/redis.conf 在redis安装包src目录中设置redis集群信息 [root@letsdev src]# pwd /home/redis-cluster-test/redis-4.0.11/src [root@letsdev src]# ll|grep redis-trib.rb -rwxrwxr-x 1 root root 65991 8月 4 2018 redis-trib.rb [root@letsdev src]# ./redis-trib.rb create --replicas 1 192.168.250.78:6381 192.168.250.78:6382 192.168.250.78:6383 192.168.250.78:6384 192.168.250.78:6385 192.168.250.78:6386 如果设置错误,删除镜像重建 docker rm -f redis-6381 docker rm -f redis-6382 docker rm -f redis-6383 docker rm -f redis-6384 docker rm -f redis-6385 docker rm -f redis-6386
springboot配置application.property
maven依赖<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <version>2.1.6.RELEASE</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency>
spring.redis.database=0 #注意!!!注释掉这个host配置就使用集群配置,否则使用单机配置 #spring.redis.host=redis spring.redis.cluster.nodes =192.168.250.78:6381,192.168.250.78:6382,192.168.250.78:6383,192.168.250.78:6384,192.168.250.78:6385,192.168.250.78:6386 spring.redis.cluster.timeout=1000 spring.redis.cluster.max-redirects=3 # 连接池最大连接数(使用负值表示没有限制) spring.redis.lettuce.pool.max-active=8 # 连接池最大阻塞等待时间(使用负值表示没有限制) spring.redis.lettuce.pool.max-wait=-1ms # 连接池中的最大空闲连接 spring.redis.lettuce.pool.max-idle=8 # 连接池中的最小空闲连接 spring.redis.lettuce.pool.min-idle=0
package com.ccr.config; import com.ccr.service.listener.RedisMessageListenerFactory; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.boot.autoconfigure.data.redis.RedisProperties; import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration; import org.springframework.cache.annotation.CachingConfigurerSupport; import org.springframework.cache.annotation.EnableCaching; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.env.Environment; import org.springframework.core.env.MapPropertySource; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.connection.RedisClusterConfiguration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.jedis.JedisConnection; import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; import org.springframework.data.redis.core.RedisOperations; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import java.util.HashMap; import java.util.Map; /** * @caobin on 2018/5/11. */ @Configuration @EnableCaching @ConditionalOnClass({ JedisConnection.class, RedisOperations.class, RedisProperties.Jedis.class, MessageListener.class }) @AutoConfigureAfter({ JacksonAutoConfiguration.class, org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration.class }) public class RedisConfig extends CachingConfigurerSupport { @Autowired private Environment environment; // @Bean // public RedisTemplate<String, Serializable> redisTemplate() { // RedisTemplate<String, Serializable> template = new RedisTemplate<>(); // template.setKeySerializer(new StringRedisSerializer()); // template.setValueSerializer(new GenericJackson2JsonRedisSerializer()); // template.setConnectionFactory(myLettuceConnectionFactory()); // return template; // } @Bean public RedisConnectionFactory myLettuceConnectionFactory() { Map<String, Object> source = new HashMap<String, Object>(); source.put("spring.redis.cluster.nodes", environment.getProperty("spring.redis.cluster.nodes")); source.put("spring.redis.cluster.timeout", environment.getProperty("spring.redis.cluster.timeout")); source.put("spring.redis.cluster.max-redirects", environment.getProperty("spring.redis.cluster.max-redirects")); RedisClusterConfiguration redisClusterConfiguration; redisClusterConfiguration = new RedisClusterConfiguration(new MapPropertySource("RedisClusterConfiguration", source)); return new LettuceConnectionFactory(redisClusterConfiguration); } @Bean public RedisTemplate<String, String> redisTemplate() { StringRedisTemplate template = new StringRedisTemplate(myLettuceConnectionFactory()); Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(String.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); template.setValueSerializer(jackson2JsonRedisSerializer); template.afterPropertiesSet(); return template; } @Configuration @ConditionalOnExpression("!'${spring.redis.host:}'.isEmpty()") public static class RedisStandAloneAutoConfiguration { @Bean public RedisMessageListenerContainer customizeRedisListenerContainer( RedisConnectionFactory redisConnectionFactory,MessageListener messageListener) { RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer(); redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory); //注意!!!因为设置的是notify-keyspace-events "Ex" event事件 ,这里监听keyevent,否则接收不到消息,示例 pattern:__keyevent@0__:*:channel:__keyevent@0__:expired redisMessageListenerContainer.addMessageListener(messageListener,new PatternTopic("__keyevent@0__:*")); return redisMessageListenerContainer; } } @Configuration @ConditionalOnExpression("'${spring.redis.host:}'.isEmpty()") public static class RedisClusterAutoConfiguration { @Bean public RedisMessageListenerFactory redisMessageListenerFactory(BeanFactory beanFactory, RedisConnectionFactory redisConnectionFactory) { RedisMessageListenerFactory beans = new RedisMessageListenerFactory(); beans.setBeanFactory(beanFactory); beans.setRedisConnectionFactory(redisConnectionFactory); return beans; } } }
因为redis key被hash到了redis不同的节点中,在对应的节点过期后通知,所以要针对所有节点做监听。RedisMessageListenerFactory 创建所有master节点监听
package com.ccr.service.listener; import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.support.DefaultListableBeanFactory; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.connection.RedisClusterConnection; import org.springframework.data.redis.connection.RedisClusterNode; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import redis.clients.jedis.JedisShardInfo; public class RedisMessageListenerFactory implements BeanFactoryAware, ApplicationListener<ContextRefreshedEvent> { private DefaultListableBeanFactory beanFactory; private RedisConnectionFactory redisConnectionFactory; @Autowired private MessageListener messageListener; @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException { this.beanFactory = (DefaultListableBeanFactory) beanFactory; } public void setRedisConnectionFactory(RedisConnectionFactory redisConnectionFactory) { this.redisConnectionFactory = redisConnectionFactory; } @Override public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) { RedisClusterConnection redisClusterConnection = redisConnectionFactory.getClusterConnection(); if (redisClusterConnection != null) { Iterable<RedisClusterNode> nodes = redisClusterConnection.clusterGetNodes(); for (RedisClusterNode node : nodes) { if (node.isMaster()) { String containerBeanName = "messageContainer" + node.hashCode(); if (beanFactory.containsBean(containerBeanName)) { return; } JedisConnectionFactory factory = new JedisConnectionFactory( new JedisShardInfo(node.getHost(), node.getPort())); BeanDefinitionBuilder containerBeanDefinitionBuilder = BeanDefinitionBuilder .genericBeanDefinition(RedisMessageListenerContainer.class); containerBeanDefinitionBuilder.addPropertyValue("connectionFactory", factory); containerBeanDefinitionBuilder.setScope(BeanDefinition.SCOPE_SINGLETON); containerBeanDefinitionBuilder.setLazyInit(false); beanFactory.registerBeanDefinition(containerBeanName, containerBeanDefinitionBuilder.getRawBeanDefinition()); RedisMessageListenerContainer container = beanFactory .getBean(containerBeanName, RedisMessageListenerContainer.class); String listenerBeanName = "messageListener" + node.hashCode(); if (beanFactory.containsBean(listenerBeanName)) { return; } //注意!!!因为设置的是notify-keyspace-events "Ex" event事件 ,这里监听keyevent,否则接收不到消息,示例 pattern:__keyevent@0__:*:channel:__keyevent@0__:expired container.addMessageListener(messageListener, new PatternTopic("__keyevent@0__:*")); container.start(); } } } } }
实现 MessageListener接口做消息监听
package com.example.springredis.redis; import com.example.springredis.hotkey.HotKey; import com.example.springredis.hotkey.HotKeyAction; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; @Slf4j @Component public class KeyExpiredEventMessageListener implements MessageListener { @Autowired private RedisTemplate redisTemplate; @Override public void onMessage(Message message, byte[] pattern) { String key = new String(message.getChannel()); key = key.substring(key.indexOf(":")+1); String action = new String(message.getBody()); if (HotKey.containKey(key)){ String value = redisTemplate.opsForValue().get(key)+""; //可以使用getAndSet方法做原子操作避免多应用重复获取问题,也可以用redislock类 //String durationString = opsForValue.getAndSet(valKey, ""); //redisTemplate.delete(valKey); switch (action){ case "set": log.info("热点Key:{} 修改",key); HotKeyAction.UPDATE.action(key,value); break; case "expired": log.info("热点Key:{} 到期删除",key); HotKeyAction.REMOVE.action(key,null); break; case "del": log.info("热点Key:{} 删除",key); HotKeyAction.REMOVE.action(key,null); break; } } log.info("监听到的信息:{},值是:{}", new String(message.getChannel()), new String(message.getBody())); } }
在onMessage方法中可以使用RedisLock做分布式锁避免多个应用同时收到过期消息时重复执行逻辑。后期考虑做成方法注解
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import redis.clients.jedis.Jedis; import java.util.Collections; import java.util.UUID; @Component public class RedisLock { @Autowired Jedis jedis; private static final String SET_IF_NOT_EXIST = "NX"; // NX表示如果不存在key就设置value private static final String SET_WITH_EXPIRE_TIME = "PX"; // PX表示毫秒 // 加锁 public String tryLock(String key,Long acquireTimeout) { // 生成随机value String identifierValue = UUID.randomUUID().toString(); // 设置超时时间 Long endTime = System.currentTimeMillis() + acquireTimeout; // 循环获取锁 while (System.currentTimeMillis() < endTime) { String result = jedis.set(key,identifierValue, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, acquireTimeout); if("OK".equals(result)) { return identifierValue; } } return null; } // 解锁 // public void delLock(String key,String identifierValue) { // // 判断是否是同一把锁 // try{ // if(jedis.get(key).equals(identifierValue)){ // // 此处操作非原子性,容易造成释放非自己的锁 // jedis.del(key); // } // }catch(Exception e) { // e.printStackTrace(); // } // } // 使用Lua代码解锁 public void delLock(String key,String identifierValue) { try{ String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Long result = (Long) jedis.eval(script, Collections.singletonList(key), Collections.singletonList(identifierValue)); if (1 == result) { System.out.println(result+"释放锁成功"); } if (0 == result) { System.out.println(result+"释放锁失败"); } }catch (Exception e) { e.printStackTrace(); } } }