加入收藏 | 设为首页 | 会员中心 | 我要投稿 北几岛 (https://www.beijidao.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 大数据 > 正文

别再问我ConcurrentHashMap了

发布时间:2021-05-21 06:56:51 所属栏目:大数据 来源: https://www.jb51.cc
导读:以下ConcurrentHashMap以jdk8中为例进行分析,ConcurrentHashMap是一个线程安全、基于数组+链表(或者红黑树)的kv容器,主要特性如下: 线程安全,数组中单个slot元素个数超过8个时会将链表结构转换成红黑树,注意树节点之间还是有next指针的; 当元素个数超

以下ConcurrentHashMap以jdk8中为例进行分析,ConcurrentHashMap是一个线程安全、基于数组+链表(或者红黑树)的kv容器,主要特性如下:

  • 线程安全,数组中单个slot元素个数超过8个时会将链表结构转换成红黑树,注意树节点之间还是有next指针的;
  • 当元素个数超过N(N = tab.length - tab.length>>>2,达到0.75阈值时)个时触发rehash,成倍扩容;
  • 当线程扩容时,其他线程put数据时会加入帮助扩容,加快扩容速度;
  • put时对单个slot头节点元素进行synchronized加锁,ConcurrentHashMap中的加锁粒度是针对slot节点的,rehash过程中加锁粒度也是如此;
  • get时一般是不加锁。如果slot元素为链表,直接读取返回即可;如果slot元素为红黑树,并且此时该树在进行再平衡或者节点删除操作,读取操作会按照树节点的next指针进行读取,也是不加锁的(因为红黑树中节点也是有链表串起来的);如果该树并没有进行平衡或者节点删除操作,那么会用CAS加读锁,防止读取过程中其他线程该树进行更新操作(主要是防止破坏红黑树节点之间的链表特性),破坏“读视图”。

ConcurrentHashMap默认数组长度16,map最大容量为MAXIMUM_CAPACITY = 1 << 30。创建ConcurrentHashMap并不是涉及数组的初始化,数组初始化是在第一次put数据才进行的。(注意:JDK1.8中舍弃了之前的分段锁技术,改用CAS+Synchronized机制)

Node结构

ConcurrentHashMap中一个重要的类就是Node,该类存储键值对,所有插入ConcurrentHashMap的数据都包装在这里面。它与HashMap中的定义很相似,但是有一些差别是ConcurrentHashMap的value和next属性都是volatile的(保证了get数据时直接返回即可,volatile保证了更新的可见性),且不允许调用setValue方法直接改变Node的value域,增加了find方法辅助map.get()方法,可在get方法返回的结果中更改对应的value值。

1 static class Node<K,V> implements Map.Entry<K,V> {
2     final int hash;
3     final K key;
4     volatile V val;
5     volatile Node<K,1)"> next;
6 }

