猜猜下面的代码会输出啥?

void run(Callable<Object> c){
    try{
        System.out.println(c.call());
    }catch (Exception ex){
        System.out.println(ex);
    }
}
void testSynchronousQueue(){
    Queue<Integer> q1 = new SynchronousQueue();
    run(()-> q1.add(1));

    Queue<Integer> q2 = new SynchronousQueue();
    run(()-> q1.offer(1));
}
复制代码

实在是让人非常失望,两次执行都失败了。

java.lang.IllegalStateException: Queue full
false
复制代码

第一次,使用add方法,程序抛出了异常,表示队列满了;第二次,程序返回了false,证明添加失败。既然无法向队列中添加元素,又没有指定队列大小的地方。那这个队列,有什么鸟用!

2. Queue的方法

在了解这个队列的使用之前,我们来看一下Queue接口所定义的方法。

  • add(E e) 插入一个元素到队列的尾部。如果无法插入,则抛出异常
  • offer(E e) 插入一个元素到队列的为
  • E remove() 从队列头移除一个元素,如果队列为空,则抛出异常
  • E poll() 从队列头移除一个元素,如果队列为空,则返回null
  • E element() 查看对头元素,如果队列为空,则抛出异常
  • E peek() 查看对头元素,如果队列为空,则返回null

可以看到,对队列的基本操作,只有三个:插入新元素、查看队头、队头出对。根据是否抛出异常,又分为了两类。3x2=6,共6个方法。

喜欢刷题的同学,常用的肯定是offer、poll、peek,这样可以免去恼人的异常处理。平常的编码,也推荐使用非异常的api,但Java为什么提供了两套方法,来供我们使用呢?

原因就是,Queue接口继承了Collection接口,而add和remove等方法,是属于Collection接口的,Queue不得不实现一套。事实上,add方法直接调用了offer方法,为什么多出这么一套api来,真的是个谜。

public boolean add(E e) {
if (offer(e))
   return true;
else
   throw new IllegalStateException("Queue full");
}
复制代码

不抛异常,就容易被遗忘处理,确实是个比较牵强的原因。就凭这,能让人在这么重要的基础类库里面,创造出这么多不同名称的方法么?

3. Put和Take

相比较上面让人纠结的add和offer,put和take方法就确实有用了。但put和take是不属于Queue接口的,它的归属是BlockingQueue。不好意思,一不小心就跳到concurrent包了。

put和take,意味着阻塞。如果操作不成功,它就一直在那里阻塞。想要它们能够正常运行下去,就需要有多个线程的配合。下面的代码会往队列里发送一个1,然后take方法拿出它,进行打印。

void testBlockingSynchronousQueue() throws InterruptedException {
    BlockingQueue<Integer> q1 = new SynchronousQueue();
    new Thread(()-> {
        try {
            q1.put(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
    new Thread(()-> {
        try {
            System.out.println(q1.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
}
复制代码

所以,我们来看一下这些这对方法。

  • put(E e) 插入元素,如果队列满了,它会一直阻塞等待
  • E take() 获取队头元素,如果队列为空则一直等待

可以看到put和take配合起来,很容易实现一个线程安全的生产者消费者模型。相比较使用Queue的接口方法,我们只能通过死循环去检测,相比较阻塞的方式就特别节省资源。

但是还没完。阻塞的take和put方法,只能被interrupt,如何让程序阻塞等待一段时间,然后恢复运行呢?那就只有加入一个带时间戳的阻塞方法。

BlockingQueue选择了offer和poll方法,而不是take和put,暂也搞不懂到底是为什么。

  • E poll(long timeout, TimeUnit unit)
  • boolean offer(E e, long timeout, TimeUnit unit) 依然是有返回值的

4. 你以为这样就完了?

你以为这样就完了?并没有。我们需要把目光投向LinkedList,传说中几行代码实现LRU缓存的类。

ArrayList是一个比较纯净的List,仅仅实现了List接口,但LinkedList就胃口大了一些。由于API设计者,尽最大可能想让这个链表功能更强大一些,它继承了Deque接口。由于Deque继承了Queue,所以这个链表不仅仅是个队列,还是个双向队列。

所以,它们又多了一堆API,分别来描述到底是在队头还是队尾进行操作。

  • addFirst 操作队头,加入元素
  • addLast 操作队尾,加入元素
  • offerFirst 操作队头,加入元素
  • offerLast 操作队尾,加入元素
  • removeFirst 操作队头,删除元素
  • removeLast 操作队尾,删除元素
  • pollFirst 操作队头,删除元素
  • pollLast 操作队尾,删除元素
  • getFirst 获取队头元素,类似element。TMD,这里为什么不用element?
  • getLast 获取队尾元素
  • peekFirst 获取队头元素
  • peekLast 获取队尾元素

当然,这里还有pop和push,pop=removeFirst,push=addFirst。//建议不要用,太难记了。

很好很好,由于有了头和尾的概念,api的大小变成了3x2x2=12个!加上原来的那6个,共18个(直接把pop和push忽略)。

你要说,怎么没有take和put这种阻塞的方法啊。原因就是LinkedList并不是并发的集合,你要找的功能,在LinkedBlockingDeque中,肯定会有takeFirst、takeLast、putLast、putFirst等。

5. 队列大小

反过头来再看我们刚开始的SynchronousQueue,为什么无论向里面添加元素,还是提取元素,都会返回失败?它的容量到底是多少?

这是一个非常奇葩的类,它的内部容量是0!已经被硬编码进代码里了。

public int size() {
   return 0;
}
复制代码

它仅仅建立了一个通道,一旦有生产,消费者就能立马拿到它,它本身是不不存任何数据的。Executors.newCachedThreadPool()就使用了SynchronousQueue。

常用的LinkedBlockingQueue、ArrayBlockingQueue,都是有界的。

但这里有一个比较奇葩的是类,加偶走ConcurrentLinkedQueue,从名字可以看出来,它并不是一个阻塞的并发类,所以并没有take和put等方法。另外,它是无界的,使用时要特别小心。你或许说,我每次判断它的size()方法来看一下是否越界不就行了。

public int size() {
    int count = 0;
    for (Node<E> p = first(); p != null; p = succ(p))
        if (p.item != null)
           // Collection.size() spec says to max out
           if (++count == Integer.MAX_VALUE)
                break;
    return count;
}
复制代码

如上代码,这就是比较坑的地方,size方法,并不是O(1)时间级别的。xjjdog就曾在上面吃过大亏,最后还是不敢乱用了。