从零实现一个一致性哈希算法,理解+实现。

概念

wiki:

In computer science, consistent hashing is a special kind of hashing technique such that when a hash table is resized, only n/m keys need to be remapped on average where n is the number of keys and m is the number of slots. In contrast, in most traditional hash tables, a change in the number of array slots causes nearly all keys to be remapped because the mapping between the keys and the slots is defined by a modular operation.

一致性哈希,也叫哈希环,是一种对传统哈希表rehash过程的改进算法。通过创建一个环结构,加入虚拟节点,改变寻址逻辑:根据key产生的hashcode在环上寻找下一个虚拟节点,以此提升增删真实节点之后的性能。

场景预设

分布式缓存,我们的集群中有多个节点,数据根据key分区到不同节点。

传统hash路由

假定我们的hash结构现在存储的是 数据key->访问节点 ,那么通过数据key查询时就可以定位到实际存储的节点,也就起到了数据分区、负载均衡的作用。

首先回顾下传统hash函数,也叫散列函数,目的是将指定的入参转换成一个固定的key,如hashCodeMD5Murmurhash

而传统hashtablehashmap的实现下,一旦节点失效,则需要挪动所有的key(因为我们存的是 key->节点),rehash分布,开销巨大。

普通hash:一旦节点发生变动,所有数据均参与到rehash搬运中,开销涉及到所有节点对应数据量

我先实现了一个基于哈希表的coordinator,协调者是分布式常见组件。

参见:HashTableCoordinator

一致性哈希路由

而一致性hash,将增删节点后的开销降低到了最低。

一致性hash:某个节点发生变动,仅有该节点数据分摊到其他节点中,开销涉及到单个节点对应数据量

设计思路

  1. 使用环状结构存储节点;
    1. 对应到代码中则是一个有序集合,我们可以考虑红黑树(足够平衡)、跳表,方便快速找到对应值的后续值,同时插入、删除数据复杂度也很低;
  2. 环上,节点之间增加若干虚拟节点,供映射 虚拟节点->真实节点 的关系;
    1. 增加虚拟节点的原因是为了均衡环上数据的分布,因为数据要找节点,而只有真实节点的话,增删一个节点必然导致节点间距不同、分不均;
    2. 而这里我们加入了足够多的虚拟节点,每个虚拟节点按顺序关联到真实节点上,虚拟节点在环上分布足够均匀;
    3. 新增节点:创建一个真实节点,同时根据hash均匀创建环上的若干虚拟节点;
    4. 下线节点:去掉node,也去掉关联的一批虚拟节点,数据key下次查找时根据环上hashcode顺次查找到另一个虚拟节点上,触发对应据的更新,由于虚拟节点足够多,数据更新被延迟加载,同时分布够均匀;
  3. 根据key查找节点的过程:
    1. 根据key生成hashcode,在环上根据hashcode顺序查找,找到对应虚拟节点,就找到了真实节点;

图例

查找

假设此时来了一个key4查找对应数据,那么先经过我们的murmurhash计算出一个long值,对应到环上的一个位置,也就是跳表或者红黑树上的一个点,这个点之后一定有我们预先插入并且足够散列的一个虚拟节点,通过虚拟节点就可以找到真实节点,进而获取数据。

如图:

增加新的节点

假设我们新增了节点4,在数据结构中的体现:

  1. 新增节点4存储;
  2. 新增虚拟节点到节点4的映射;

同时我们假设一个节点对应10个虚拟节点节点:

可以看到我们的虚拟节点根据hash尽可能均匀的分布在环的各个位置,在我们的数据key足够多的情况下,会有近似 keyCnt/virtualNodeCntkey被归属到了新的虚拟节点,新的真实节点会接管这些key数据。

keyCnt 总数据量 virtualNodeCnt 虚拟节点总量

相比传统哈希表的全量rehash开销控制到了最小。

删除老的节点

集群碰到老的节点下线是很常见的。

在数据结构中的体现:

  1. 移除节点4存储;
  2. 移除虚拟节点到节点4的映射;

此时我们的环图退回到了之前的状态,节点4管理的key根据环依旧可以找到其他节点归属,并且节点4的数据被近似平分到了其他节点上,如图:

实现

单纯看理论总觉得空洞,我们来动手实现一下一致性哈希算法的基本过程。

环结构选型

环的结构要求:

  1. 顺时针可以找到某个值的下一个值,也就是说数据结构需要有序,并且可以很快找到后续值;
  2. 添加新数据需要足够高效;
  3. 移除老数据也需要高效;

JDK中有序的集合大概有:

  • 优先级队列 PriorityQueue 但是只能比较方便的获取最值
  • 红黑树 TreeMap
  • 跳表 ConcurrentSkipListMap

TreeMapConcurrentSkipListMap 都可以比较好的满足我们的要求。二者均实现了 SortedMap 接口,可以方便的获取顺序值的前后元素。

本文我们就用 ConcurrentSkipListMap了。

提供的功能

  • 初始化环:节点->虚拟节点映射;
  • 新增节点;
  • 移除节点;
  • 根据数据key获取数据;

代码

参见:ConsistentHashCoordinator

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
public class ConsistentHashCoordinator {

    /**
     * 虚拟节点hash:真实节点 映射
     * 环的结构使用 SortedMap,实现可选 TreeMap 基于红黑树 或者 ConcurrentSkipListMap 基于跳表
     */
    private final SortedMap<Long, Node> virtualHash2Node;
    /**
     * 一个真实节点对应的虚拟节点数量
     */
    private final int virtualCntPerNode;
    /**
     * 节点读取量
     */
    Map<Node, Integer> node2ReadCnt;

