线程池核心要点

  • 包装任务:将任务转移给自己创建的线程中执行
  • 阻塞队列:将无法立即处理的任务放入阻塞队列,等待空闲线程执行

代码实现

public class MyThreadPoolExecutor implements Executor {

    private AtomicInteger number = new AtomicInteger(1);
    /**
     * 包装任务
     */
    private class Worker implements Runnable {
        Thread thread;
        Runnable firstTask;

        Worker(Runnable runnable) {
            this.firstTask = runnable;
            this.thread = new Thread(this, "my-thread-pool-" + number.getAndIncrement());
        }

        @Override
        public void run() {
            runWorker(this);
        }
    }

    /**
     * 阻塞队列
     */
    private ArrayBlockingQueue<Runnable> workerQueue;
    /**
     * 线程数
     */
    private int capacity;

    private AtomicInteger count = new AtomicInteger();

    public MyThreadPoolExecutor(int capacity, int queueSize) {
        this.capacity = capacity;
        workerQueue = new ArrayBlockingQueue<>(queueSize);
    }


    @Override
    public void execute(Runnable command) {
        //1.如果线程池里面的线程数小于线程池容量
        if (count.get() < capacity){
            addWorker(command);
        }else if(!workerQueue.offer(command)) //否则将任务加入到队列
           //加入队列失败
            System.err.println("队列已满");

    }

    private void addWorker(Runnable command){
        count.incrementAndGet();
        //将任务封装到自定义worker中,并启动
        Worker worker = new Worker(command);
        worker.thread.start();
    }
    
    private void runWorker(Worker worker){
        Runnable task = worker.firstTask;
        worker.firstTask = null;//help gc
        //处理真正的任务
        while(task != null || (task = getTask())!=null){
            task.run();
            task = null;
        }
    }
    /**
     * 从阻塞队列中取,若没有任务,则陷入阻塞等待
     * @return {@link Runnable}
     */
    private Runnable getTask(){
        try {
            return workerQueue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
            return null;
        }
    }
}