welcome to xlongwei.com

欢迎大家一起学习、交流、分享


QQ:9167702333 邮箱:admin@xlongwei.com

redis pub/sub 发布订阅示例


分类 Java   关键字 分享   标签 java   web   spring   redis   发布 hongwei  1458810174146
注意 转载须保留原文链接,译文链接,作者译者等信息。  
redis支持发布订阅功能,因此能够作为dubbo的注册中心,也能提供简易版的消息服务,比较容易上手。

cache.xml,配置redis缓存和监听器
<bean id="redisListener" class="com.itecheast.ite.domain.util.CacheUtil"/>
<redis:listener-container connection-factory="connectionFactory">
<redis:listener ref="redisListener" topic="*"/>
</redis:listener-container>

CacheUtil,提供缓存读写方法,支持redis发布订阅方法,
  • subscribe(String channel, RedisListener listener),订阅某个渠道的消息
  • publish(String channel, Object message),发布消息到渠道
  • get(String cache, String key),获取缓存键值
  • set(String cache, String key, String value),设置缓存键值
  • expire(String cache, String key, long millis),设置键值过期时间
  • getExpire(String cache, String key),获取键值过期时间
public void onMessage(Message message, byte[] pattern) {
RedisSerializer<?> valueSerializer = redisTemplate.getValueSerializer();
RedisSerializer<String> stringSerializer = redisTemplate.getStringSerializer();
String messageChannel = stringSerializer.deserialize(message.getChannel());
Object messageBody = valueSerializer.deserialize(message.getBody());
logger.info("onMessage: "+messageBody);
for(RedisListener<?> listener : listeners) {//消息分发可以改为异步,提供并发能力
Class<?> genericType = listenersTypes.get(listener);//listener仅对某种类型的消息感兴趣
String listenerChannel = listenersChannels.get(listener);//listener仅对某个渠道的消息感兴趣
try {
boolean channelMatches = listenerChannel==null || "*".equals(listenerChannel) || "*".equals(messageChannel) || listenerChannel.equals(messageChannel) || listenerChannel.matches(messageChannel) || messageChannel.matches(listenerChannel);//匹配渠道和消息类型
if(genericType!=null && channelMatches && genericType.isAssignableFrom(messageBody.getClass())) {
MethodInvoker invoker = new MethodInvoker();
invoker.setArguments(new Object[] { messageBody });
invoker.setTargetObject(listener);
invoker.setTargetMethod("on");//listener是泛型,使用反射传入messageBody
invoker.prepare();
invoker.invoke();
}
}catch(Exception e) {}
}
}

测试,consumer.xml需要注释掉ite-consumer.xml所在的行(仅加载cache.xml即可)
mvn test -Dtest=com.itecheast.ite.domain.util.CacheUtilTester#subscribe -Dmaven.test.skip=false
mvn test -Dtest=com.itecheast.ite.domain.util.CacheUtilTester#publish -Dmaven.test.skip=false

日志输出
2016-03-24 16:55:59.411 INFO  [org.springframework.data.redis.listener.RedisMessageListenerContainer#0-4] CacheUtil:133-- onMessage: hello
2016-03-24 16:55:59.411 INFO [org.springframework.data.redis.listener.RedisMessageListenerContainer#0-4] CacheUtilTester:21-- listened data: hello
2016-03-24 16:55:59.567 INFO [org.springframework.data.redis.listener.RedisMessageListenerContainer#0-5] CacheUtil:133-- onMessage: [a, b, c]
2016-03-24 16:55:59.567 INFO [org.springframework.data.redis.listener.RedisMessageListenerContainer#0-5] CacheUtilTester:21-- listened data: [a, b, c]

参考:spring集成redis缓存spring注解式cache缓存配置