JAVA------多线程之聊一聊同步容器&并发容器

it2026-01-21  4

同步容器

在java早期,提供了Vector和HashTable两个同步容器,分别是List和Map的具体实现。 查看其源码,实现线程同步的方法是对每个公共使用synchronize关键字,在方法上实现同步。源码如下:

//vector.add(e) public synchronized boolean add(E e) { modCount++; ensureCapacityHelper(elementCount + 1); elementData[elementCount++] = e; return true; } //hashTable.get(key) public synchronized V get(Object key) { Entry<?,?> tab[] = table; int hash = key.hashCode(); int index = (hash & 0x7FFFFFFF) % tab.length; for (Entry<?,?> e = tab[index] ; e != null ; e = e.next) { if ((e.hash == hash) && e.key.equals(key)) { return (V)e.value; } } return null; }

通过源代码可看出,这两个同步容器的实现非常简单,仅仅只是在执行方法上加上同步,那么其实这样做会产生一些问题。一是在一些符合操作的时候,并不是线程安全,例如如下代码:

public void m1(){ int index=vector.size()-1; vector.remove(index); }

在上段代码中,vector.size()和vector.add(e)都是同步方法,但是在执行m1()方法时,仍然会线程不安全,解决方法任然是在m1方法上加上同步,如下:

public synchronized void m1(){ int index=vector.size()-1; vector.remove(index); }

二是这种在公共方法加synchronized的方法,使得每个线程一个一个进入临界区,其实也就是相当于对普通的list或者map手动加synchronized或者lock,线程串行运行,降低并发性,效率低下。


并发容器

上面说同步容器存在两个问题,那么使用并发容器可以有效地提高并发性。 并发容器跟同步容器比具有以下特点:

有针对使用情境的设计,减少锁的使用。比如CopyOnWrite…,读并不加锁,写时复制。定义一些线程安全的符合操作。在迭代时,可以不加synchronized。但是会产生数据脏读。

并发容器是juc包中提供的一系列容器类,里面包括了BlockingQueue(BlockingDeque是双向队列,姑且算是一类)和ConcurrentMap两个接口及其实现类,和一些CopyOnWriteXxx与ConcurrentXxx类

BlockingQueue BlockingQueue主要有ArrayBlockingQueue,LinkedBlocking,SynchronousQueue三种实现 以ArrayBlockingQueue为例,看下具体使用和实现 使用示例:

/** * 阻塞队列 * 示例:生产消费问题 */ public class T08_BlockingQueue { public static void main(String[] args) { BlockingQueue<Object> queue=new ArrayBlockingQueue<>(10); Produce p=new Produce(queue); Custom c=new Custom(queue); new Thread(p).start(); new Thread(c).start(); } } class Produce implements Runnable{ BlockingQueue<Object> queue; Produce(BlockingQueue<Object> queue){ this.queue=queue; } @Override public void run() { while (true){ try { this.queue.put(product()); } catch (InterruptedException e) { e.printStackTrace(); } } } Object product(){ return new Object(); } } class Custom implements Runnable{ BlockingQueue<Object> queue; Custom(BlockingQueue<Object> queue){ this.queue=queue; } @Override public void run() { while (true){ try { this.queue.take(); } catch (InterruptedException e) { e.printStackTrace(); } } } }

上述代码实现了生产者和消费者模式,使用了put(e)和take()方法。查看源码,put和take方法源码如下:

public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }

put和take执行逻辑如下:

加锁判断当前队列是否已空(已满),若是,则当前线程等待若不是,添加(移除)元素,唤醒另一个condition线程释放锁

观察BlockingQueue接口,不单单有put和take方法,还有add(e),offer(e),remove(),poll()等功功能相似的方法,其具体使用区别如下:

抛出异常返回特殊值阻塞超时插入add(e)offer(e)put(e)offer(e,time,unit)移除remove()poll()take()poll(time,unit)获取首个元素element()peek()––

BlockingQueue允许多个线程对其进行读写操作,与同步容器比,极大提高并发量。


