多线程编程学习五(线程池的创建)
一、概述在开发过程中,线程池可以带来如下好处:
New Thread的弊端如下: Java提供的四种线程池的好处在于: 二、Executors 创建线程池Java通过Executors提供四种线程池,分别为: newCachedThreadPool 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。 三、ThreadPoolExecutor 创建线程池线程池不建议使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明:Executors各个方法的弊端: 这里介绍三种创建线程池的方式: Example 1: //org.apache.commons.lang3.concurrent.BasicThreadFactory ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build()); Example 2: ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build(); Common Thread Pool ExecutorService pool = new ThreadPoolExecutor(5,200,0L,TimeUnit.MILLISECONDS,1)">new LinkedBlockingQueue<Runnable>(1024),namedThreadFactory,1)">new ThreadPoolExecutor.AbortPolicy()); pool.execute(()-> System.out.println(Thread.currentThread().getName())); pool.shutdown();gracefully shutdown Example 3: <bean id="userThreadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> property name="corePoolSize" value="10" /> ="maxPoolSize"="100" ="queueCapacity"="2000" ="threadFactory"= threadFactory ="rejectedExecutionHandler"> ref local="rejectedExecutionHandler" </property> bean> //in code userThreadPool.execute(thread); ? tips:ThreadPoolExecutor详解可以参考:https://www.cnblogs.com/jmcui/p/11552583.html 四、自建线程池? ? 我们要建一个简单的线程池,它预先创建了若干数量的线程,并且不能由用户直接对线程的创建进行控制,在这个前提下重复使用固定或较为固定数目的线程来完成任务的执行。这样做的好处是,一方面,消除了频繁创建和消亡线程的系统资源开销,另一方面,面对过量任务的提交能够平缓的劣化。 public interface ThreadPool<Job extends Runnable> { /** * 执行一个Job,这个Job需要实现Runnable * * @param job */ void execute(Job job); * 关闭线程池 shutdown(); * 增加工作者线程 * * num void addWorkers(int num); * 减少工作者线程 * * void removeWorker( * 得到正在等待执行的任务数量 * * @return getJobSize(); }ThreadPool class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> { * 线程池最大限制数、默认的数量、最小的数量 private static final Integer MAX_WORKER_NUMBERS = 10; final Integer DEFAULT_WORKER_NUMBERS = 5final Integer MIN_WORKER_NUMBERS = 1 * 这是一个待工作列表,将会向里面插入工作 final LinkedList<Job> jobs = new LinkedList<>(); * 工作者列表(固定数目的线程,不断去执行 jobs 中的任务) final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>()); * 工作者线程的数量 int workerNum = DEFAULT_WORKER_NUMBERS; * 线程编号生成 private AtomicLong threadNum = AtomicLong(); public DefaultThreadPool() { initializeWokers(DEFAULT_WORKER_NUMBERS); } public DefaultThreadPool( num) { workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num; initializeWokers(workerNum); } @Override execute(Job job) { if (job != null) { 添加一个工作,然后进行通知 synchronized (jobs) { jobs.addLast(job); jobs.notify(); } } } @Override shutdown() { for (Worker worker : workers) { worker.shutdown(); } } @Override num) { (jobs) { 限制新增的Worker数量不能超过最大值 if (num + this.workerNum > MAX_WORKER_NUMBERS) { num = MAX_WORKER_NUMBERS - this.workerNum; } initializeWokers(num); this.workerNum += num; } } @Override if (num >= .workerNum) { throw new IllegalArgumentException("beyond workNum"); } 按照给定的数量停止Worker int count = 0; while (count < num) { 每次都移除第一个线程 Worker worker = workers.get(0); if (workers.remove(worker)) { worker.shutdown(); count++; } } this.workerNum -= count; } } @Override getJobSize() { return jobs.size(); } * 初始化线程工作者 * * void initializeWokers(for (int i = 0; i < num; i++) { Worker worker = Worker(); workers.add(worker); Thread thread = new Thread(worker,"ThreadPool-Worker-" + threadNum.incrementAndGet()); thread.start(); } } * 工作者,负责消费任务 class Worker implements Runnable { * 是否工作 */ volatile boolean running = true; @Override run() { while (running) { Job job; (jobs) { 如果工作者列表是空的,那么就wait (jobs.isEmpty()) { try { jobs.wait(); } catch (InterruptedException ex) { 感知到外部对 WorkerThread 的中断操作,返回 Thread.currentThread().interrupt(); ; } } 取出一个Job job = jobs.removeFirst(); } ) { { job.run(); } (Exception ex) { 忽略Job执行中的Exception } } } } shutdown() { running = false; } } }DefaultThreadPool ? ??可以看到,线程池的本质就是使用了一个线程安全的工作队列(workers)连接工作者线程和客户端线程,客户端线程将任务(job)放入工作队列后便返回,而工作者线程则不断地从工作队列上取出工作并执行。当工作队列为空时,所有的工作者线程均等待在工作队列上,当有客户端提交了一个任务之后会通知任意一个工作者线程,随着大量的任务被提交,更多的工作者线程会被唤醒。 ? ??我们利用自建的线程池来构造一个简单的 Web 服务器,这个 Web 服务器用来处理 HTTP 请求,目前只能处理简单的文本和 JPG 图片内容。这个 Web 服务器使用 main 线程不断地接受客户端 Socket 的连接,将连接以及请求提交给线程池处理,这样使得 Web 服务器能够同时处理多个客户端请求。 class SimpleHttpServer { * 处理HttpRequest的线程池 static ThreadPool<HttpRequestHandler> THREAD_POOL = new DefaultThreadPool<>(1); * SimpleHttpServer的根路径(可以理解成 Tomcat 的 Root 目录) static String basePath; ServerSocket serverSocket; * 服务监听端口 int port = 8080; void setPort( port) { if (port > 0) { SimpleHttpServer.port = port; } } setBasePath(String basePath) { if (basePath != null && new File(basePath).exists() && File(basePath).isDirectory()) { SimpleHttpServer.basePath = basePath; } } * 启动SimpleHttpServer * * @throws Exception void start() throws Exception { serverSocket = ServerSocket(port); Socket socket; while ((socket = serverSocket.accept()) != 接收一个客户端Socket,生成一个HttpRequestHandler,放入线程池执行 THREAD_POOL.execute( HttpRequestHandler(socket)); } serverSocket.close(); } class HttpRequestHandler Runnable { private Socket socket; HttpRequestHandler(Socket socket) { this.socket = socket; } @Override socket 输入 BufferedReader reader = socket 输出 PrintWriter out = ; BufferedReader br = ; InputStream in = { reader = new BufferedReader( InputStreamReader(socket.getInputStream())); String header = reader.readLine(); 由相对路径计算出绝对路径 String filePath = basePath + header.split("s+")[1]; out = PrintWriter(socket.getOutputStream()); 如果请求资源的后缀为jpg或者ico,则读取资源并输出 if (filePath.endsWith("jpg") || filePath.endsWith("ico")) { in = FileInputStream(filePath); ByteArrayOutputStream baos = ByteArrayOutputStream(); i; while ((i = in.read()) != -1) { baos.write(i); } byte[] array = baos.toByteArray(); out.println("HTTP/1.1 200 OK"); out.println("Server: Molly"); out.println("Content-Type: image/jpeg"); out.println("Content-Length: " + array.length); out.println(""); socket.getOutputStream().write(array,0,array.length); } else { br = new InputStreamReader( FileInputStream(filePath))); out = PrintWriter(socket.getOutputStream()); out.println("HTTP/1.1 200 OK"); out.println("Content-Type: text/html; charset=UTF-8"); out.println(""); String line = ; while ((line = br.readLine()) != ) { out.println(line); } } out.flush(); } (Exception ex) { out.println("HTTP/1.1 500"); out.println(""); out.flush(); } finally { close(br,in,reader,out,socket); } } } * 关闭流或者Socket * * closeables close(Closeable... closeables) { if (closeables != (Closeable closeable : closeables) { { closeable.close(); } (Exception ex) { } } } } void main(String[] args) Exception { SimpleHttpServer.setPort(-1); SimpleHttpServer.setBasePath("D:"); SimpleHttpServer.start(); } }SimpleHttpServer ? ? 以上案例参考自《Java 并发编程的艺术》 (编辑:北几岛) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |