1. 创建redis集群

    创建redis配置文件
    [root@letsdev redis-cluster-test]# ll
    总用量 17328
    drwxr-xr-x  4 root root       30 6月   2 17:30 6381
    drwxr-xr-x  4 root root       30 6月   2 17:30 6382
    drwxr-xr-x  4 root root       30 6月  16 11:41 6383
    drwxr-xr-x  4 root root       30 6月   2 17:30 6384
    drwxr-xr-x  4 root root       30 6月   2 17:30 6385
    drwxr-xr-x  4 root root       30 6月   2 17:30 6386
    [root@letsdev redis-cluster-test]# cat 6381/
    conf/ data/ 
    [root@letsdev redis-cluster-test]# cat 6381/conf/redis.conf 
    notify-keyspace-events "Ex"
    bind 0.0.0.0
    port 6381
    cluster-enabled yes
    cluster-config-file nodes.conf
    cluster-node-timeout 5000
    appendonly yes
    
    
    启动redis docker实例 可以加-v /home/redis-cluster-test/6381/data:/data 将数据目录映射出来
    docker run -d --name redis-6381  --net host -v /home/redis-cluster-test/6381/conf/redis.conf:/usr/local/etc/redis/redis.conf  redis:4.0.11 redis-server /usr/local/etc/redis/redis.conf
    docker run -d --name redis-6382  --net host -v /home/redis-cluster-test/6382/conf/redis.conf:/usr/local/etc/redis/redis.conf  redis:4.0.11 redis-server /usr/local/etc/redis/redis.conf
    docker run -d --name redis-6383  --net host -v /home/redis-cluster-test/6383/conf/redis.conf:/usr/local/etc/redis/redis.conf  redis:4.0.11 redis-server /usr/local/etc/redis/redis.conf
    docker run -d --name redis-6384  --net host -v /home/redis-cluster-test/6384/conf/redis.conf:/usr/local/etc/redis/redis.conf  redis:4.0.11 redis-server /usr/local/etc/redis/redis.conf
    docker run -d --name redis-6385  --net host -v /home/redis-cluster-test/6385/conf/redis.conf:/usr/local/etc/redis/redis.conf  redis:4.0.11 redis-server /usr/local/etc/redis/redis.conf
    docker run -d --name redis-6386  --net host -v /home/redis-cluster-test/6386/conf/redis.conf:/usr/local/etc/redis/redis.conf  redis:4.0.11 redis-server /usr/local/etc/redis/redis.conf
    
    在redis安装包src目录中设置redis集群信息
    [root@letsdev src]# pwd
    /home/redis-cluster-test/redis-4.0.11/src
    [root@letsdev src]# ll|grep redis-trib.rb 
    -rwxrwxr-x 1 root root  65991 8月   4 2018 redis-trib.rb
    [root@letsdev src]# ./redis-trib.rb create --replicas 1 192.168.250.78:6381 192.168.250.78:6382 192.168.250.78:6383 192.168.250.78:6384 192.168.250.78:6385 192.168.250.78:6386
    
    如果设置错误,删除容器后重建(docker rm 删除的是容器,不是镜像)
    docker rm -f redis-6381
    docker rm -f redis-6382
    docker rm -f redis-6383
    docker rm -f redis-6384
    docker rm -f redis-6385
    docker rm -f redis-6386

     

  2. springboot配置application.properties
    maven依赖

     

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis</artifactId>
      <version>2.1.6.RELEASE</version>
    </dependency>
    
    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
    </dependency>
    spring.redis.database=0
    #注意!!!注释掉这个host配置就使用集群配置,否则使用单机配置
    #spring.redis.host=redis
    spring.redis.cluster.nodes =192.168.250.78:6381,192.168.250.78:6382,192.168.250.78:6383,192.168.250.78:6384,192.168.250.78:6385,192.168.250.78:6386
    spring.redis.cluster.timeout=1000
    spring.redis.cluster.max-redirects=3
    
    # 连接池最大连接数(使用负值表示没有限制)
    spring.redis.lettuce.pool.max-active=8
    # 连接池最大阻塞等待时间(使用负值表示没有限制)
    spring.redis.lettuce.pool.max-wait=-1ms
    # 连接池中的最大空闲连接
    spring.redis.lettuce.pool.max-idle=8
    # 连接池中的最小空闲连接
    spring.redis.lettuce.pool.min-idle=0
  3. 创建连接配置类RedisConfig.java

    package com.ccr.config;
    
    import com.ccr.service.listener.RedisMessageListenerFactory;
    import com.fasterxml.jackson.annotation.JsonAutoDetect;
    import com.fasterxml.jackson.annotation.PropertyAccessor;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.springframework.beans.factory.BeanFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.autoconfigure.AutoConfigureAfter;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
    import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
    import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration;
    import org.springframework.cache.annotation.CachingConfigurerSupport;
    import org.springframework.cache.annotation.EnableCaching;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.env.Environment;
    import org.springframework.core.env.MapPropertySource;
    import org.springframework.data.redis.connection.MessageListener;
    import org.springframework.data.redis.connection.RedisClusterConfiguration;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.connection.jedis.JedisConnection;
    import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
    import org.springframework.data.redis.core.RedisOperations;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.data.redis.listener.PatternTopic;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Redis wiring: a Lettuce cluster connection factory, a string template, and the
     * key-event listener registration for either a stand-alone or a cluster deployment.
     *
     * <p>Which nested configuration is active depends on {@code spring.redis.host}: when it is
     * non-empty a single {@link RedisMessageListenerContainer} is registered; when it is
     * absent or empty a {@link RedisMessageListenerFactory} is registered instead.
     *
     * <p>Originally written by caobin on 2018/5/11.
     */
    @Configuration
    @EnableCaching
    @ConditionalOnClass({ JedisConnection.class, RedisOperations.class, RedisProperties.Jedis.class, MessageListener.class })
    @AutoConfigureAfter({ JacksonAutoConfiguration.class,
            org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration.class })
    public class RedisConfig extends CachingConfigurerSupport {
        @Autowired
        private Environment environment;

        /**
         * Builds a Lettuce connection factory from the {@code spring.redis.cluster.*} properties.
         *
         * <p>Only properties that are actually set are handed to
         * {@link RedisClusterConfiguration}. Copying a {@code null} value would make the
         * property source report the key as present with a null value, which the cluster
         * configuration is expected to dereference (for example when
         * {@code spring.redis.cluster.max-redirects} is not configured).
         *
         * @return a connection factory for the configured cluster
         */
        @Bean
        public RedisConnectionFactory myLettuceConnectionFactory() {
            Map<String, Object> source = new HashMap<>();
            copyIfPresent(source, "spring.redis.cluster.nodes");
            // NOTE(review): RedisClusterConfiguration does not appear to consume a "timeout"
            // key; it is still forwarded here as in the original - confirm whether it is needed.
            copyIfPresent(source, "spring.redis.cluster.timeout");
            copyIfPresent(source, "spring.redis.cluster.max-redirects");
            RedisClusterConfiguration redisClusterConfiguration =
                    new RedisClusterConfiguration(new MapPropertySource("RedisClusterConfiguration", source));
            return new LettuceConnectionFactory(redisClusterConfiguration);
        }

        /** Copies the named environment property into {@code target} when it has a value. */
        private void copyIfPresent(Map<String, Object> target, String propertyName) {
            String value = environment.getProperty(propertyName);
            if (value != null) {
                target.put(propertyName, value);
            }
        }

        /**
         * Creates a string-keyed template on top of {@link #myLettuceConnectionFactory()} whose
         * values are serialized with a Jackson JSON serializer.
         *
         * @return the configured template
         */
        @Bean
        public RedisTemplate<String, String> redisTemplate() {
            StringRedisTemplate template = new StringRedisTemplate(myLettuceConnectionFactory());
            Jackson2JsonRedisSerializer<String> jackson2JsonRedisSerializer =
                    new Jackson2JsonRedisSerializer<>(String.class);
            ObjectMapper om = new ObjectMapper();
            om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
            jackson2JsonRedisSerializer.setObjectMapper(om);
            template.setValueSerializer(jackson2JsonRedisSerializer);
            template.afterPropertiesSet();
            return template;
        }

        /** Listener registration used when {@code spring.redis.host} is set (stand-alone mode). */
        @Configuration
        @ConditionalOnExpression("!'${spring.redis.host:}'.isEmpty()")
        public static class RedisStandAloneAutoConfiguration {
            /**
             * Registers one listener container subscribed to every key-event channel of database 0.
             *
             * @param redisConnectionFactory connection factory the container listens on
             * @param messageListener        listener that receives the notifications
             * @return the configured container
             */
            @Bean
            public RedisMessageListenerContainer customizeRedisListenerContainer(
                    RedisConnectionFactory redisConnectionFactory, MessageListener messageListener) {
                RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
                redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
                // Important: redis.conf sets notify-keyspace-events "Ex" (key-event notifications for
                // expired keys), so the subscription must target the __keyevent channels or no
                // message is received. Example - pattern: __keyevent@0__:*  channel: __keyevent@0__:expired
                redisMessageListenerContainer.addMessageListener(messageListener, new PatternTopic("__keyevent@0__:*"));
                return redisMessageListenerContainer;
            }
        }

        /** Listener registration used when {@code spring.redis.host} is absent or empty (cluster mode). */
        @Configuration
        @ConditionalOnExpression("'${spring.redis.host:}'.isEmpty()")
        public static class RedisClusterAutoConfiguration {
            /**
             * Creates the factory that registers a listener container for each cluster master.
             *
             * @param beanFactory            bean factory the containers are registered in
             * @param redisConnectionFactory connection factory used to discover the cluster nodes
             * @return the configured factory
             */
            @Bean
            public RedisMessageListenerFactory redisMessageListenerFactory(BeanFactory beanFactory,
                                                                           RedisConnectionFactory redisConnectionFactory) {
                RedisMessageListenerFactory beans = new RedisMessageListenerFactory();
                beans.setBeanFactory(beanFactory);
                beans.setRedisConnectionFactory(redisConnectionFactory);
                return beans;
            }
        }

    }
    

     

  4. 因为redis key被hash到了redis不同的节点中,在对应的节点过期后通知,所以要针对所有节点做监听。RedisMessageListenerFactory 创建所有master节点监听

    package com.ccr.service.listener;
    
    import org.springframework.beans.BeansException;
    import org.springframework.beans.factory.BeanFactory;
    import org.springframework.beans.factory.BeanFactoryAware;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.config.BeanDefinition;
    import org.springframework.beans.factory.support.BeanDefinitionBuilder;
    import org.springframework.beans.factory.support.DefaultListableBeanFactory;
    import org.springframework.context.ApplicationListener;
    import org.springframework.context.event.ContextRefreshedEvent;
    import org.springframework.data.redis.connection.MessageListener;
    import org.springframework.data.redis.connection.RedisClusterConnection;
    import org.springframework.data.redis.connection.RedisClusterNode;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
    import org.springframework.data.redis.listener.PatternTopic;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import redis.clients.jedis.JedisShardInfo;
    
    /**
     * Registers one {@link RedisMessageListenerContainer} per cluster master node once the
     * application context has been refreshed.
     *
     * <p>Keys are hashed across the cluster and each node publishes notifications only for its
     * own keys, so every master needs its own subscription.
     */
    public class RedisMessageListenerFactory implements BeanFactoryAware, ApplicationListener<ContextRefreshedEvent> {

        private DefaultListableBeanFactory beanFactory;

        private RedisConnectionFactory redisConnectionFactory;

        @Autowired
        private MessageListener messageListener;

        @Override
        public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
            // Bean definitions are registered dynamically, which needs the concrete factory type.
            this.beanFactory = (DefaultListableBeanFactory) beanFactory;
        }

        public void setRedisConnectionFactory(RedisConnectionFactory redisConnectionFactory) {
            this.redisConnectionFactory = redisConnectionFactory;
        }

        /**
         * Discovers the cluster nodes and makes sure every master has a listener container.
         *
         * <p>A node whose container already exists is skipped on its own; the remaining masters
         * are still processed. The cluster connection used for discovery is always closed.
         *
         * @param contextRefreshedEvent the refresh event that triggered the registration
         */
        @Override
        public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
            RedisClusterConnection redisClusterConnection = redisConnectionFactory.getClusterConnection();
            if (redisClusterConnection == null) {
                return;
            }
            try {
                for (RedisClusterNode node : redisClusterConnection.clusterGetNodes()) {
                    if (node.isMaster()) {
                        registerListenerContainer(node);
                    }
                }
            } finally {
                // The connection is only needed for node discovery; release it afterwards.
                redisClusterConnection.close();
            }
        }

        /** Registers and starts a listener container bound directly to the given master node. */
        private void registerListenerContainer(RedisClusterNode node) {
            String containerBeanName = "messageContainer" + node.hashCode();
            if (beanFactory.containsBean(containerBeanName)) {
                // Already registered (e.g. by an earlier refresh event); nothing to do for this node.
                return;
            }
            JedisConnectionFactory factory = new JedisConnectionFactory(
                    new JedisShardInfo(node.getHost(), node.getPort()));
            BeanDefinitionBuilder containerBeanDefinitionBuilder = BeanDefinitionBuilder
                    .genericBeanDefinition(RedisMessageListenerContainer.class);
            containerBeanDefinitionBuilder.addPropertyValue("connectionFactory", factory);
            containerBeanDefinitionBuilder.setScope(BeanDefinition.SCOPE_SINGLETON);
            containerBeanDefinitionBuilder.setLazyInit(false);
            beanFactory.registerBeanDefinition(containerBeanName,
                    containerBeanDefinitionBuilder.getRawBeanDefinition());

            RedisMessageListenerContainer container = beanFactory
                    .getBean(containerBeanName, RedisMessageListenerContainer.class);
            String listenerBeanName = "messageListener" + node.hashCode();
            if (beanFactory.containsBean(listenerBeanName)) {
                // NOTE(review): no bean with this name is registered by this class; the guard is
                // kept from the original - confirm whether another component defines it.
                return;
            }
            // Important: redis.conf sets notify-keyspace-events "Ex" (key-event notifications for
            // expired keys), so the subscription must target the __keyevent channels or no
            // message is received. Example - pattern: __keyevent@0__:*  channel: __keyevent@0__:expired
            container.addMessageListener(messageListener, new PatternTopic("__keyevent@0__:*"));
            container.start();
        }
    }
  5. 实现 MessageListener接口做消息监听

    package com.example.springredis.redis;
    
    import com.example.springredis.hotkey.HotKey;
    import com.example.springredis.hotkey.HotKeyAction;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.connection.Message;
    import org.springframework.data.redis.connection.MessageListener;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.stereotype.Component;
    
    
    /**
     * Receives Redis keyspace/key-event notifications and forwards changes of hot keys to
     * {@link HotKeyAction}.
     *
     * <p>Two channel layouts are understood:
     * <ul>
     *   <li>{@code __keyevent@<db>__:<event>} with the key name as message body</li>
     *   <li>{@code __keyspace@<db>__:<key>} with the event name as message body</li>
     * </ul>
     */
    @Slf4j
    @Component
    public class KeyExpiredEventMessageListener implements MessageListener {

        /** Channel prefix of key-event notifications (event in the channel, key in the body). */
        private static final String KEY_EVENT_PREFIX = "__keyevent@";

        @Autowired
        private RedisTemplate redisTemplate;

        /**
         * Handles one notification: resolves key and event from channel/body and, for hot keys,
         * dispatches an update or removal.
         *
         * @param message notification whose channel and body carry the key and the event
         * @param pattern subscription pattern that matched (unused)
         */
        @Override
        public void onMessage(Message message, byte[] pattern) {
            String channel = new String(message.getChannel(), java.nio.charset.StandardCharsets.UTF_8);
            String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            String channelSuffix = channel.substring(channel.indexOf(":") + 1);

            final String key;
            final String action;
            if (channel.startsWith(KEY_EVENT_PREFIX)) {
                // __keyevent@0__:expired -> the suffix is the event, the body is the key.
                action = channelSuffix;
                key = body;
            } else {
                // __keyspace@0__:<key> -> the suffix is the key, the body is the event.
                key = channelSuffix;
                action = body;
            }

            if (HotKey.containKey(key)) {
                // getAndSet could be used for an atomic read so that several application
                // instances do not fetch the same value twice; a RedisLock works as well.
                switch (action) {
                    case "set": {
                        // The value is only needed for updates; expired/deleted keys have none.
                        String value = redisTemplate.opsForValue().get(key) + "";
                        log.info("热点Key:{} 修改", key);
                        HotKeyAction.UPDATE.action(key, value);
                        break;
                    }
                    case "expired":
                        log.info("热点Key:{} 到期删除", key);
                        HotKeyAction.REMOVE.action(key, null);
                        break;
                    case "del":
                        log.info("热点Key:{} 删除", key);
                        HotKeyAction.REMOVE.action(key, null);
                        break;
                    default:
                        // Other events are not relevant for the hot-key cache.
                        break;
                }
            }
            log.info("监听到的信息:{},值是:{}", channel, body);
        }
    }

     

  6. 在onMessage方法中可以使用RedisLock做分布式锁避免多个应用同时收到过期消息时重复执行逻辑。后期考虑做成方法注解

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import redis.clients.jedis.Jedis;
    
    import java.util.Collections;
    import java.util.UUID;
    
    /**
     * Minimal Redis-based distributed lock built on {@code SET key value NX PX} and a Lua
     * compare-and-delete script for release.
     *
     * <p>NOTE(review): a single injected {@link Jedis} instance is shared by all callers of
     * this component - confirm that this is safe for the intended concurrent use.
     */
    @Component
    public class RedisLock {

        @Autowired
        Jedis jedis;

        private static final String SET_IF_NOT_EXIST = "NX"; // NX: only set the key if it does not exist yet
        private static final String SET_WITH_EXPIRE_TIME = "PX"; // PX: expiry is given in milliseconds

        /** Pause between two acquisition attempts so the loop does not spin against Redis. */
        private static final long RETRY_INTERVAL_MILLIS = 10L;

        /**
         * Deletes the key only if it still holds the caller's identifier. Doing the comparison
         * and the delete in one script keeps the release atomic; a separate GET followed by DEL
         * could delete a lock that another client acquired in between.
         */
        private static final String UNLOCK_SCRIPT =
                "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

        /**
         * Tries to acquire the lock until the timeout elapses.
         *
         * <p>{@code acquireTimeout} is used both as the maximum time to keep trying and as the
         * expiry (in milliseconds) of the lock key once it is set.
         *
         * @param key            lock key
         * @param acquireTimeout timeout in milliseconds; no attempt is made when it is not positive
         * @return the random identifier stored in the lock (needed to release it), or
         *         {@code null} if the lock was not acquired in time or the thread was interrupted
         */
        public String tryLock(String key, Long acquireTimeout) {
            // Random value identifying this holder.
            String identifierValue = UUID.randomUUID().toString();
            long timeoutMillis = acquireTimeout;
            long endTime = System.currentTimeMillis() + timeoutMillis;
            while (System.currentTimeMillis() < endTime) {
                String result = jedis.set(key, identifierValue, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, timeoutMillis);
                if ("OK".equals(result)) {
                    return identifierValue;
                }
                long remaining = endTime - System.currentTimeMillis();
                if (remaining <= 0) {
                    break;
                }
                try {
                    Thread.sleep(Math.min(RETRY_INTERVAL_MILLIS, remaining));
                } catch (InterruptedException e) {
                    // Preserve the interrupt for the caller and give up acquiring.
                    Thread.currentThread().interrupt();
                    return null;
                }
            }
            return null;
        }

        /**
         * Releases the lock if it is still held with the given identifier.
         *
         * <p>Failures are reported but never propagated, so releasing is best-effort.
         *
         * @param key             lock key
         * @param identifierValue identifier returned by {@link #tryLock(String, Long)}
         */
        public void delLock(String key, String identifierValue) {
            try {
                Object result = jedis.eval(UNLOCK_SCRIPT, Collections.singletonList(key),
                        Collections.singletonList(identifierValue));
                if (Long.valueOf(1L).equals(result)) {
                    System.out.println(result + "释放锁成功");
                } else {
                    System.out.println(result + "释放锁失败");
                }
            } catch (Exception e) {
                // Best-effort release: report the problem, the key will expire on its own.
                e.printStackTrace();
            }
        }

    }