☕ Java Concurrency ThreadPool
原文地址:https://jenkov.com/tutorials/java-concurrency/thread-pools.html
线程池是一组可重复使用的线程,每个线程都可能执行不止一个任务。使用线程池是每次执行任务时都新创建一个线程的替代方案。
与重复使用一个已经创建好的线程相比,创建一个新的线程对性能造成的影响更大。这就是相比为每一个任务新创建一个线程,使用线程池的吞吐更高的原因。
除此之外,使用线程池可以更加方便地控制当前系统中活跃线程的个数。每个线程都会消耗系统中的一部分资源,比如内存,所以如果系统中活跃线程的个数太多的话,这些线程会消耗大量的资源,从而导致计算机变得很慢。比如,当内存消耗的太多时,操作系统会开始将RAM swap到磁盘。
在这个关于线程池的教程中,我将会向你解释线程池是如何工作的,如何使用线程池,以及如何实现一个Java线程池。记住,Java已经有了一个内置的线程池——Java ExecutorService,所以你不必自己实现它,就可以使用线程池。然而,有时你也可能需要实现自己的线程池,比如当你需要添加一些ExecutorService没有提供的功能时。或者,你只想造一个轮子玩一玩。
线程池是如何工作的
与为每个任务创建一个线程来并发跑不同,任务可以直接提交到线程池中。只要当前线程池中有idle的线程,这个任务就会被分配给它们执行。在内部,这些任务被插入到了一个BlockingQueue,线程池中的线程从这个队列中取任务。线程池中的剩余的idle的线程会被阻塞,等待从队列中取任务。
线程池的使用场景
线程池经常在多线程服务器中使用。每个到达服务器的连接都被包装成一个任务,然后提交给线程池。线程池中的线程会在这些连接上并发地处理请求。下面我们会详细看一下如何在Java中实现一个多线程服务器。
内置的Java线程池
在java.util.concurrent包中有一个内置的线程池,所以你不必实现自己的线程池。你可以看看java.util.concurrent.ExecutorService这个类。
Java线程池的实现
下面是一个简单版本的Java线程池实现方法。这个实现中使用了自Java 5以来支持的Java BlockingQueue。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ThreadPool {
private BlockingQueue taskQueue = null;
private List<PoolThreadRunnable> runnables = new ArrayList<>();
private boolean isStopped = false;
public ThreadPool(int noOfThreads, int maxNoOfTasks){
taskQueue = new ArrayBlockingQueue(maxNoOfTasks);
for(int i=0; i<noOfThreads; i++){
PoolThreadRunnable poolThreadRunnable =
new PoolThreadRunnable(taskQueue);
runnables.add(poolThreadRunnable);
}
for(PoolThreadRunnable runnable : runnables){
new Thread(runnable).start();
}
}
public synchronized void execute(Runnable task) throws Exception{
if(this.isStopped) throw
new IllegalStateException("ThreadPool is stopped");
this.taskQueue.offer(task);
}
public synchronized void stop(){
this.isStopped = true;
for(PoolThreadRunnable runnable : runnables){
runnable.doStop();
}
}
public synchronized void waitUntilAllTasksFinished() {
while(this.taskQueue.size() > 0) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
下面是PoolThreadRunnable类,该类实现了Runnable接口,所以它可以被Java的Thread来执行。
import java.util.concurrent.BlockingQueue;
public class PoolThreadRunnable implements Runnable {
private Thread thread = null;
private BlockingQueue taskQueue = null;
private boolean isStopped = false;
public PoolThreadRunnable(BlockingQueue queue){
taskQueue = queue;
}
public void run(){
this.thread = Thread.currentThread();
while(!isStopped()){
try{
Runnable runnable = (Runnable) taskQueue.take();
runnable.run();
} catch(Exception e){
//log or otherwise report exception,
//but keep pool thread alive.
}
}
}
public synchronized void doStop(){
isStopped = true;
//break pool thread out of dequeue() call.
this.thread.interrupt();
}
public synchronized boolean isStopped(){
return isStopped;
}
}
下面是如何使用ThreadPool的一个示例:
public class ThreadPoolMain {
public static void main(String[] args) throws Exception {
ThreadPool threadPool = new ThreadPool(3, 10);
for(int i=0; i<10; i++) {
int taskNo = i;
threadPool.execute( () -> {
String message =
Thread.currentThread().getName()
+ ": Task " + taskNo ;
System.out.println(message);
});
}
threadPool.waitUntilAllTasksFinished();
threadPool.stop();
}
}
线程池的实现包括两部分:ThreadPool类是线程池的公共接口,PoolThread类是实现执行任务的线程。
为了执行一个任务,使用一个Runnable的实现作为参数,调用ThreadPool.execute方法。这个Runnable会被插入到blocking queue中,等待从队列中被取出。
这个Runnable会被一个idle的线程从队列中取出并执行。你可以在PoolThread.run()中看到这些。当执行完毕之后,线程会尝试再次从队列中取出一个Runnable,直到停止为止。
为了停止一个ThreadPool,需要调用ThreadPool.stop方法。线程池中的每个线程都会被调用doStop方法。
注意PoolThread.doStop方法中的this.interrupt语句。这会保证卡在taskQueue.dequeue的wait方法上的线程退出,然后抛出InterruptedException
异常并退出dequeue方法。这个异常会在PoolThread.run方法中捕获。然后会再次检查isStopped,并成功退出run方法,然后线程就gg了。