手写一个自己的线程池
手写一个自己的线程池
一.背景
在java中中,Thread是一个重量级的资源,它的创建、启动、销毁,整个生命周期都是非常耗费资源的,加之在jvm中,一个程序能创建的线程数量是有限的,没有控制好线程的数量也有可能会引发OOM、线程争夺资源内存不够产生死锁等一系列的异常情况。线程数量和系统性能是呈抛物线的关系,也就是线程的数量得达到某一个数值的时候,性能就会下降,因此,对线程的重复利用、管理、尤其是数量方面的管理,能直接的反映出程序的性能和表现。
二.线程池实现
1. 线程池接口定义(ThreadPool)
ThreadPool定义了线程池该有的能力,即基本的操作和方法。
**
* 提交任务
*
* @param runnable
*/
void execute(Runnable runnable);
/**
* 拒绝任务
*/
void shutdown();
/**
* 获取初始化线程数量
*
* @return
*/
int getInitSize();
/**
* 获取最大线程数量
*
* @return
*/
int getMaxSize();
/**
* 获取核心线程数量
*
* @return
*/
int getCoreSize();
...
- execute(Runnable runnable); 提交任务的接口,接收 Runnable 方法
- shutdown(); 关闭线程池,当前实现会让线程结束当前任务后不再继续执行新任务
2. 任务队列接口定义(RunnableQueue)
该接口提供操作任务队列的方法,新增任务、获取任务等,使用过的是一个 BlockedQueue队列,带有最大数量限制
/**
* 添加任务
*
* @param runnable
*/
void offer(Runnable runnable);
/**
* 获取任务
*
* @return
* @throws InterruptedException
*/
Runnable take() throws InterruptedException;
- offer(Runnable runnable); 添加任务带有拒绝策略(假如队列已满)
- Runnable take() throws InterruptedException; 任务队列为空时,会阻塞当前线程,直至offer后唤醒
3. 线程工厂接口定义(ThreadFactory)
该接口提供了线程创建的接口,主要是为了便于定制化线程对象,例如group 优先级,守护线程,name等等
/**
* 定义创建线程的接口
*
* @param runnable
* @return
*/
Thread createThread(Runnable runnable);
4. 拒绝策略(DenyPolicy)
主要用于任务队列达到上限时,对新提交的任务决定采用何种方式处理
/**
* 定义拒绝策略接口
*
* @param runnable
* @param threadPool
*/
void reject(Runnable runnable, ThreadPool threadPool);
/**
* 直接丢掉
*/
class DiscardDenyPolicy implements DenyPolicy {
@Override
public void reject(Runnable runnable, ThreadPool threadPool) {
}
}
- reject 定义拒绝策略接口(目前实现了三种,文章举例其中一种,直接丢弃任务)
5. 线程任务(InternalTask)
Runnable的一个实现,也是工作线程队列中的一个对象
/**
* 线程队列对象
*/
public class InternalTask implements Runnable {
object and more ...
/**
* 不断的获取任务队列中的第一个任务并执行
*/
@Override
public void run() {
while (running && !Thread.currentThread().isInterrupted()) {
try {
Runnable task = runnableQueue.take();
task.run();
} catch (Exception e) {
running = false;
break;
}
}
}
stop and more...
}
- run() 任务的具体逻辑会在这里被调用时执行
6. 任务队列的实现(LinkedRunnableQueue)
为防止资源竞争,这里用的是 synchronized 关键字
/**
* 如果队列满了,就执行响应的拒绝策略
* 若队列未满就添加任务,并唤醒阻塞中的线程
*
* @param runnable
*/
@Override
public void offer(Runnable runnable) {
synchronized (runnableList) {
if (runnableList.size() >= limit) {
denyPolicy.reject(runnable, threadPool);
} else {
runnableList.add(runnable);
runnableList.notifyAll();
}
}
}
/**
* 队列为空,则阻塞线程
* 队位不为空返回队列中的第一个任务
*
* @return
* @throws InterruptedException
*/
@Override
public Runnable take() throws InterruptedException {
synchronized (runnableList) {
while (runnableList.isEmpty()) {
try {
runnableList.wait();
} catch (InterruptedException e) {
throw e;
}
}
return runnableList.removeFirst();
}
}
- offer(Runnable runnable); 达到队列最大数量的时候会有拒绝策略
7. 初始化线程池现(BasicThreadPool)
基本的数量控制、线程工厂、任务队列等
/**
* 不用多说了吧
*/
public BasicThreadPool(int initSize, int maxSize, int coreSize, ThreadFactory threadFactory, int queueSize, DenyPolicy denyPolicy, long keepAliveTime, TimeUnit timeUnit) {
this.initSize = initSize;
this.maxSize = maxSize;
this.coreSize = coreSize;
this.threadFactory = threadFactory;
this.runnableQueue = new LinkedRunnableQueue(queueSize, denyPolicy, this);
this.keepAliveTime = keepAliveTime;
this.timeUnit = timeUnit;
this.init();
}
/**
* ...
* 总结:就是执行一个任务
*/
private void newThread() {
InternalTask internalTask = new InternalTask(runnableQueue);
Thread thread = this.threadFactory.createThread(internalTask);
ThreadTask threadTask = new ThreadTask(thread, internalTask);
threadQueue.offer(threadTask);
this.activeCount++;
thread.start();
}
/**
* 初始化执行 Start 会执行这个方法
* 不断轮循当线程数量的情况,决定是否继续获取任务队列中执行任务
* 这里用的 synchronized 关键字
*/
@Override
public void run() {
while (!shutdownComplete && !isInterrupted()) {
try {
timeUnit.sleep(keepAliveTime);
} catch (InterruptedException e) {
shutdownComplete = true;
break;
}
synchronized (this) {
if (shutdownComplete) {
break;
}
if (runnableQueue.size() > 0 && activeCount < coreSize) {
for (int i = initSize; i < coreSize; i++) {
newThread();
}
continue;
}
if (runnableQueue.size() > 0 && activeCount < maxSize) {
for (int i = coreSize; i < maxSize; i++) {
newThread();
}
}
if (runnableQueue.size() == 0 && activeCount > coreSize) {
for (int i = coreSize; i < activeCount; i++) {
removeThread();
}
}
}
}
}
- run(); 初始化时会执行这个方法,不断轮循执行任务
- newThread(); 线程池自动维护
三.总结
虽然jdk1.5之后提供了ExecutorService线程池,但是万变不离其宗,原理非常相似,当了解了了其中的原理,对于ExecutorService就能很快理解和灵活使用。
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!