文章目录
Thread Pool简介
Executors,Executor 和 ExecutorService
ThreadPoolExecutor
ScheduledThreadPoolExecutor
ForkJoinPool
java中ThreadPool的介绍和使用
Thread Pool简介
在Java中,threads是和系统的threads相对应的,用来处理一系列的系统资源。不管在windows和linux下面,能开启的线程个数都是有限的,如果你在java程序中无限制的创建thread,那么将会遇到无线程可创建的情况。
cpu的核数是有限的,如果同时有多个线程正在运行中,那么cpu将会根据线程的优先级进行轮循,给每个线程分配特定的cpu时间。所以线程也不是越多越好。
在java中,代表管理ThreadPool的接口有两个:ExecutorService和Executor。
我们运行线程的步骤一般是这样的:1. 创建一个ExecutorService。 2.将任务提交给ExecutorService。3.ExecutorService调度线程来运行任务。
画个图来表示:
下面我讲一下,怎么在java中使用ThreadPool。
Executors,Executor 和 ExecutorService
Executors 提供了一系列简便的方法,来帮助我们创建ThreadPool。
Executor接口定义了一个方法:
public interface Executor {
void execute ( Runnable command) ;
}
ExecutorService继承了Executor,提供了更多的线程池的操作。是对Executor的补充。
根据接口实现分离的原则,我们通常在java代码中使用ExecutorService或者Executor,而不是具体的实现类。
我们看下怎么通过Executors来创建一个Executor和ExecutorService:
Executor executor = Executors. newSingleThreadExecutor ( ) ;
executor. execute ( ( ) - > log. info ( "in Executor" ) ) ;
ExecutorService executorService= Executors. newCachedThreadPool ( ) ;
executorService. submit ( ( ) - > log. info ( "in ExecutorService" ) ) ;
executorService. shutdown ( ) ;
关于ExecutorService的细节,我们这里就多讲了,感兴趣的朋友可以参考之前我写的ExecutorService的详细文章。
ThreadPoolExecutor
ThreadPoolExecutor是ExecutorService接口的一个实现,它可以为线程池添加更加精细的配置,具体而言它可以控制这三个参数:corePoolSize,maximumPoolSize,和 keepAliveTime。
PoolSize就是线程池里面的线程个数,corePoolSize表示的是线程池里面初始化和保持的最小的线程个数。
如果当前等待线程太多,可以设置maximumPoolSize来提供最大的线程池个数,从而线程池会创建更多的线程以供任务执行。
keepAliveTime是多余的线程未分配任务将会等待的时间。超出该时间,线程将会被线程池回收。
我们看下怎么创建一个ThreadPoolExecutor:
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor ( 1 , 1 , 0 L, TimeUnit. MILLISECONDS,
new LinkedBlockingQueue < Runnable> ( ) ) ;
threadPoolExecutor. submit ( ( ) - > log. info ( "submit through threadPoolExecutor" ) ) ;
threadPoolExecutor. shutdown ( ) ;
上面的例子中我们通过ThreadPoolExecutor的构造函数来创建ThreadPoolExecutor。
通常来说Executors已经内置了ThreadPoolExecutor的很多实现,我们来看下面的例子:
ThreadPoolExecutor executor1 =
( ThreadPoolExecutor) Executors. newFixedThreadPool ( 2 ) ;
executor1. submit ( ( ) - > {
Thread. sleep ( 1000 ) ;
return null;
} ) ;
executor1. submit ( ( ) - > {
Thread. sleep ( 1000 ) ;
return null;
} ) ;
executor1. submit ( ( ) - > {
Thread. sleep ( 1000 ) ;
return null;
} ) ;
log. info ( "executor1 poolsize {}" , executor1. getPoolSize ( ) ) ;
log. info ( "executor1 queuesize {}" , executor1. getQueue ( ) . size ( ) ) ;
executor1. shutdown ( ) ;
上的例子中我们Executors.newFixedThreadPool(2)来创建一个ThreadPoolExecutor。
上面的例子中我们提交了3个task。但是我们pool size只有2。所以还有一个1个不能立刻被执行,需要在queue中等待。
我们再看一个例子:
ThreadPoolExecutor executor2 =
( ThreadPoolExecutor) Executors. newCachedThreadPool ( ) ;
executor2. submit ( ( ) - > {
Thread. sleep ( 1000 ) ;
return null;
} ) ;
executor2. submit ( ( ) - > {
Thread. sleep ( 1000 ) ;
return null;
} ) ;
executor2. submit ( ( ) - > {
Thread. sleep ( 1000 ) ;
return null;
} ) ;
log. info ( "executor2 poolsize {}" , executor2. getPoolSize ( ) ) ;
log. info ( "executor2 queue size {}" , executor2. getQueue ( ) . size ( ) ) ;
executor2. shutdown ( ) ;
上面的例子中我们使用Executors.newCachedThreadPool()来创建一个ThreadPoolExecutor。 运行之后我们可以看到poolsize是3,而queue size是0。这表明newCachedThreadPool会自动增加pool size。
如果thread在60秒钟之类没有被激活,则会被收回。
这里的Queue是一个SynchronousQueue,因为插入和取出基本上是同时进行的,所以这里的queue size基本都是0.
ScheduledThreadPoolExecutor
还有个很常用的ScheduledThreadPoolExecutor,它继承自ThreadPoolExecutor,并且实现了ScheduledExecutorService接口。
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService
我们看下怎么使用:
ScheduledExecutorService executor = Executors. newScheduledThreadPool ( 5 ) ;
executor. schedule ( ( ) - > {
log. info ( "Hello World" ) ;
} , 500 , TimeUnit. MILLISECONDS) ;
上面的例子中,我们定义了一个定时任务将会在500毫秒之后执行。
之前我们也讲到了ScheduledExecutorService还有两个非常常用的方法:
scheduleAtFixedRate - 以开始时间为间隔。
scheduleWithFixedDelay - 以结束时间为间隔。
CountDownLatch lock = new CountDownLatch ( 3 ) ;
ScheduledExecutorService executor2 = Executors. newScheduledThreadPool ( 5 ) ;
ScheduledFuture< ? > future = executor2. scheduleAtFixedRate ( ( ) - > {
log. info ( "in ScheduledFuture" ) ;
lock. countDown ( ) ;
} , 100 , TimeUnit. MILLISECONDS) ;
lock. await ( 1000 , TimeUnit. MILLISECONDS) ;
future. cancel ( true ) ;
ForkJoinPool
ForkJoinPool是在java 7 中引入的新框架,我们将会在后面的文章中详细讲解。 这里做个简单的介绍。
ForkJoinPool主要用来生成大量的任务来做算法运算。如果用线程来做的话,会消耗大量的线程。但是在fork/join框架中就不会出现这个问题。
在fork/join中,任何task都可以生成大量的子task,然后通过使用join()等待子task结束。
这里我们举一个例子:
static class TreeNode {
int value;
Set< TreeNode> children;
TreeNode ( int value, TreeNode. . . children) {
this . value = value;
this . children = Sets. newHashSet ( children) ;
}
}
定义一个TreeNode,然后遍历所有的value,将其加起来:
public class CountingTask extends RecursiveTask < Integer> {
private final TreeNode node;
public CountingTask ( TreeNode node) {
this . node = node;
}
@Override
protected Integer compute ( ) {
return node. value + node. children. stream ( )
. map ( childNode - > new CountingTask ( childNode) . fork ( ) ) . mapToInt ( ForkJoinTask: : join) . sum ( ) ;
}
}
下面是调用的代码:
public static void main ( String[ ] args) {
TreeNode tree = new TreeNode ( 5 ,
new TreeNode ( 3 ) , new TreeNode ( 2 ,
new TreeNode ( 2 ) , new TreeNode ( 8 ) ) ) ;
ForkJoinPool forkJoinPool = ForkJoinPool. commonPool ( ) ;
int sum = forkJoinPool. invoke ( new CountingTask ( tree) ) ;
}
本文的例子请参考https://github.com/ddean2009/learn-java-concurrency/tree/master/threadPool
更多教程请参考 flydean的博客
(编辑:北几岛)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!