    /**
     * 初始化一致性hash环: 虚拟hash映射到真实节点
     *
     * @param nodeList          节点列表
     * @param virtualCntPerNode 一个真实节点对应多少个虚拟节点
     */
    public ConsistentHashCoordinator(List<Node> nodeList, int virtualCntPerNode) {
        this.virtualHash2Node = new ConcurrentSkipListMap<>();
        this.virtualCntPerNode = virtualCntPerNode;
        for (Node node : nodeList) {
            addNode(node);
        }
        node2ReadCnt = new LinkedHashMap<>();
    }

    /**
     * 存入kv数据,根据key获取到hash值,在环上找到下一个虚拟节点,根据环映射获取真实节点
     *
     * @param key  K
     * @param data V
     */
    public void put(String key, Object data) {
        Node node = getNodeByKey(key);
        // 节点数据存储
        node.put(key, data);
    }

    @NotNull
    private Node getNodeByKey(String key) {
        Long hash = MurmurHashUtils.hash(key);
        SortedMap<Long, Node> tailMap = virtualHash2Node.tailMap(hash);
        // hash值后面如果有值就从后面取,没有就从头取,环状遍历
        Long virtualHash = tailMap.isEmpty() ? virtualHash2Node.firstKey() : tailMap.firstKey();
        Node node = virtualHash2Node.get(virtualHash);
        if (node == null) {
            throw new IllegalStateException("节点获取失败,数据key:" + key);
        }
        return node;
    }

    public Object getValueByKey(String key) {
        Node node = getNodeByKey(key);
        node2ReadCnt.put(node, node2ReadCnt.getOrDefault(node, 0) + 1);
        if (node.containsKey(key)) {
            Object data = node.get(key);
//            System.out.println("从node:" + node + " 中查数据,key::" + key + " value::" + data);
            return data;
        }
//        System.out.println("从node:" + node + " 中重载数据,key::" + key);
        node.reloadByKey(key);
        return node.get(key);
    }

    /**
     * 集群新增节点
     *
     * @param newNode 新节点实例
     */
    public void addNode(Node newNode) {
        for (int i = 0; i < virtualCntPerNode; i++) {
            Long hash = MurmurHashUtils.hash("virtualNodeKey" + newNode.ip + newNode.port + i);
            virtualHash2Node.put(hash, newNode);
        }
    }

    /**
     * 集群下线节点
     * 删除虚拟节点:真实节点映射后,环上依然存在其他节点以及虚拟的映射,根据key依然可以定位某个节点,如果数据不存在,执行reload
     *
     * @param oldNode 已有节点实例
     */
    public void removeNode(Node oldNode) {
        Iterator<Map.Entry<Long, Node>> iterator = virtualHash2Node.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<Long, Node> entry = iterator.next();
            Long virtualHash = entry.getKey();
            Node node = virtualHash2Node.get(virtualHash);
            if (node != null && node.equals(oldNode)) {
                iterator.remove();
            }
        }
    }

    public void clearRead() {
        node2ReadCnt.clear();
    }

    public void print() {
        Map<Node, Integer> node2VirtualCnt = new LinkedHashMap<>();
        for (Map.Entry<Long, Node> entry : virtualHash2Node.entrySet()) {
            Node node = entry.getValue();
            node2VirtualCnt.put(node, node2VirtualCnt.getOrDefault(node, 0) + 1);
        }
        System.out.println("==============");
        System.out.println(node2VirtualCnt);
        System.out.println("==============");
        System.out.println("==============");
        System.out.println(node2ReadCnt);
        System.out.println("==============");
    }

}

UT测试

  • 测试基本流程是否有误;
  • 测试节点分布是否均匀;

参见:ConsistentHashCoordinatorTest

根据用例的输出,可以方便的看到读取数据的分布情况:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
==============
==============
{Node{ip='ip2', port=8089}=1000, Node{ip='ip3', port=8068}=1000, Node{ip='ip5', port=8058}=1000, Node{ip='ip4', port=8078}=1000, Node{ip='ip1', port=8088}=1000, Node{ip='ip6', port=8038}=1000}
==============
==============
{Node{ip='ip6', port=8038}=177055, Node{ip='ip1', port=8088}=163883, Node{ip='ip4', port=8078}=164781, Node{ip='ip3', port=8068}=158297, Node{ip='ip5', port=8058}=171041, Node{ip='ip2', port=8089}=164943}
==============
==============
==============
==============
{Node{ip='ip2', port=8089}=1000, Node{ip='ip3', port=8068}=1000, Node{ip='ip5', port=8058}=1000, Node{ip='ip6', port=8038}=1000}
==============
==============
{Node{ip='ip6', port=8038}=259754, Node{ip='ip2', port=8089}=233252, Node{ip='ip3', port=8068}=247943, Node{ip='ip5', port=8058}=259051}
==============
==============
==============
==============
{Node{ip='ip8', port=9999}=1000, Node{ip='ip2', port=8089}=1000, Node{ip='ip3', port=8068}=1000, Node{ip='ip5', port=8058}=1000, Node{ip='ip6', port=8038}=1000}
==============
==============
{Node{ip='ip6', port=8038}=206454, Node{ip='ip2', port=8089}=182593, Node{ip='ip3', port=8068}=198483, Node{ip='ip5', port=8058}=203033, Node{ip='ip8', port=9999}=209437}
==============
==============

达到了期望。

总结

以上,理论分析了一致性哈希的基本过程、关键点,并实现了最基础的一个一致性哈希结构,输出了用例效果。

Ref