CopyOnWriteXxx 在juc下。CopyOnWrite写时复制容器是一种常用的并发容器,它通过多线程下读写分离来达到提高并发性能的目的。也就是说在读的时候任何时候,任何线程都可以,无需加锁,但是写的时候需要加锁,且构造一个容器的Copy,在这个备份中进行操作,完成后将容器替换为新容器即可。 CopyOnWriteXxx类有两个,CopyOnWriteArrayList和CopyOnWriteListSet,以CopyOnWriteArrayList为例进行分析。 add(e)和get(index)源码如下:

public boolean add(E e) { final ReentrantLock lock = this.lock; //获取独占锁 lock.lock(); try { Object[] elements = getArray(); int len = elements.length; //复制一份copy Object[] newElements = Arrays.copyOf(elements, len + 1); //插入数据 newElements[len] = e; //更新为容器 setArray(newElements); return true; } finally { //释放锁 lock.unlock(); } } public E get(int index) { //直接读取index位置上的数据 return get(getArray(), index); }

从源码可以看出,读取时(get)并没有对方法进行加锁,也就是说,任意线程都能够并发的访问数组数据,而在修改时(add),却要获得独占锁,也就是说同一时间只能有一个线程可以修改,而copy一份数组作为当前操作的对象,避免和读线程发生冲突。 从以上分析推出,CopyOnWrite要比同步容器的并发效率高很多,然而即便如此,仍然有其缺点和局限性:

调用add时,需要开辟一段空间保存原数组备份,修改完后将指针指向当前备份数组,原数组会被GC。所以当高并发修改时,则会造成频繁的开辟空间和频繁GC,对性能有一点影响,这种情况CopyOnWrite并不适用;由于修改时是用的原数组的备份,则若此时有线程正在读取数据时,新的数组还没有更新,此时新修改的数据就不会被读取,因此,在对数据有较强一致性要求的情况下也不适用。

ConcurrentMap 在juc中,ConcurrentMap是提供线程安全性和原子性保证的Map。 ConcurrentMap的实现类有两个,ConcurrentHashMap和ConcurrentSkipListMap,以ConcurrentHashMap为例进行分析,ConcurrentHashMap可以理解为HashMap的并发版本,它的底层实现仍然使用的是与HashMap一般的数组+链表+红黑树的实现数据结构,只是在操作上增加了并发控制。 与CopyOnWrite的锁机制不同,ConcurrentMap的锁的颗粒度更小,使用的是一种叫分段锁的东西。这种锁机制能够使得任意读取线程都能并发访问容器,同时且可以允许一定量的写线程并发修改容器。 所谓的分段锁,就是对容器内的数据分成若干段,对每一段数据分别进行加锁。毕竟多个线程修改的数据很有可能不在一块,当修改一块数据的时候却把所有数据锁住是不合适的。所以使用分段锁时,当多个线程分别修改不同段的数据时并不会造成冲突; 下面以put()方法源码进行分析:

final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); //计算桶的hash值,确定桶的位置 int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) // tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //若当前桶还么有元素,则cas式添加元素 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } //扩容时,帮助扩容 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; //hash冲突时锁住当前需要添加节点的头元素 synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; }

put的执行逻辑如下:

计算当前key的hash值,确定桶的位置若此时桶为空,则使用CAS操作插入新节点若此时正在扩容,则协助扩容。在发生hash冲突时仅仅只锁住当前需要添加节点的头元素即可,其他桶节点都不需要加锁,大大减小了锁粒度。添加节点,并调整至合适的结构(当前若是链表且长度过长时转为红黑树) ConcurrentHashMap使用的是CAS+synchronized实现并发控制,分段锁是针对每个桶加锁,也就是说访问不同桶中的数据多个线程可以并发访问。

总结

同步容器和并发容器都是java提供的在多线程环境下使用线程安全的容器,通过各种并发控制机制,都有其使用领域和局限性,使用的时候还是需要考虑考虑。

最新回复(0)