别再问我ConcurrentHashMap了
以下ConcurrentHashMap以jdk8中为例进行分析,ConcurrentHashMap是一个线程安全、基于数组+链表(或者红黑树)的kv容器,主要特性如下:
ConcurrentHashMap默认数组长度16,map最大容量为 Node结构ConcurrentHashMap中一个重要的类就是Node,该类存储键值对,所有插入ConcurrentHashMap的数据都包装在这里面。它与HashMap中的定义很相似,但是有一些差别是ConcurrentHashMap的value和next属性都是volatile的( 1 static class Node<K,V> implements Map.Entry<K,V> { 2 final int hash; 3 final K key; 4 volatile V val; 5 volatile Node<K,1)"> next; 6 } ConcurrentHashMap定义了三个原子操作,用于对数组指定位置的节点进行操作。正是这些原子操作保证了ConcurrentHashMap的线程安全。 1 //获得在i位置上的Node节点 2 final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, i) { 3 return (Node<K,V>)U.getObjectVolatile(tab,((long)i << ASHIFT) + ABASE); 4 } 5 利用CAS算法设置i位置上的Node节点。之所以能实现并发是因为他指定了原来这个节点的值是多少 6 在CAS算法中,会比较内存中的值与你指定的这个值是否相等,如果相等才接受你的修改,否则拒绝你的修改 7 因此当前线程中的值并不是最新的值,这种修改可能会覆盖掉其他线程的修改结果 8 boolean casTabAt(Node<K,1)"> i, 9 Node<K,V> c,Node<K,1)"> v) { 10 return U.compareAndSwapObject(tab,1)"> ABASE,c,v); 11 12 利用volatile方法设置节点位置的值 13 void setTabAt(Node<K,1)">int i,1)">14 U.putObjectVolatile(tab,1)">long)i << ASHIFT) + ABASE,v); 下面就按照ConcurrentHashMap的?put / get / remove?来分析下其实现原理,中间涉及rehash、红黑树转换等。 put流程put操作流程如下:
注意:因为往map中添加元素和增加元素统计值是两个步骤,不是原子的,所以获取map.size()时可能不是准确值。 对key的hashCode计算hash 存到map中的key并不是直接按照hashCode计算的,因为hashCode有可能为负的,并且不合理的hashCode实现可能导致较多冲突,因此ConcurrentHashMap中会对key对hashCode进行hash操作: int hash = spread(key.hashCode()); 2 HASH_BITS = 0x7fffffff 符号位设置为0 3 int spread( h) { return (h ^ (h >>> 16)) & HASH_BITS; 5 } 红黑树节点比较 既然使用到了红黑树,这就涉及到节点的大小比较问题(节点数据包含key、value信息)。进行节点的大小比较时,首先是比较节点的hash值,注意hash值不是hashCode,因为hash值是对象hashCode与自己无符号右移16位进行异或后的值。如果节点的hash值相等,判断节点的key对象是否实现了Comparable接口,实现的话就是用Comparable逻辑比较节点之间的大小。如果key对象未实现Comparable接口,则调用tieBreakOrder方法进行判断: dir = tieBreakOrder(k,pk); k/pk,带比较两个节点,命名还是挺有意思的 tieBreakOrder(Object a,Object b) { 3 d; 4 if (a == null || b == null || 5 (d = a.getClass().getName(). 6 compareTo(b.getClass().getName())) == 0) 7 d = (System.identityHashCode(a) <= System.identityHashCode(b) ? 8 -1 : 1); 9 return10 } 这里调用了System.identityHashCode,将由默认方法hashCode()返回,如果对象的hashCode()被重写,则System.identityHashCode和hashCode()的返回值就不一样了。 put源码 1 final V putVal(K key,V value,1)">boolean onlyIfAbsent) { 2 key value非空 3 if (key == null || value == null) throw new NullPointerException(); 4 int hash = spread(key.hashCode()); 5 slot对应元素个数,链表转换成红黑树时用 6 int binCount = 0; 7 for (Node<K,V>[] tab = table;;) { 8 Node<K,V> f; n,i,fh; 9 if (tab == null || (n = tab.length) == 0 10 tab = initTable(); 11 else if ((f = tabAt(tab,i = (n - 1) & hash)) == null) { 12 if (casTabAt(tab,1)">,1)"> 13 new Node<K,V>(hash,key,value,1)">))) 14 break; no lock when adding to empty bin 15 } 16 if ((fh = f.hash) == MOVED) 17 在rehash扩容,帮助扩容,扩容完成之后才能继续进行put操作 18 tab = helpTransfer(tab,f); 19 else 20 V oldVal = 21 synchronized (f) { 加锁 22 if (tabAt(tab,i) == f) { 可能已经被更新需要再次进行判断 23 if (fh >= 0) { 节点更新或插入 24 binCount = 1 25 binCount) { 26 K ek; 27 if (e.hash == hash && 28 ((ek = e.key) == key || 29 (ek != null && key.equals(ek)))) { 30 oldVal = e.val; 31 if (!onlyIfAbsent) 32 e.val = value; 33 break 34 } 35 Node<K,V> pred = e; 36 if ((e = e.next) == 37 pred.next = (hash,1)"> 38 value,1)"> 39 40 41 } 42 } 43 if (f instanceof TreeBin) { 红黑树更新或插入 44 Node<K,1)"> p; 45 binCount = 2 46 if ((p = ((TreeBin<K,1)">)f).putTreeVal(hash,1)"> 47 value)) != 48 oldVal = p.val; 49 50 p.val = 51 52 53 } 54 } 55 if (binCount != 0 56 if (binCount >= TREEIFY_THRESHOLD) 57 treeifyBin(tab,i); 58 if (oldVal != 59 oldVal; 60 61 62 63 } 64 增加统计值,可能触发rehash扩容 65 addCount(1L 66 return 67 } 68 69 private void addCount(long x,1)"> check) { 70 CounterCell[] as; long b,s; 71 /** 72 * counterCells非空表示当前put并发较大,按照counterCells进行分线程统计 73 * 参考LongAddr思想 74 */ 75 if ((as = counterCells) != 76 !U.compareAndSwapLong(this,BASECOUNT,b = baseCount,s = b + x)) { 77 CounterCell a; long v; m; 78 boolean uncontended = true 79 if (as == null || (m = as.length - 1) < 0 || 80 (a = as[ThreadLocalRandom.getProbe() & m]) == 81 !(uncontended = 82 U.compareAndSwapLong(a,CELLVALUE,v = a.value,v + x))) { 83 fullAddCount(x,uncontended); 84 85 86 if (check <= 1 87 88 s = sumCount(); 89 90 if (check >= 0 91 Node<K,nt; 92 大于等于阈值数时进行扩容操作 93 while (s >= (long)(sc = sizeCtl) && (tab = table) != null && 94 (n = tab.length) < MAXIMUM_CAPACITY) { 95 int rs = resizeStamp(n); 96 if (sc < 0 97 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || 98 sc == rs + MAX_RESIZERS || (nt = nextTable) == 99 transferIndex <= 0100 101 if (U.compareAndSwapInt()) 102 transfer(tab,nt); 103 104 this105 (rs << RESIZE_STAMP_SHIFT) + 2106 transfer(tab,1)">107 s =108 109 110 } ? get流程get方法比较简单,给定一个key来确定value的时候,必须满足两个条件hash值相同同时 key相同(equals) ,对于节点可能在链表或树上的情况,需要分别去查找。 get时一般是不加锁(Node节点中value数据类型是volatile的,保证了内存可见性)。如果slot元素为链表,直接读取返回即可;如果slot元素为红黑树,并且此时该树在进行再平衡或者节点删除操作,读取操作会按照树节点的next指针进行读取,也是不加锁的;如果该树并没有进行平衡或者节点删除操作,那么会用CAS加读锁,防止读取过程中其他线程该树进行更新操作,破坏“读视图”。 remove流程remove流程就是根据key找到对应节点,将该节点从链表(更改节点前后关系)或者红黑树移除的过程,注意,从红黑树中删除元素后,不会将红黑树转换为列表的,只能在put元素时列表可能有转换红黑树操作,不会有反向操作。 注意:hashMap有自动rehash扩容机制,但是当元素remove之后并没有自动缩容机制,如果数组经过多次扩容变得很大,并且当前元素较少,请将这些元素转移到一个新的HashMap中。 rehash流程rehash时是成倍扩容(老table和新tableNew),对于table中i位置的所有元素,扩容后会被分配到i和i+table.length这两个位置中。rehash主要的流程transfer方法中,具体不再展开。 ? 推荐阅读:
更多文档可扫描以下二维码: ? (编辑:北几岛) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |