Java?Zookeeper分布式分片算法超詳細(xì)講解流程
背景
公司的一個(gè)服務(wù)需要做類似于分片的邏輯,一開始服務(wù)基于傳統(tǒng)部署方式通過(guò)本地配置文件配置的方式就可以指定該機(jī)器服務(wù)的分片內(nèi)容如:0,1,2,3,隨著系統(tǒng)的升級(jí)迭代,該服務(wù)進(jìn)行了容器化部署,所以原來(lái)基于本地配置文件各自配置分片數(shù)據(jù)的方式就不適用了,原來(lái)的部署方式使得服務(wù)是有狀態(tài),是一種非云原生的方式,所以該服務(wù)要重新設(shè)計(jì)實(shí)現(xiàn)一套分布式服務(wù)分片邏輯。
技術(shù)方案
分布式協(xié)調(diào)中間件
要實(shí)現(xiàn)分布式服務(wù)分片的能力,需要有一個(gè)分布式中間件,如:Redis,Mysql,Zookeeper等等都可以,我們選用Zookeeper。
基于Zookeeper的技術(shù)方案
使用Zookeeper主要是基于Zookeeper的臨時(shí)節(jié)點(diǎn)和節(jié)點(diǎn)變化監(jiān)聽機(jī)制,具體的技術(shù)設(shè)計(jì)如下:
服務(wù)注冊(cè)目錄設(shè)計(jì)
Zookeeper的數(shù)據(jù)存儲(chǔ)結(jié)構(gòu)類似于目錄,服務(wù)注冊(cè)后的目錄類似如下結(jié)構(gòu):
解釋下該目錄結(jié)構(gòu),首先/xxxx/xxxx/sharding是區(qū)別于其他業(yè)務(wù)的的目錄,該目錄節(jié)點(diǎn)是持久的,service是服務(wù)目錄,標(biāo)識(shí)一個(gè)服務(wù),該節(jié)點(diǎn)也是持久的,ip1,ip2是該服務(wù)注冊(cè)到Zookeeper的機(jī)器列表節(jié)點(diǎn),該節(jié)點(diǎn)是臨時(shí)節(jié)點(diǎn)。
/xxxx/xxxx/sharding/service/ip1
-----|----|--------|-------/ip2
服務(wù)分片處理流程
- 服務(wù)啟動(dòng),創(chuàng)建
CuratorFramework客戶端,設(shè)置客戶端連接狀態(tài)監(jiān)聽; - 向
Zookeeper注冊(cè)該機(jī)器的信息,這里設(shè)計(jì)簡(jiǎn)單,機(jī)器信息就是ip地址; - 注冊(cè)機(jī)器信息后,從
Zookeeper獲取所有注冊(cè)信息; - 根據(jù)
Zookeeper獲取的所有注冊(cè)機(jī)器信息根據(jù)分片算法進(jìn)行分片計(jì)算。
編碼實(shí)現(xiàn)
ZookeeperConfig
Zookeeper的配置信息
@Data
public class ZookeeperConfig {
/**
* zk集群地址
*/
private String zkAddress;
/**
* 注冊(cè)服務(wù)目錄
*/
private String nodePath;
/**
* 分片的服務(wù)名
*/
private String serviceName;
/**
* 分片總數(shù)
*/
private Integer shardingCount;
public ZookeeperConfig(String zkAddress, String nodePath, String serviceName, Integer shardingCount) {
this.zkAddress = zkAddress;
this.nodePath = nodePath;
this.serviceName = "/" + serviceName;
this.shardingCount = shardingCount;
}
/**
* 等待重試的間隔時(shí)間的初始值.
* 單位毫秒.
*/
private int baseSleepTimeMilliseconds = 1000;
/**
* 等待重試的間隔時(shí)間的最大值.
* 單位毫秒.
*/
private int maxSleepTimeMilliseconds = 3000;
/**
* 最大重試次數(shù).
*/
private int maxRetries = 3;
/**
* 會(huì)話超時(shí)時(shí)間.
* 單位毫秒.
*/
private int sessionTimeoutMilliseconds;
/**
* 連接超時(shí)時(shí)間.
* 單位毫秒.
*/
private int connectionTimeoutMilliseconds;
}
InstanceInfo注冊(cè)機(jī)器
@AllArgsConstructor
@EqualsAndHashCode()
public class InstanceInfo {
private String ip;
public String getInstance() {
return ip;
}
}
ZookeeperShardingService分片服務(wù)
@Slf4j
public class ZookeeperShardingService {
public final Map<String, List<Integer>> caches = new HashMap<>(16);
private final CuratorFramework client;
private final ZookeeperConfig zkConfig;
private final ShardingStrategy shardingStrategy;
private final InstanceInfo instanceInfo;
private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(1);
public ZookeeperShardingService(ZookeeperConfig zkConfig, ShardingStrategy shardingStrategy) {
this.zkConfig = zkConfig;
log.info("開始初始化zk, ip列表是: {}.", zkConfig.getZkAddress());
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(zkConfig.getZkAddress())
.retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds()));
if (0 != zkConfig.getSessionTimeoutMilliseconds()) {
builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds());
}
if (0 != zkConfig.getConnectionTimeoutMilliseconds()) {
builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds());
}
this.shardingStrategy = shardingStrategy;
HostInfo host = new HostInfo();
this.instanceInfo = new InstanceInfo(host.getAddress());
client = builder.build();
client.getConnectionStateListenable().addListener(new ConnectionListener());
client.start();
try {
COUNT_DOWN_LATCH.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 注冊(cè)服務(wù)節(jié)點(diǎn)監(jiān)聽
registerPathChildListener(zkConfig.getNodePath() + zkConfig.getServiceName(), new ChildrenPathListener());
try {
if (!client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) {
client.close();
throw new KeeperException.OperationTimeoutException();
}
} catch (final Exception ex) {
ex.printStackTrace();
throw new RuntimeException(ex);
}
}
/**
* 子節(jié)點(diǎn)監(jiān)聽器
* @param nodePath 主節(jié)點(diǎn)
* @param listener 監(jiān)聽器
*/
private void registerPathChildListener(String nodePath, PathChildrenCacheListener listener) {
try {
// 1. 創(chuàng)建一個(gè)PathChildrenCache
PathChildrenCache pathChildrenCache = new PathChildrenCache(client, nodePath, true);
// 2. 添加目錄監(jiān)聽器
pathChildrenCache.getListenable().addListener(listener);
// 3. 啟動(dòng)監(jiān)聽器
pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
} catch (Exception e) {
log.error("注冊(cè)子目錄監(jiān)聽器出現(xiàn)異常,nodePath:{}",nodePath,e);
throw new RuntimeException(e);
}
}
/**
* 服務(wù)啟動(dòng),注冊(cè)zk節(jié)點(diǎn)
* @throws Exception 異常
*/
private void zkOp() throws Exception {
// 是否存在ruubypay-sharding主節(jié)點(diǎn)
if (null == client.checkExists().forPath(zkConfig.getNodePath())) {
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(zkConfig.getNodePath(), Hashing.sha1().hashString("sharding", Charsets.UTF_8).toString().getBytes());
}
// 是否存服務(wù)主節(jié)點(diǎn)
if (null == client.checkExists().forPath(zkConfig.getNodePath() + zkConfig.getServiceName())) {
// 創(chuàng)建服務(wù)主節(jié)點(diǎn)
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(zkConfig.getNodePath() + zkConfig.getServiceName());
}
// 檢查是否存在臨時(shí)節(jié)點(diǎn)
if (null == client.checkExists().forPath(zkConfig.getNodePath() + zkConfig.getServiceName() + "/" + instanceInfo.getInstance())) {
System.out.println(zkConfig.getNodePath() + zkConfig.getServiceName() + "/" + instanceInfo.getInstance());
// 創(chuàng)建臨時(shí)節(jié)點(diǎn)
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(zkConfig.getNodePath() + zkConfig.getServiceName() +
"/" + instanceInfo.getInstance(), zkConfig.getShardingCount().toString().getBytes(StandardCharsets.UTF_8));
}
shardingFromZk();
}
/**
* 從zk獲取機(jī)器列表并進(jìn)行分片
* @throws Exception 異常
*/
private void shardingFromZk() throws Exception {
// 從 serviceName 節(jié)點(diǎn)下獲取所有Ip列表
final GetChildrenBuilder childrenBuilder = client.getChildren();
final List<String> instanceList = childrenBuilder.watched().forPath(zkConfig.getNodePath() + zkConfig.getServiceName());
List<InstanceInfo> res = new ArrayList<>();
instanceList.forEach(s -> {
res.add(new InstanceInfo(s));
});
Map<InstanceInfo, List<Integer>> shardingResult = shardingStrategy.sharding(res, zkConfig.getShardingCount());
// 先清一遍緩存
caches.clear();
shardingResult.forEach((k, v) -> {
caches.put(k.getInstance().split("-")[0], v);
});
}
/**
* zk連接監(jiān)聽
*/
private class ConnectionListener implements ConnectionStateListener {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
if (newState == ConnectionState.CONNECTED || newState == ConnectionState.LOST || newState == ConnectionState.RECONNECTED) {
try {
zkOp();
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
} finally {
COUNT_DOWN_LATCH.countDown();
}
}
}
}
/**
* 子節(jié)點(diǎn)監(jiān)聽
*/
private class ChildrenPathListener implements PathChildrenCacheListener {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
PathChildrenCacheEvent.Type type = event.getType();
if (PathChildrenCacheEvent.Type.CHILD_ADDED == type || PathChildrenCacheEvent.Type.CHILD_REMOVED == type) {
try {
shardingFromZk();
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
}
}
}分片算法
采用平均分配的算法
public interface ShardingStrategy {
Map<InstanceInfo, List<Integer>> sharding(final List<InstanceInfo> list, Integer shardingCount);
}
public class AverageAllocationShardingStrategy implements ShardingStrategy {
@Override
public Map<InstanceInfo, List<Integer>> sharding(List<InstanceInfo> list, Integer shardingCount) {
if (list.isEmpty()) {
return null;
}
Map<InstanceInfo, List<Integer>> result = shardingAliquot(list, shardingCount);
addAliquant(list, shardingCount, result);
return result;
}
private Map<InstanceInfo, List<Integer>> shardingAliquot(final List<InstanceInfo> instanceInfos, final int shardingTotalCount) {
Map<InstanceInfo, List<Integer>> result = new LinkedHashMap<>(shardingTotalCount, 1);
int itemCountPerSharding = shardingTotalCount / instanceInfos.size();
int count = 0;
for (InstanceInfo each : instanceInfos) {
List<Integer> shardingItems = new ArrayList<>(itemCountPerSharding + 1);
for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) {
shardingItems.add(i);
}
result.put(each, shardingItems);
count++;
}
return result;
}
private void addAliquant(final List<InstanceInfo> instanceInfos, final int shardingTotalCount, final Map<InstanceInfo, List<Integer>> shardingResults) {
int aliquant = shardingTotalCount % instanceInfos.size();
int count = 0;
for (Map.Entry<InstanceInfo, List<Integer>> entry : shardingResults.entrySet()) {
if (count < aliquant) {
entry.getValue().add(shardingTotalCount / instanceInfos.size() * instanceInfos.size() + count);
}
count++;
}
}
}總結(jié)
基于Zookeeper和簡(jiǎn)單的平均分配算法實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的分布式分片服務(wù),該分片服務(wù)目前滿足公司需求,因?yàn)槠浜?jiǎn)單,所以不一定滿足其他場(chǎng)景,針對(duì)其他場(chǎng)景還需考慮其他因素,該示例供參考。
到此這篇關(guān)于Java Zookeeper分布式分片算法超詳細(xì)講解流程的文章就介紹到這了,更多相關(guān)Java Zookeeper分布式分片算法內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot操作Mongodb的實(shí)現(xiàn)示例
本文主要介紹了SpringBoot操作Mongodb的實(shí)現(xiàn)示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-06-06
Java使用Maven BOM統(tǒng)一管理版本號(hào)的實(shí)現(xiàn)
這篇文章主要介紹了Java使用Maven BOM統(tǒng)一管理版本號(hào)的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04
springMvc注解之@ResponseBody和@RequestBody詳解
本篇文章主要介紹了springMvc注解之@ResponseBody和@RequestBody詳解,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-05-05
spring循環(huán)注入異常問(wèn)題的解決方案
今天小編就為大家分享一篇關(guān)于spring循環(huán)注入異常問(wèn)題的解決方案,小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧2018-12-12
Spring Boot 實(shí)例代碼之通過(guò)接口安全退出
這篇文章主要介紹了Spring Boot 實(shí)例代碼之通過(guò)接口安全退出的相關(guān)資料,需要的朋友可以參考下2017-09-09
Mybatis-plus+通用mapper(tk.mybatis)的使用
本文主要介紹了Mybatis-plus+通用mapper(tk.mybatis)的使用,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧<BR>2024-03-03
Apache?Commons?Imaging處理圖像實(shí)例深究
這篇文章主要為大家介紹了Apache?Commons?Imaging處理圖像的實(shí)例探索深究,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-12-12
關(guān)于@Autowired注入依賴失敗的問(wèn)題及解決
這篇文章主要介紹了關(guān)于@Autowired注入依賴失敗的問(wèn)題及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-08-08

