我们之前就学习了Collection下面的List和Set两大类,Queue是后来(1.5)加入的接口,它专门就是为高并发准备的
Hashtable之前说过,与Vector一样,是Map的一个古老的实现类,它的设计有不太合理的地方,比如:每一个方法都加了Synchronized。因为大多数的程序都是单个线程在跑;之后java的开发人员意识到了这一点,于是添加了一个HashMap,HashMap不只是把Synchronized去掉,它还把其他的源码作出了调整,但是呢HashMap又是完全没有加锁,之后它又加了一个Collections工具类,我们可以通过Collcetions.synchronizedMap 把HashMap变成加锁的版本。但是这个工具类也是使用的synchronized关键字,没有使用JUC包下的新锁,所以性能上相对与Hashtable没有什么提升。
在JUC包出来之后,又添加了一个效率更高一些的ConcurrentHashMap(CHM)。所以以后在多线程中要用到Map,几乎都用ConcurrentHashMap
ConcurrentHashMap提高效率主要在读上
与Hashtable的演化一样,Vector—>List—>Queue
所有用于并发的Map都继承了ConcurrentMap这个接口。
这个接口还有一个实现类ConcurrentSkipListMap,这个类是用来代替TreeMap的,因为ConcurrentMap中保证同步使用的是CAS操作,但是这个操作在TreeMap中的实现非常复杂,但是又有一开始就要排序的需求,所以java提供了ConcurrentSkipListMap。ConcurrentSkipListMap这个结构就很好的实现了Tree的功能,而且CAS在跳表上的实现的难度比TreeMap容易很多,所以JUC没有提供ConcurrentTreeMap而是提供了ConcurrentSkipListMap
跳表是一个数据结构, 是基于链表实现的一种类似“二分”的算法。它可以快速的实现增,删,改,查操作。
如图所示,当我们每隔一个节点就提取出来一个元素到上一层,把这一层称作索引,其中的down指针指向原始链表。 当我们查找元素16的时候,单链表需要比较10次,而加过索引的两级链表只需要比较7次。当数据量增大到一定程度的时候,效率将会有显著的提升。 如果我们再加多几级索引的话,效率将会进一步提升。这种链表加多级索引的结构,就叫做跳表。
它是一个线程安全的ArrayList变体,其中所有变动操作(add、set 等)都是通过生成底层数组的新副本来实现的。每次改变都是先复制一个副本,之后在副本上改变,然后在将引用指向副本,这样的话就可以实现在读取的时候不用上锁,而变动操作都是有锁的。
因为它每次改变都要复制,所以它的开销通常是很大的,但是在读操作远大与改变操作的情况下,它的性能会比较高,因为它的读操作不用上锁。
BlockingQueue是一个接口,后面各种的阻塞队列都是实现这个接口。阻塞队列提供了一系列方法,在这些方法的基础之上我们可以实现线程的自动阻塞
Queue:
boolean add(E e); 这个是Collection接口带下来的。
boolean offer(E e); 这个是Queue接口下定义的。这才是我们阻塞队列中常用的,它与add的功能是一样的,区别在与当这个队列加满时add会抛异常,而offer不会
E poll(); 取出队列的头部元素,但不会remove掉。若为空这返回null
E peek(); 与poll一样,唯一的区别就是: 会remove
E remove(); 获取队列的头部元素,并且remove。 与peek的唯一区别就是为空是会抛异常
E element(); 与poll一样,唯一区别就是若为空是会抛异常
BlockingQueue: 在Queue的基础上又添加了几个方法,下面两个方法是实现阻塞的关键
put: 也是添加,当这个队列满时,当前线程被阻塞
take:往外取,当这个队列为空时,当前取的线程被阻塞
这两个方法底层的阻塞原理就是调用了LockSupport.park();
主要就在这里,Queue添加了offer(), peek(),poll()等对多线程友好的API,Queue是后来(1.5)加入的接口,它专门就是为高并发准备的。包括它下面的子接口BlockingQueue,Deque等都是为并发服务的。
An unbounded BlockingQueue of Delayed elements, in which an element can only be taken when its delay has expired. The head of the queue is that Delayed element whose delay expired furthest in the past.队列头元素是过期最久的元素。
DelayQueue可以按照时间的优先级来对任务的一个排列
放进DelayQueue中的对象必须实现Delayed接口:要实现CompareTo等方法
DelayQueue底层是使用的PriorityQueue,锁是用的ReentrantLock
PriorityQueue使用的树结构
//DelayQueue底层是使用的PriorityQueue,锁是用的ReentrantLock private final transient ReentrantLock lock = new ReentrantLock(); private final PriorityQueue<E> q = new PriorityQueue<E>(); public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); //实际上是向PriorityQueue里面offer的 if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { lock.unlock(); } }A blocking queue in which each insert operation must wait for a corresponding remove operation by another thread, and vice versa. A synchronous queue does not have any internal capacity, not even a capacity of one. You cannot peek at a synchronous queue because an element is only present when you try to remove it; you cannot insert an element (using any method) unless another thread is trying to remove it; you cannot iterate as there is nothing to iterate. The head of the queue is the element that the first queued inserting thread is trying to add to the queue; if there is no such queued thread then no element is available for removal and poll() will return null. For purposes of other Collection methods (for example contains), a SynchronousQueue acts as an empty collection. This queue does not permit null elements.
这个SynchronousQueue的容量为0,它不是用来装任务的,它是用来一个线程给另外一个线程下达任务的。
public static void main(String[] args) throws InterruptedException { BlockingQueue<String> queue = new SynchronousQueue<>(); new Thread(()->{ try { System.out.println("只要我不取,主线程会一直被阻塞"); Thread.sleep(3000); System.out.println("现在我取出来,结果是:" + queue.take()); queue.put("收到!"); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); queue.put("收到请回答"); //阻塞等待消费者消费 //queue.put("bbb"); //queue.add("aaa"); //不可以add 因为它容量是0 System.out.println("主线程恢复"); System.out.println(queue.take()); }从这个例子中我们就可以看出,SynchronousQueue特别像Exchanger,它比Exchanger更加的方便,Exchanger需要两个线程都是阻塞状态才能完成交换。
因为SynchronousQueue容量为0,所以它即是满的状态,也是空的状态,所以put一上来就会被阻塞,take一上来也会被阻塞
SynchronousQueue貌似看上去没用,但是它是在整个线程池中用处最大的一个queue,很多线程取任务的时候,互相之间进行任务调度的时候都是用的这个SynchronousQueue
TransferQueue是在1.7发布的,它在SynchronousQueue的基础上添加了一个方法transfer,这个方法也是向另外一个线程传递数据,但是与之前的put不同的是,调用transfer()的线程会阻塞住,直到又线程将其取走。
public class TransferQueueTest { public static void main(String[] args) throws InterruptedException { LinkedTransferQueue<String> strs = new LinkedTransferQueue<>(); new Thread(() -> { try { System.out.println(strs.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); strs.transfer("aaa"); //strs.put("aaa"); /*new Thread(() -> { try { System.out.println(strs.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start();*/ } }