并发编程(九)Queue

阻塞队列流程剖析

写时复制List CopyOnWriteArrayList

/**
 * Demo: two threads append concurrently to one {@link CopyOnWriteArrayList},
 * then the main thread prints every element together with its index.
 */
public class CopyOnWriteArrayListTest {

    public static void main(String[] args) throws InterruptedException {
        List<Integer> list = new CopyOnWriteArrayList<>();

        Thread t1 = appender(list, 0, 100000);
        Thread t2 = appender(list, 100000, 200000);

        t1.start();
        t2.start();
        // Wait for both writers so the list is complete before it is printed.
        t1.join();
        t2.join();

        int index = 0;
        for (Integer value : list) {
            System.out.println("第" + index + "个元素是:" + value);
            index++;
        }
    }

    /**
     * Builds (but does not start) a thread that appends the integers in
     * {@code [from, to)} to {@code target} in ascending order.
     */
    private static Thread appender(List<Integer> target, int from, int to) {
        return new Thread(() -> {
            int next = from;
            while (next < to) {
                target.add(next);
                next++;
            }
        });
    }
}

延时队列 DelayQueue

/**
 * Demo: tickets are inserted with delays of 10s, 15s and 5s and then drained
 * with {@code take()}, printing each ticket's name as it is handed out.
 */
public class DelayQueueTest {

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<Ticket> delayQueue = new DelayQueue<>();
        delayQueue.add(new Ticket("票据1", 10000));
        delayQueue.add(new Ticket("票据2", 15000));
        delayQueue.add(new Ticket("票据3", 5000));

        // take() blocks until the head element's delay has elapsed, so the
        // loop drains the queue one expired ticket at a time.
        while (!delayQueue.isEmpty()) {
            Ticket expired = delayQueue.take();
            System.out.println(expired.name);
        }
    }
}

class Ticket implements Delayed {

    public String name;
    /**
     * 过期时间(毫秒)
     */
    public long expireTime;

    public Ticket() {
    }

    public Ticket(String name, long delayTime) {
        this.name = name;
        this.expireTime = System.currentTimeMillis() + delayTime;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(expireTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }
}

阻塞队列

/**
 * Demo: a producer thread keeps putting numbered balls into an
 * {@link ArrayBlockingQueue} of capacity 1 while a consumer thread keeps
 * taking them, logging before and after each blocking call.
 *
 * <p>Both workers loop until they are interrupted. On interruption they
 * restore the interrupt flag and exit instead of swallowing the signal and
 * looping on.
 */
public class ArrayBlockingQueueTest {

    public static void main(String[] args) {
        // Capacity 1: put() blocks while the single slot is occupied,
        // take() blocks while it is empty.
        ArrayBlockingQueue<Boll> arrayBlockingQueue = new ArrayBlockingQueue<>(1);
        Thread t1 = new Thread(() -> {
            try {
                for (int i = 0; ; i++) {
                    System.out.println("准备放入编号为" + i + "的球");
                    arrayBlockingQueue.put(new Boll(i));
                    System.out.println("已放入编号为" + i + "的球");
                }
            } catch (InterruptedException e) {
                // Preserve the interrupt status for whoever owns this thread
                // and stop producing.
                Thread.currentThread().interrupt();
            }
        });
        t1.start();

        Thread t2 = new Thread(() -> {
            try {
                for (; ; ) {
                    System.out.println("准备拿一个球");
                    Boll boll = arrayBlockingQueue.take();
                    System.out.println("拿到了编号为" + boll.number + "的球");
                }
            } catch (InterruptedException e) {
                // Preserve the interrupt status and stop consuming.
                Thread.currentThread().interrupt();
            }
        });
        t2.start();
    }
}

/** A ball identified by a number; used as the queue element in the demo. */
class Boll {
    /** Number assigned to this ball at construction. */
    int number;

    /**
     * @param number the number to store on this ball
     */
    public Boll(int number) {
        this.number = number;
    }
}

自定义阻塞队列

/**
 * A minimal bounded blocking queue backed by a circular array and guarded
 * by one {@link ReentrantLock} with two conditions.
 *
 * <p>{@link #put(Object)} blocks while the queue is full and
 * {@link #take()} blocks while it is empty. Elements leave in the order
 * they were inserted.
 *
 * @param <T> element type
 */
public class MyBlockingQueue<T> {

    /** Number of elements currently stored. */
    int count;
    /** Index of the next element to take. */
    int head;
    /** Index of the next free slot to put into. */
    int tail;
    /** Circular buffer holding the elements. */
    Object[] data;

    /**
     * @param capacity number of slots in the backing array
     */
    public MyBlockingQueue(int capacity) {
        data = new Object[capacity];
    }

    /********** locking **********/
    private final Lock lock = new ReentrantLock();
    /** Signalled after a take frees a slot. */
    private final Condition notFull = lock.newCondition();
    /** Signalled after a put adds an element. */
    private final Condition notEmpty = lock.newCondition();

    /**
     * Inserts {@code t} at the tail, waiting for a free slot if necessary.
     *
     * @param t element to insert
     * @throws InterruptedException if the thread is interrupted while
     *         acquiring the lock or while waiting for space
     */
    public void put(T t) throws InterruptedException {
        // Acquire the lock BEFORE entering try: if acquisition is interrupted
        // the lock is not held, and unlocking it in finally would throw
        // IllegalMonitorStateException and hide the InterruptedException.
        lock.lockInterruptibly();
        try {
            // await():
            //   1. enqueues the current thread on the condition queue
            //   2. releases the lock and wakes a thread in the sync queue
            //   3. parks
            //   4. re-acquires the lock after being woken
            // Re-check in a loop: the wakeup may be spurious, or another
            // producer may have filled the slot before this thread got the
            // lock back.
            while (count == data.length) {
                notFull.await();
            }
            data[tail] = t;
            if (++tail == data.length) {
                tail = 0;
            }
            count++;
            // Move one waiter from notEmpty's condition queue to the lock's sync queue.
            notEmpty.signal();
        } finally {
            // Releasing the lock wakes the next thread in the sync queue.
            lock.unlock();
        }
    }

    /**
     * Removes and returns the element at the head, waiting for one to become
     * available if necessary.
     *
     * @return the oldest element in the queue
     * @throws InterruptedException if the thread is interrupted while
     *         acquiring the lock or while waiting for an element
     */
    @SuppressWarnings("unchecked")
    public T take() throws InterruptedException {
        // Lock acquisition stays outside try for the same reason as in put().
        lock.lockInterruptibly();
        try {
            // Loop guards against spurious wakeups and against another
            // consumer having removed the element first.
            while (count == 0) {
                notEmpty.await();
            }
            // Safe cast: only put(T) ever writes into data.
            T t = (T) data[head];
            // Drop the reference so the element can be garbage collected.
            data[head] = null;
            if (++head == data.length) {
                head = 0;
            }
            count--;
            // Move one waiter from notFull's condition queue to the lock's sync queue.
            notFull.signal();
            return t;
        } finally {
            lock.unlock();
        }
    }
}
# Queue 

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×