ConcurrentHashMap定义了三个原子操作,用于对数组指定位置的节点进行操作。正是这些原子操作保证了ConcurrentHashMap的线程安全。

 1 //获得在i位置上的Node节点  
 2 final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, i) {  
 3    return (Node<K,V>)U.getObjectVolatile(tab,((long)i << ASHIFT) + ABASE);  
 4 }  
 5 利用CAS算法设置i位置上的Node节点。之所以能实现并发是因为他指定了原来这个节点的值是多少  
 6 在CAS算法中,会比较内存中的值与你指定的这个值是否相等,如果相等才接受你的修改,否则拒绝你的修改  
 7 因此当前线程中的值并不是最新的值,这种修改可能会覆盖掉其他线程的修改结果
 8 boolean casTabAt(Node<K,1)"> i, 9                                    Node<K,V> c,Node<K,1)"> v) {  
10    return U.compareAndSwapObject(tab,1)"> ABASE,c,v);  
11 12 利用volatile方法设置节点位置的值  
13 void setTabAt(Node<K,1)">int i,1)">14    U.putObjectVolatile(tab,1)">long)i << ASHIFT) + ABASE,v);  

下面就按照ConcurrentHashMap的?put / get / remove?来分析下其实现原理,中间涉及rehash、红黑树转换等。

put流程

put操作流程如下:

  • 首先根据key的hashCode计算hash,然后根据hash计算应该在数组中存储位置,如果数据为null,新建数组;
  • 然后通过tabAt(&操作)直接获取对应slot。如果slot为null,则新建kv节点(Node类型)放到slot;
  • 如果当前slot节点的hash值等于MOVED(等于-1),表示其类型为ForwardingNode,证明其他线程在进行rehash扩容操作,当前线程也会帮助一起进行扩容操作;
  • 然后对slot节点进行synchronized加锁,如果slot节点hash值大于等于0,表示当前slot对应元素为链表结构,遍历当前链表,如果key存在则更新,否则添加到链表尾部;如果slot节点类型为TreeBin(其hash值为-2),表示slot对应元素为红黑树,则在红黑树中进行更新节点或者添加节点操作,注意,最后如果树不平衡会进行树的再平衡操作,此时对树root节点加CAS写锁。
  • 最后,如果新添加了节点,会统计map size值;如果当前map数量超过了阈值(N = tab.length - tab.length>>>2)会触发rehash扩容,按照成倍扩容。

注意:因为往map中添加元素和增加元素统计值是两个步骤,不是原子的,所以获取map.size()时可能不是准确值。

对key的hashCode计算hash

存到map中的key并不是直接按照hashCode计算的,因为hashCode有可能为负的,并且不合理的hashCode实现可能导致较多冲突,因此ConcurrentHashMap中会对key对hashCode进行hash操作:

 int hash = spread(key.hashCode());
2  HASH_BITS = 0x7fffffff 符号位设置为0
3 int spread( h) {
return (h ^ (h >>> 16)) & HASH_BITS;
5 }

红黑树节点比较

既然使用到了红黑树,这就涉及到节点的大小比较问题(节点数据包含key、value信息)。进行节点的大小比较时,首先是比较节点的hash值,注意hash值不是hashCode,因为hash值是对象hashCode与自己无符号右移16位进行异或后的值。如果节点的hash值相等,判断节点的key对象是否实现了Comparable接口,实现的话就是用Comparable逻辑比较节点之间的大小。如果key对象未实现Comparable接口,则调用tieBreakOrder方法进行判断:

 dir = tieBreakOrder(k,pk); k/pk,带比较两个节点,命名还是挺有意思的
 tieBreakOrder(Object a,Object b) {
 3      d;
 4     if (a == null || b == null ||
 5         (d = a.getClass().getName().
 6          compareTo(b.getClass().getName())) == 0)
 7         d = (System.identityHashCode(a) <= System.identityHashCode(b) ?
 8              -1 : 1);
 9     return10 }

这里调用了System.identityHashCode,将由默认方法hashCode()返回,如果对象的hashCode()被重写,则System.identityHashCode和hashCode()的返回值就不一样了。

put源码

  1 final V putVal(K key,V value,1)">boolean onlyIfAbsent) {
  2      key value非空
  3     if (key == null || value == null) throw new NullPointerException();
  4     int hash = spread(key.hashCode());
  5      slot对应元素个数,链表转换成红黑树时用
  6     int binCount = 0;
  7     for (Node<K,V>[] tab = table;;) {
  8         Node<K,V> f;  n,i,fh;
  9         if (tab == null || (n = tab.length) == 0 10             tab = initTable();
 11         else if ((f = tabAt(tab,i = (n - 1) & hash)) == null) {
 12             if (casTabAt(tab,1)">,1)"> 13                          new Node<K,V>(hash,key,value,1)">)))
 14                 break;                    no lock when adding to empty bin
 15         }
 16         if ((fh = f.hash) == MOVED)
 17              在rehash扩容,帮助扩容,扩容完成之后才能继续进行put操作
 18             tab = helpTransfer(tab,f);
 19         else 20             V oldVal =  21             synchronized (f) {  加锁
 22                 if (tabAt(tab,i) == f) {  可能已经被更新需要再次进行判断
 23                     if (fh >= 0) {  节点更新或插入
 24                         binCount = 1 25                         binCount) {
 26                             K ek;
 27                             if (e.hash == hash &&
 28                                 ((ek = e.key) == key ||
 29                                  (ek != null && key.equals(ek)))) {
 30                                 oldVal = e.val;
 31                                 if (!onlyIfAbsent)
 32                                     e.val = value;
 33                                 break 34                             }
 35                             Node<K,V> pred = e;
 36                             if ((e = e.next) ==  37                                 pred.next = (hash,1)"> 38                                                           value,1)"> 39                                  40  41                         }
 42                     }
 43                     if (f instanceof TreeBin) {  红黑树更新或插入
 44                         Node<K,1)"> p;
 45                         binCount = 2 46                         if ((p = ((TreeBin<K,1)">)f).putTreeVal(hash,1)"> 47                                                        value)) !=  48                             oldVal = p.val;
 49                              50                                 p.val = 51  52  53                 }
 54             }
 55             if (binCount != 0 56                 if (binCount >= TREEIFY_THRESHOLD)
 57                     treeifyBin(tab,i);
 58                 if (oldVal !=  59                      oldVal;
 60                  61  62  63     }
 64      增加统计值,可能触发rehash扩容
 65     addCount(1L 66     return  67 }
 68 
 69 private void addCount(long x,1)"> check) {
 70     CounterCell[] as; long b,s;
 71     /**
 72      * counterCells非空表示当前put并发较大,按照counterCells进行分线程统计
 73      * 参考LongAddr思想
 74      */
 75     if ((as = counterCells) !=  76         !U.compareAndSwapLong(this,BASECOUNT,b = baseCount,s = b + x)) {
 77         CounterCell a; long v;  m;
 78         boolean uncontended = true 79         if (as == null || (m = as.length - 1) < 0 ||
 80             (a = as[ThreadLocalRandom.getProbe() & m]) ==  81             !(uncontended =
 82               U.compareAndSwapLong(a,CELLVALUE,v = a.value,v + x))) {
 83             fullAddCount(x,uncontended);
 84              85  86         if (check <= 1 87              88         s = sumCount();
 89  90     if (check >= 0 91         Node<K,nt;  92          大于等于阈值数时进行扩容操作
 93         while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
 94                (n = tab.length) < MAXIMUM_CAPACITY) {
 95             int rs = resizeStamp(n);
 96             if (sc < 0 97                 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
 98                     sc == rs + MAX_RESIZERS || (nt = nextTable) ==  99                     transferIndex <= 0100                     101                 if (U.compareAndSwapInt())
102                     transfer(tab,nt);
103 104             this105                                          (rs << RESIZE_STAMP_SHIFT) + 2106                 transfer(tab,1)">107             s =108 109 110 }

?

get流程

get方法比较简单,给定一个key来确定value的时候,必须满足两个条件hash值相同同时 key相同(equals) ,对于节点可能在链表或树上的情况,需要分别去查找。

get时一般是不加锁(Node节点中value数据类型是volatile的,保证了内存可见性)。如果slot元素为链表,直接读取返回即可;如果slot元素为红黑树,并且此时该树在进行再平衡或者节点删除操作,读取操作会按照树节点的next指针进行读取,也是不加锁的;如果该树并没有进行平衡或者节点删除操作,那么会用CAS加读锁,防止读取过程中其他线程该树进行更新操作,破坏“读视图”。

remove流程

remove流程就是根据key找到对应节点,将该节点从链表(更改节点前后关系)或者红黑树移除的过程,注意,从红黑树中删除元素后,不会将红黑树转换为列表的,只能在put元素时列表可能有转换红黑树操作,不会有反向操作。

注意:hashMap有自动rehash扩容机制,但是当元素remove之后并没有自动缩容机制,如果数组经过多次扩容变得很大,并且当前元素较少,请将这些元素转移到一个新的HashMap中。

rehash流程

rehash时是成倍扩容(老table和新tableNew),对于table中i位置的所有元素,扩容后会被分配到i和i+table.length这两个位置中。rehash主要的流程transfer方法中,具体不再展开。

?

推荐阅读:

  • ConcurrentHashMap竟然也有死循环问题?

  • 你的ThreadLocal线程安全么

更多文档可扫描以下二维码:

?

(编辑:北几岛)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读