Java 并发 - BlockingQueue 接口


java.util.concurrent.BlockingQueue 接口是 Queue 接口的子接口,还支持诸如在检索元素之前等待队列变为非空以及在存储元素之前等待队列中的空间变得可用等操作。

阻塞队列方法

先生。 方法及说明
1

布尔加法(E e)

如果可以立即插入指定元素而不违反容量限制,则将指定元素插入此队列,成功时返回 true,如果当前没有可用空间,则抛出 IllegalStateException。

2

布尔值包含(对象o)

如果此队列包含指定元素,则返回 true。

3

int danceTo(Collection<? super E> c)

从此队列中删除所有可用元素并将它们添加到给定集合中。

4

int danceTo(Collection<? super E> c, int maxElements)

从此队列中删除最多给定数量的可用元素并将它们添加到给定的集合中。

5

布尔报价(E e)

如果可以在不违反容量限制的情况下立即执行此操作,则将指定元素插入此队列,成功时返回 true,如果当前没有可用空间则返回 false。

6

boolean Offer(E e, 长时间超时, TimeUnit 单位)

将指定的元素插入此队列中,如果需要空间可用,则等待指定的等待时间。

7

E poll(长超时,TimeUnit单位)

检索并删除此队列的头部,如果需要元素变得可用,则等待指定的等待时间。

8

无效放置(E e)

将指定的元素插入此队列,必要时等待空间可用。

9

int 剩余容量()

返回此队列理想情况下(在没有内存或资源限制的情况下)可以无阻塞接受的附加元素的数量,如果没有内在限制,则返回 Integer.MAX_VALUE。

10

布尔删除(对象o)

从此队列中删除指定元素的单个实例(如果存在)。

11

E采取()

检索并删除此队列的头部,如有必要,则等待直到有元素可用。

例子

以下 TestThread 程序显示了 BlockingQueue 接口在基于线程的环境中的用法。

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class TestThread {

   public static void main(final String[] arguments) throws InterruptedException {
      BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10);

      Producer producer = new Producer(queue);
      Consumer consumer = new Consumer(queue);

      new Thread(producer).start();
      new Thread(consumer).start();

      Thread.sleep(4000);
   }  


   static class Producer implements Runnable {
      private BlockingQueue<Integer> queue;

      public Producer(BlockingQueue queue) {
         this.queue = queue;
      }

      @Override
      public void run() {
         Random random = new Random();

         try {
            int result = random.nextInt(100);
            Thread.sleep(1000);
            queue.put(result);
            System.out.println("Added: " + result);
            
            result = random.nextInt(100);
            Thread.sleep(1000);
            queue.put(result);
            System.out.println("Added: " + result);
            
            result = random.nextInt(100);
            Thread.sleep(1000);
            queue.put(result);
            System.out.println("Added: " + result);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
      }	   
   }

   static class Consumer implements Runnable {
      private BlockingQueue<Integer> queue;

      public Consumer(BlockingQueue queue) {
         this.queue = queue;
      }
      
      @Override
      public void run() {
         
         try {
            System.out.println("Removed: " + queue.take());
            System.out.println("Removed: " + queue.take());
            System.out.println("Removed: " + queue.take());
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
      }
   }
}

这将产生以下结果。

输出

Added: 52
Removed: 52
Added: 70
Removed: 70
Added: 27
Removed: 27