Extending Jedis sharding


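Category: Java   Keywords: sharing   Tags: java, linux, spring, redis   Published by: hongwei  1471315333995
Note: reposts must retain the original link, the translation link, and the author and translator information.
Jedis sharding can span not only multiple Redis instances (a single machine can also run several instances) but also multiple databases within a single Redis instance. The extension accepts a comma-separated list of configurations, each in the form host:port:db:password:timeout:weight, for example: 127.0.0.1:6379:0,127.0.0.1:6379:1,127.0.0.1:6379:2,127.0.0.1:6379:3.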

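Because each shard carries this much configuration, I extended JedisShardInfo, mainly to store the db index and to override createResource so that it returns a suitably prepared Jedis instance.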
public static class MyJedisShardInfo extends JedisShardInfo {
    private final int db; // index of the Redis database this shard uses

    public MyJedisShardInfo(String host, int port, String password, int db, int weight, int timeout) {
        super(host, port, timeout, weight);
        this.db = db;
        setPassword(password);
    }

    public int getDb() { return db; }

    /** Returns a Jedis that is connected, authenticated, and switched to this shard's database. */
    @Override
    public Jedis createResource() {
        Jedis jedis = super.createResource();
        jedis.connect();
        if (getPassword() != null) jedis.auth(getPassword());
        if (db > 0) jedis.select(db);
        return jedis;
    }
}

On every run, the ShardedJedis is obtained through RedisShardUtil, which checks whether each Redis shard is available.
/** Comma-separated list of configurations: host:port:db:password:timeout:weight */
public void setHostsAndPorts(String hostsAndPorts) {
    String[] hostAndPortArray = hostsAndPorts.split("[,;]");
    for (String hostAndPort : hostAndPortArray) {
        String[] hostPort = hostAndPort.split("[:]");
        String host = hostPort[0];
        int port = Integer.parseInt(hostPort[1]);
        // Optional fields default to db 0, no password, timeout 0, and weight 1.
        int db = hostPort.length > 2 ? Integer.parseInt(hostPort[2]) : 0;
        // An empty password field means "no password"; AUTH with "" would be rejected.
        String password = hostPort.length > 3 && !hostPort[3].isEmpty() ? hostPort[3] : null;
        int timeout = hostPort.length > 4 ? Integer.parseInt(hostPort[4]) : 0;
        int weight = hostPort.length > 5 ? Integer.parseInt(hostPort[5]) : 1;
        nodes.add(new MyJedisShardInfo(host, port, password, db, weight, timeout));
    }
    shardedJedis = RedisShardUtil.reshard(nodes);
}
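
The parsing rule used by setHostsAndPorts can be tried without a Redis server. The following is a minimal, self-contained sketch: the class ShardConfigParser and its ShardConfig holder are hypothetical names used only for illustration (the original code builds MyJedisShardInfo objects instead). It applies the same defaults (db 0, timeout 0, weight 1) and, as an extra assumption, treats an empty password field as "no password".

```java
import java.util.ArrayList;
import java.util.List;

public class ShardConfigParser {
    /** Hypothetical value holder; the original code creates MyJedisShardInfo here. */
    public static final class ShardConfig {
        public final String host;
        public final int port;
        public final int db;
        public final String password; // null means no AUTH is sent
        public final int timeout;
        public final int weight;

        ShardConfig(String host, int port, int db, String password, int timeout, int weight) {
            this.host = host;
            this.port = port;
            this.db = db;
            this.password = password;
            this.timeout = timeout;
            this.weight = weight;
        }
    }

    /** Parses entries separated by ',' or ';', each in the form host:port:db:password:timeout:weight. */
    public static List<ShardConfig> parse(String hostsAndPorts) {
        List<ShardConfig> result = new ArrayList<>();
        for (String entry : hostsAndPorts.split("[,;]")) {
            String[] f = entry.trim().split(":");
            if (f.length < 2) {
                throw new IllegalArgumentException("expected at least host:port, got: " + entry);
            }
            String host = f[0];
            int port = Integer.parseInt(f[1]);
            int db = f.length > 2 ? Integer.parseInt(f[2]) : 0;
            String password = f.length > 3 && !f[3].isEmpty() ? f[3] : null;
            int timeout = f.length > 4 ? Integer.parseInt(f[4]) : 0;
            int weight = f.length > 5 ? Integer.parseInt(f[5]) : 1;
            result.add(new ShardConfig(host, port, db, password, timeout, weight));
        }
        return result;
    }

    public static void main(String[] args) {
        // Two shards on the same instance, using databases 0 and 1.
        for (ShardConfig c : parse("127.0.0.1:6379:0,127.0.0.1:6379:1:secret:2000:2")) {
            System.out.println(c.host + ":" + c.port + " db=" + c.db + " weight=" + c.weight);
        }
    }
}
```

Because the fields are split on ':', a password that itself contains a colon cannot be expressed in this format.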

The resharding logic is much the same as before; the check for whether a node is usable is modeled on the makeObject and validateObject code in JedisFactory.
/** Checks whether the shard is usable. */
public static boolean check(MyJedisShardInfo info) {
    Jedis jedis = new Jedis(info.getHost(), info.getPort(), info.getTimeout());
    try {
        jedis.connect(); // modeled on the code in JedisFactory
        if (info.getPassword() != null) jedis.auth(info.getPassword());
        if (info.getDb() > 0) jedis.select(info.getDb());
        return jedis.isConnected() && "PONG".equals(jedis.ping());
    } catch (JedisException e) {
        logger.warn(e.getMessage());
        return false;
    } finally {
        // Close the probe connection even when the check fails.
        try { jedis.disconnect(); } catch (JedisException ignored) { }
    }
}
/** Rebuilds shardedJedis, substituting healthy nodes for failed ones. */
public static ShardedJedis reshard(List<MyJedisShardInfo> nodes) {
    if (reshard) return null; // another reshard is already in progress
    reshard = true;
    try {
        Map<MyJedisShardInfo, Boolean> allShardsCheck = new LinkedHashMap<>();
        List<JedisShardInfo> okNodes = new LinkedList<>();
        for (MyJedisShardInfo node : nodes) {
            boolean check = check(node);
            allShardsCheck.put(node, check);
            if (check) okNodes.add(node);
        }
        int allSize = allShardsCheck.size(), okSize = okNodes.size();
        if (okSize > 0 && okSize < allSize) {
            // Keep the list length unchanged: each failed node's slot is filled
            // with the next healthy node, cycling through okNodes.
            List<JedisShardInfo> newNodes = new LinkedList<>();
            int okIndex = 0;
            for (JedisShardInfo node : nodes) {
                Boolean check = allShardsCheck.get(node);
                if (!check) {
                    node = okNodes.get(okIndex);
                    okIndex = (okIndex + 1) % okSize;
                }
                newNodes.add(node);
            }
            okNodes = newNodes;
        }
        return new ShardedJedis(okNodes);
    } finally {
        reshard = false; // reset the guard even if a check throws
    }
}
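
The substitution step in reshard can be isolated from Jedis and exercised on plain values. The sketch below is an illustration under assumptions, not the original implementation: FailoverSketch and substitute are hypothetical names, and a Predicate stands in for the check method. It keeps the list the same length and fills each failed slot with the next healthy node in round-robin order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class FailoverSketch {
    /**
     * Returns a list of the same length as nodes in which every unhealthy node
     * is replaced by a healthy one, cycling through the healthy nodes in order.
     * Returns an empty list when no node is healthy.
     */
    public static <T> List<T> substitute(List<T> nodes, Predicate<? super T> healthy) {
        boolean[] up = new boolean[nodes.size()];
        List<T> ok = new ArrayList<>();
        // Probe each node once and remember the result.
        for (int i = 0; i < nodes.size(); i++) {
            up[i] = healthy.test(nodes.get(i));
            if (up[i]) ok.add(nodes.get(i));
        }
        // Nothing to substitute when all nodes are up, or when none are.
        if (ok.isEmpty() || ok.size() == nodes.size()) return ok;

        List<T> result = new ArrayList<>(nodes.size());
        int next = 0;
        for (int i = 0; i < nodes.size(); i++) {
            if (up[i]) {
                result.add(nodes.get(i));
            } else {
                result.add(ok.get(next));
                next = (next + 1) % ok.size();
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> nodes = Arrays.asList("A", "B", "C", "D");
        // Pretend that B and D are down.
        System.out.println(substitute(nodes, n -> !n.equals("B") && !n.equals("D")));
    }
}
```

With B and D down, the slots become A, A, C, C. A plausible reason for keeping the slot positions stable, not stated in the original, is that keys routed to the shards that are still healthy are then less likely to move to a different shard.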

Configuration example:
<bean id="springCacheManager" class="com.itecheast.ite.domain.util.MyShardedRedisCacheManager">
    <property name="hostsAndPorts" value="127.0.0.1:6379:0,127.0.0.1:6379:1,127.0.0.1:6379:2" />
    <property name="defaultExpiration" value="604800" />
    <property name="expires">
        <map>
            <entry key="passwordRetryCache" value="60"/>
            <entry key="authorizationCache" value="3600"/>
            <entry key="authenticationCache" value="3600"/>
            <entry key="shiro-activeSessionCache" value="3600"/>
        </map>
    </property>
</bean>
