手写一个自己的线程池

手写一个自己的线程池

源码都在我的GitHub上了

一.背景

在java中中,Thread是一个重量级的资源,它的创建、启动、销毁,整个生命周期都是非常耗费资源的,加之在jvm中,一个程序能创建的线程数量是有限的,没有控制好线程的数量也有可能会引发OOM、线程争夺资源内存不够产生死锁等一系列的异常情况。线程数量和系统性能是呈抛物线的关系,也就是线程的数量得达到某一个数值的时候,性能就会下降,因此,对线程的重复利用、管理、尤其是数量方面的管理,能直接的反映出程序的性能和表现。

二.线程池实现

1. 线程池接口定义(ThreadPool)


ThreadPool定义了线程池该有的能力,即基本的操作和方法。

**
     * 提交任务
     *
     * @param runnable
     */
    void execute(Runnable runnable);

    /**
     * 拒绝任务
     */
    void shutdown();


    /**
     * 获取初始化线程数量
     *
     * @return
     */
    int getInitSize();

    /**
     * 获取最大线程数量
     *
     * @return
     */
    int getMaxSize();

    /**
     * 获取核心线程数量
     *
     * @return
     */
    int getCoreSize();

    ...
  • execute(Runnable runnable); 提交任务的接口,接收 Runnable 方法
  • shutdown(); 关闭线程池,当前实现会让线程结束当前任务后不再继续执行新任务

2. 任务队列接口定义(RunnableQueue)


该接口提供操作任务队列的方法,新增任务、获取任务等,使用过的是一个 BlockedQueue队列,带有最大数量限制
/**
     * 添加任务
     *
     * @param runnable
     */
    void offer(Runnable runnable);

    /**
     * 获取任务
     *
     * @return
     * @throws InterruptedException
     */
    Runnable take() throws InterruptedException;
  • offer(Runnable runnable); 添加任务带有拒绝策略(假如队列已满)
  • Runnable take() throws InterruptedException; 任务队列为空时,会阻塞当前线程,直至offer后唤醒

3. 线程工厂接口定义(ThreadFactory)


该接口提供了线程创建的接口,主要是为了便于定制化线程对象,例如group 优先级,守护线程,name等等
/**
     * 定义创建线程的接口
     *
     * @param runnable
     * @return
     */
    Thread createThread(Runnable runnable);

4. 拒绝策略(DenyPolicy)


主要用于任务队列达到上限时,对新提交的任务决定采用何种方式处理
/**
     * 定义拒绝策略接口
     *
     * @param runnable
     * @param threadPool
     */
    void reject(Runnable runnable, ThreadPool threadPool);


    /**
     * 直接丢掉
     */
    class DiscardDenyPolicy implements DenyPolicy {

        @Override
        public void reject(Runnable runnable, ThreadPool threadPool) {
        }
    }



  • reject 定义拒绝策略接口(目前实现了三种,文章举例其中一种,直接丢弃任务)

5. 线程任务(InternalTask)


Runnable的一个实现,也是工作线程队列中的一个对象
/**
 * 线程队列对象
 */
public class InternalTask implements Runnable {

    object and more ...

    /**
     * 不断的获取任务队列中的第一个任务并执行
     */
    @Override
    public void run() {
        while (running && !Thread.currentThread().isInterrupted()) {
            try {
                Runnable task = runnableQueue.take();
                task.run();
            } catch (Exception e) {
                running = false;
                break;
            }
        }
    }


    stop and more...
}
  • run() 任务的具体逻辑会在这里被调用时执行

6. 任务队列的实现(LinkedRunnableQueue)


为防止资源竞争,这里用的是 synchronized 关键字
 /**
     * 如果队列满了,就执行响应的拒绝策略
     * 若队列未满就添加任务,并唤醒阻塞中的线程
     *
     * @param runnable
     */
    @Override
    public void offer(Runnable runnable) {
        synchronized (runnableList) {
            if (runnableList.size() >= limit) {
                denyPolicy.reject(runnable, threadPool);
            } else {
                runnableList.add(runnable);
                runnableList.notifyAll();
            }
        }

    }


    /**
     * 队列为空,则阻塞线程
     * 队位不为空返回队列中的第一个任务
     *
     * @return
     * @throws InterruptedException
     */
    @Override
    public Runnable take() throws InterruptedException {

        synchronized (runnableList) {
            while (runnableList.isEmpty()) {
                try {
                    runnableList.wait();
                } catch (InterruptedException e) {
                    throw e;
                }
            }
            return runnableList.removeFirst();
        }
    }
  • offer(Runnable runnable); 达到队列最大数量的时候会有拒绝策略

7. 初始化线程池现(BasicThreadPool)


基本的数量控制、线程工厂、任务队列等
    /**
     * 不用多说了吧
     */
    public BasicThreadPool(int initSize, int maxSize, int coreSize, ThreadFactory threadFactory, int queueSize, DenyPolicy denyPolicy, long keepAliveTime, TimeUnit timeUnit) {
        this.initSize = initSize;
        this.maxSize = maxSize;
        this.coreSize = coreSize;
        this.threadFactory = threadFactory;
        this.runnableQueue = new LinkedRunnableQueue(queueSize, denyPolicy, this);
        this.keepAliveTime = keepAliveTime;
        this.timeUnit = timeUnit;
        this.init();
    }


    /**
     * ...

     * 总结:就是执行一个任务
     */
    private void newThread() {
        InternalTask internalTask = new InternalTask(runnableQueue);
        Thread thread = this.threadFactory.createThread(internalTask);
        ThreadTask threadTask = new ThreadTask(thread, internalTask);
        threadQueue.offer(threadTask);
        this.activeCount++;
        thread.start();
    }



    /**
     * 初始化执行 Start 会执行这个方法
     * 不断轮循当线程数量的情况,决定是否继续获取任务队列中执行任务
     * 这里用的 synchronized 关键字
     */
    @Override
    public void run() {
        while (!shutdownComplete && !isInterrupted()) {
            try {
                timeUnit.sleep(keepAliveTime);
            } catch (InterruptedException e) {
                shutdownComplete = true;
                break;
            }
            synchronized (this) {
                if (shutdownComplete) {
                    break;
                }

                if (runnableQueue.size() > 0 && activeCount < coreSize) {
                    for (int i = initSize; i < coreSize; i++) {
                        newThread();
                    }
                    continue;
                }

                if (runnableQueue.size() > 0 && activeCount < maxSize) {
                    for (int i = coreSize; i < maxSize; i++) {
                        newThread();
                    }
                }
                if (runnableQueue.size() == 0 && activeCount > coreSize) {
                    for (int i = coreSize; i < activeCount; i++) {
                        removeThread();
                    }
                }

            }
        }
    }
  • run(); 初始化时会执行这个方法,不断轮循执行任务
  • newThread(); 线程池自动维护

三.总结


虽然jdk1.5之后提供了ExecutorService线程池,但是万变不离其宗,原理非常相似,当了解了了其中的原理,对于ExecutorService就能很快理解和灵活使用。