Java线程池:原理、使用与优化
2023-06-23 21:19:00

本文转载自:Java线程池:原理、使用与优化 - 知乎 (zhihu.com)

1. 引言

1.1. 为什么需要线程池

在许多情况下,我们的应用程序需要并行处理多个任务,这就涉及到使用多线程。然而,手动管理大量的线程会带来很多问题,比如线程生命周期的管理、资源的分配和回收等,这些都可能引发复杂的编程问题。这也是我们为什么需要线程池。

线程池在以下几个方面提供了极大的便利:

  1. 资源重用:线程的创建和销毁都是需要消耗系统资源的,如果每次任务执行完就销毁线程,那么这个资源的消耗会很高。线程池可以管理和复用已经创建的线程,从而减少了线程创建和销毁带来的性能开销。
  2. 控制并发线程数:线程池可以控制并发的线程数,避免大量线程同时执行,耗尽CPU和内存资源。可以根据系统的性能,调整并发线程的数量。
  3. 提供任务排队机制:如果所有线程都在忙碌,新的任务可以在队列中等待,而不是被拒绝或者无限制的创建新线程。
  4. 提供线程生命周期的管理:线程池可以管理线程的生命周期,包括线程的创建、销毁、任务的分配等,从而让开发者专注于业务逻辑的实现,而无需关心线程管理相关的问题。

类比一下,线程池就好比是一个工厂,它里面有一定数量的工人(线程)。当有新的任务来时,如果有空闲的工人,就直接分配给他去做;如果所有工人都在忙,那么就把任务放到等待区(队列);如果等待区也满了,那就拒绝接受新的任务或者采取其他的策略。这样既可以提高效率,又可以避免资源的浪费。

1.2. Java线程池的重要性

Java线程池在高并发应用开发中具有至关重要的作用。对于需要大量并行处理任务的系统来说,线程池是一种能够提供稳定和有效服务的关键组件。

  1. 提高响应速度:线程池中的线程是预先创建好的,当有新任务时,无需等待线程的创建,直接由现有线程执行,提高了系统的响应速度。
  2. 提高系统稳定性:线程池可以有效控制线程的最大并发数,防止大量并发请求导致系统负载过大而崩溃。当线程池的线程都处于忙碌状态时,新任务会进入等待队列中,等待线程池中的线程空闲时再处理。
  3. 提供更好的资源管理:线程池可以合理地使用有限的资源,比如内存资源。通过限制最大线程数,避免了无限制地创建线程可能导致的内存溢出问题。
  4. 便于线程调度和管理:线程池内部对于线程的管理,使得开发人员可以更专注于业务逻辑,而无需关心线程的创建、销毁等细节。

如果没有线程池,我们可能需要自己管理所有的线程,包括但不限于线程的创建、销毁、任务调度等。这不仅会增加开发的复杂性,还可能导致资源的浪费。而线程池,就像是一个调度中心,可以为我们自动完成这些复杂的任务,让我们可以更专注于实现业务逻辑。

2. 线程池的基本原理

2.1. 什么是线程池

线程池在概念上是一种多线程处理的形式,处理流程中的任务并行处理,减少了线程创建和销毁的开销。

当我们谈论"线程池"(Thread Pools)时,我们指的是利用已经创建的线程来执行当前任务的一种策略。在启动应用程序时,我们会创建一组线程,这些线程被放入一个池中,也就是所谓的“线程池”。任务会被分配给线程池中的线程执行,一旦线程完成任务,它会返回到线程池中,等待下一个任务的分配。

线程池可以有效地控制系统中线程的数量。通过合理设置线程池的大小,可以防止系统在负载较大时创建大量线程导致系统性能急剧下降,这种现象也被称为“线程爆炸”。

在Java中,线程池的创建和使用主要通过 java.util.concurrent.ExecutorService 接口实现。ExecutorService 接口有一些具体的实现,例如 ThreadPoolExecutor 和 ScheduledThreadPoolExecutor,其中 ThreadPoolExecutor 是最常用的线程池。

下面是一个创建线程池并提交任务的简单示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;

public class ThreadPoolExample {
public static void main(String[] args) {
// 创建一个线程池,里面包含两个线程
ExecutorService executorService = Executors.newFixedThreadPool(2);

// 提交任务到线程池
executorService.submit(new Task());
executorService.submit(new Task());

// 关闭线程池
executorService.shutdown();
}
}

class Task implements Runnable {
public void run() {
System.out.println("Task executed by " + Thread.currentThread().getName());
}
}

以上代码中,我们首先创建了一个包含两个线程的线程池。然后,我们提交了两个任务到线程池,这两个任务会被线程池中的线程执行。最后,我们调用 shutdown 方法关闭线程池。

2.2. 线程池如何工作的

线程池的工作原理并不复杂。它主要包括一个任务队列和一组工作线程。下面我们会详细介绍一下线程池是如何工作的。

  1. 任务提交:当一个外部任务(通常是一个实现了Runnable接口的对象)提交给线程池后,它首先会被存放在一个任务队列中。这个队列也被称为工作队列。
  2. 任务执行:线程池中的工作线程会不断地从任务队列中取出待处理任务并执行。如果所有的线程都在忙,新来的任务就会等待在任务队列中。
  3. 线程的复用:一旦线程完成了任务的执行,它会返回到线程池中,然后开始处理下一个等待在队列中的任务。这种方式有效地重复使用了线程,避免了频繁创建和销毁线程带来的性能开销。
  4. 线程池关闭:当不再需要线程池时,可以调用其shutdown()或shutdownNow()方法来关闭线程池。这会等待正在执行的任务完成,然后关闭线程池。需要注意的是,shutdownNow()会试图立即停止所有正在执行的任务,然后关闭线程池。

在Java中,可以通过Executor框架来使用线程池。Executor框架是java.util.concurrent包中的一部分,它包括了一系列用于处理多线程执行的工具类。你可以通过这个链接查看更多关于Executor框架的信息。

2.3. 线程池的主要组件

  1. 线程池管理器(ThreadPoolExecutor):线程池管理器的主要作用是管理线程池的生命周期,包括创建和销毁线程,管理任务队列和工作线程。
  2. 工作队列(BlockingQueue):工作队列用于存储已经提交但还未执行的任务。当一个任务被提交给线程池时,如果没有空闲的工作线程,该任务就会被放在工作队列中等待。
  3. 工作线程:工作线程是线程池中真正执行任务的线程。当有任务提交给线程池时,工作线程会从工作队列中取出任务并执行。
  4. 任务接口(Runnable/Callable):任务接口定义了任务的入口,每个任务都必须实现这个接口。Runnable接口的任务没有返回值,而Callable接口的任务有返回值。

3. Java提供的线程池类型

3.1. 不同线程池的区别

Java 的 java.util.concurrent.Executors 类提供了多种类型的线程池,包括 CachedThreadPool、FixedThreadPool、ScheduledThreadPool、SingleThreadExecutor 和 ForkJoinPool。下面我们将对这五种线程池进行比较:

  1. CachedThreadPool:这种线程池的主要特点是灵活地创建新的线程以执行所有提交的任务。如果现有线程没有任务执行,则会利用现有线程,否则会创建新线程。如果线程在一定时间内(默认60秒)没有被使用,那么它会被终止并从线程池中移除。因此,CachedThreadPool是适合处理大量短期异步任务的场景。
1
ExecutorService executorService = Executors.newCachedThreadPool();
  • FixedThreadPool:此种类型的线程池创建一定数量的线程,并且这个数量在其生命周期内不会变化。新提交的任务会在队列中等待,直到有空闲的线程可用。
1
ExecutorService executorService = Executors.newFixedThreadPool(10);
  • ScheduledThreadPool:这种类型的线程池主要用于执行定时任务或者延时任务。它相当于一个固定大小的线程池,但提供了额外的调度功能。
1
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
  • SingleThreadExecutor:这是一种只有一个工作线程的线程池。它确保所有任务(包括任务A、任务B和任务C)都在一个线程中按顺序完成。
1
ExecutorService executorService = Executors.newSingleThreadExecutor();
  • ForkJoinPool: 这是一种专门为大量的并发任务设计的线程池。它使用了一种称为 work-stealing(工作窃取)的技术,可以更有效地利用系统资源。它非常适合执行可以拆分成许多小任务并且可以并行执行的大任务。ForkJoinPool 的线程数默认等于系统可用的处理器数量。
1
2
3
4
5
6
7
8
9
10
11
12
// 创建一个并行级别与处理器数量相等的 ForkJoinPool
ForkJoinPool forkJoinPool = new ForkJoinPool();

// 默认无参构造方法,获取处理器核数
// public ForkJoinPool() {
// this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
// defaultForkJoinWorkerThreadFactory, null, false);
// }

// 使用自定义的并行级别创建 ForkJoinPool
int parallelism = 4;
ForkJoinPool customForkJoinPool = new ForkJoinPool(parallelism);

总的来说,各种线程池类型都有其适用场景,选择哪种类型主要取决于你的具体需求。例如,如果你需要大量的短期异步任务,可以选择CachedThreadPool;如果你需要执行定时或延时的任务,可以选择ScheduledThreadPool;如果你需要一定数量的线程,可以选择FixedThreadPool;如果你需要保证任务按顺序执行,可以选择SingleThreadExecutor; 对于可以并行处理的大任务,通常使用 ForkJoinPool。

3.2. CachedThreadPool

  1. 核心线程数和最大线程数:在CachedThreadPool中,核心线程数是0,最大线程数是Integer.MAX_VALUE,也就是说,理论上它可以创建到近乎无限的线程数。这是为了满足短期异步大量任务的情况,可以快速创建新线程处理任务。
  2. 使用的队列:SynchronousQueue:CachedThreadPool使用的队列是SynchronousQueue。这是一个没有存储空间的阻塞队列,每一个插入操作必须等待一个相应的删除操作,反之亦然。这意味着你不能在队列中插入一个任务,除非有一个线程正在尝试从队列中删除一个任务,反之亦然。这样的设计,使得线程池更倾向于创建新的线程,而不是将任务放在队列中,这也就符合CachedThreadPool能够创建无限多线程的设计。
  3. 线程的创建和回收机制:当有新任务提交到CachedThreadPool时,首先会尝试找空闲线程处理任务,如果没有空闲线程,就会创建新线程。当线程空闲一段时间(默认60秒)后,如果还没有新的任务,这个线程就会被回收。这是通过内部使用的ThreadPoolExecutor类实现的,ThreadPoolExecutor在运行过程中,会记录非核心线程最后一次任务完成的时间,当这个时间超过keepAliveTime时,就会终止这个线程。这种机制使得CachedThreadPool在没有任务处理时,不会消耗过多资源,能有效管理系统资源。

3.3. FixedThreadPool

FixedThreadPool 是一种固定线程数的线程池。

  1. 核心线程数和最大线程数:在FixedThreadPool中,核心线程数和最大线程数是相等的,都是创建线程池时传入的nThreads参数。这就意味着,无论线程是否空闲,线程池中始终会有nThreads个线程。
1
2
int nThreads = 10;
ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
  1. 使用的队列:FixedThreadPool使用的是LinkedBlockingQueue,这是一个基于链表结构的阻塞队列,吞吐量通常要高于ArrayBlockingQueue。LinkedBlockingQueues的大小默认为Integer.MAX_VALUE,这意味着队列的大小只受限于内存大小。
  2. 线程的创建和回收机制:在FixedThreadPool中,因为核心线程数和最大线程数相等,所以线程一旦创建,就不会被回收,线程池中始终维持固定的线程数量。无论线程是否有任务执行,都会保持在线程池中,直到线程池关闭。这就意味着,长期存在的线程,可以有效利用JVM的热点优化,提高处理速度

3.4. ScheduledThreadPool

核心线程数和最大线程数:在 ScheduledThreadPool 中,核心线程数是在创建线程池时确定的,而最大线程数被设置为 Integer.MAX_VALUE,这使得 ScheduledThreadPool 可以应对突发的大量任务。

1
2
int corePoolSize = 10;
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(corePoolSize);

使用的队列:ScheduledThreadPool 使用的是 DelayedWorkQueue,这是一个特殊的队列,可以用于放置延时任务或定时任务。这种队列中的任务,只有在其设定的延时时间到达时,才能从队列中取出

1
2
3
4
5
6
// 这是一个ScheduledThreadPool使用的例子,它将在延迟10秒后执行任务
scheduledExecutorService.schedule(new Runnable(){
public void run(){
// task to be performed
}
}, 10, TimeUnit.SECONDS);

线程的创建和回收机制:在 ScheduledThreadPool 中,线程池在启动时会创建 corePoolSize 数量的核心线程,而不是等到有任务时才创建。对于超过核心线程数的任务,线程池可以创建更多的线程来处理,但是如果一个线程空闲时间超过了 keepAliveTime,这个线程就会被回收,除非线程数小于 corePoolSize。

3.5. SingleThreadExecutor

  1. 核心线程数和最大线程数:在 SingleThreadExecutor 中,核心线程数和最大线程数都被设置为 1。这意味着这个线程池只有一个线程在工作,所有的任务都会在这个线程中顺序执行。
  2. 使用的队列:SingleThreadExecutor 使用的是无限制的 LinkedBlockingQueue。这是一个阻塞队列,当没有任务在队列中时,取任务的操作会被阻塞。因为队列的大小没有限制,所以可以接受任意数量的任务。
  3. 线程的创建和回收机制:在 SingleThreadExecutor 中,线程池在启动时会创建一个线程,这个线程会一直工作,直到线程池关闭。如果这个线程因为异常结束,线程池会创建一个新的线程来替代它。因为线程池中只有一个线程,所以没有线程的回收机制。

3.6. ForkJoinPool

  1. **核心线程数和最大线程数:**在 ForkJoinPool 中,核心线程数和最大线程数是一样的,也就是说,它的线程数量是固定的,不会随着任务的提交和执行而变化。线程数量(并行级别)在创建线程池时设置,默认值是处理器的数量。
  2. **使用的队列:**ForkJoinPool 使用的队列是一种特殊的队列,叫做 ForkJoinTask 队列。这种队列与一般的任务队列不同,它支持任务的拆分和合并,适用于那些可以分解为更小任务的大任务。每个线程维护一个自己的任务队列,用于存储被拆分出来的子任务。
  3. **线程的创建和回收机制:**ForkJoinPool 中的线程在创建后,会一直存在,直到线程池关闭。线程在没有任务执行时,会进入等待状态,不会被回收。当提交新的任务时,等待的线程会被唤醒执行任务。这种设计可以减少线程创建和销毁带来的开销,提高任务处理的效率。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class Main {
static class MyTask extends RecursiveTask<Integer> {
private int start, end;

public MyTask(int start, int end) {
this.start = start;
this.end = end;
}

@Override
protected Integer compute() {
if (end - start <= 1) {
return end + start;
} else {
int mid = (start + end) / 2;
MyTask task1 = new MyTask(start, mid);
MyTask task2 = new MyTask(mid + 1, end);
task1.fork();
task2.fork();
return task1.join() + task2.join();
}
}
}

public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
MyTask task = new MyTask(1, 100);
Integer result = forkJoinPool.invoke(task);
System.out.println(result);
}
}

4. 如何使用Java线程池

4.1. 如何提交任务到线程池

在 Java 中,我们有两种方式将任务提交到线程池:execute 方法和 submit 方法。这两种方法的差异在于,execute 用于执行 Runnable 任务,而 submit 可以执行 Callable 和 Runnable 任务,并返回一个 Future 对象。让我们来详细介绍一下这两种方法。

1. 使用 execute 方法提交任务

execute 方法是 Executor 接口的一个方法,用于提交一个 Runnable 任务到线程池。这个方法没有返回值。

1
2
3
4
5
6
7
ExecutorService executor = Executors.newFixedThreadPool(10);
executor.execute(new Runnable() {
@Override
public void run() {
System.out.println("Task is running.");
}
});

2. 使用 submit 方法提交任务

submit 方法是 ExecutorService 接口的一个方法,它可以提交 Runnable 或 Callable 任务到线程池,并返回一个 Future 对象,可以用于获取任务的结果或取消任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ExecutorService executor = Executors.newFixedThreadPool(10);
Future<?> future = executor.submit(new Runnable() {
@Override
public void run() {
System.out.println("Runnable task is running.");
}
});

// submit method with a Callable
Future<Integer> futureWithResult = executor.submit(new Callable<Integer>() {
@Override
public Integer call() {
return 1 + 1;
}
});

try {
Integer result = futureWithResult.get(); // it will return 2
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}

4.2. 关闭线程池

线程池的正确管理和关闭是我们在使用线程池时必须注意的重要事项。如果不正确地关闭线程池,可能会导致资源泄露,从而引起应用性能下降,甚至出现错误。

在 Java 中,ExecutorService 提供了两种关闭线程池的方法:shutdown 和 shutdownNow。

1. 使用 shutdown 方法关闭线程池

shutdown 方法会启动线程池的关闭序列。当调用 shutdown 方法时,线程池不再接受新的任务,但会继续处理工作队列中的所有任务。等到所有任务都执行完毕,线程就会关闭。

1
2
3
4
ExecutorService executor = Executors.newFixedThreadPool(10);
// Submit tasks
// ...
executor.shutdown();

一旦调用了 shutdown,再试图向这个线程池提交任务就会抛出 RejectedExecutionException 异常。

2. 使用 shutdownNow 方法关闭线程池

shutdownNow 方法是一种更为激进的关闭方式。当调用 shutdownNow 方法时,线程池会尽力停止所有正在执行的任务,并立即关闭等待的任务。此方法返回那些由于这种关闭动作而尚未开始执行的任务列表。

1
2
3
4
ExecutorService executor = Executors.newFixedThreadPool(10);
// Submit tasks
// ...
List<Runnable> notStartedTasks = executor.shutdownNow();

在决定使用哪种关闭方式时,你需要根据自己的应用需求来判断。如果你希望你的应用在关闭时能优雅地完成所有任务,就应该使用 shutdown;如果你希望你的应用立即停止当前任务并尽快关闭,就应该使用 shutdownNow。

4.3. 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(5);

for (int i = 0; i < 10; i++) {
Runnable worker = new WorkerThread("" + i);
executor.execute(worker);
}
executor.shutdown();
while (!executor.isTerminated()) {
}
System.out.println("Finished all threads");
}
}

class WorkerThread implements Runnable {
private String command;

public WorkerThread(String s) {
this.command = s;
}

@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " Start. Command = " + command);
processCommand();
System.out.println(Thread.currentThread().getName() + " End.");
}

private void processCommand() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

在这个示例中,我们创建了一个固定大小(5)的线程池。接着,我们提交了10个任务到线程池中。每个任务都是一个实现了 Runnable 接口的 WorkerThread 对象。

当所有任务都提交到线程池后,我们调用 executor.shutdown() 方法来停止接收新的任务,并等待之前所有已提交的任务都完成。

注意这里的 while (!executor.isTerminated()) {} 循环,这是为了让 main 线程等待,直到所有的任务都完成。你可以看到,虽然我们提交了10个任务,但由于线程池的大小只有5,所以一次只能执行5个任务。

每个任务只是打印出开始信息,然后睡眠5秒钟,然后打印结束信息。你可以根据自己的需要修改这部分逻辑。

5. 线程池的监控与优化

5.1. 如何监控线程池的状态

监控线程池的状态是我们管理和优化线程池的重要手段。Java 中提供的 ThreadPoolExecutor 类提供了多个方法来帮助我们获取线程池的当前状态。这些方法包括:

  1. getPoolSize(): 返回线程池中的当前线程数。
  2. getActiveCount(): 返回线程池中当前正在执行任务的线程数。
  3. getCompletedTaskCount(): 返回线程池已完成的任务数。
  4. getTaskCount(): 返回线程池已接收的任务总数。
  5. getQueue(): 返回线程池中的任务队列,包括正在等待执行的任务。

以下是一个示例,展示如何使用这些方法来监控一个线程池的状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);

// 在其他线程中运行,定期打印线程池状态
new Thread(() -> {
while (true) {
System.out.println("Current Pool Size: " + executor.getPoolSize());
System.out.println("Current Active Threads: " + executor.getActiveCount());
System.out.println("Total Tasks Completed: " + executor.getCompletedTaskCount());
System.out.println("Total Tasks Submitted: " + executor.getTaskCount());

try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();

这个示例中,我们在一个新的线程中定期打印线程池的当前状态。这样我们就可以实时观察线程池的变化,从而帮助我们更好地理解和管理线程池。

为了进一步监控和管理线程池,我们可能需要采用更专业的监控工具,例如 Java 的 VisualVM 或者 JMX。这些工具提供了更详细的线程池信息,以及更多的管理功能。

5.2. 如何进行线程池的性能调优

线程池性能调优是一个复杂且微妙的任务,需要综合考虑 CPU 使用率、内存占用、系统的响应时间等多个因素。这里,我们将探讨一些基本的线程池性能调优策略。

  1. 调整线程池大小:线程池的大小对于其性能有显著影响。如果线程池太大,将会产生过多的上下文切换,浪费 CPU 资源。如果线程池太小,CPU 的使用率可能无法达到最大化。线程池的大小通常取决于系统的硬件资源(如CPU的数量)和任务的性质(CPU 密集型或IO 密集型)。
  2. 调整任务队列长度:任务队列长度也会影响线程池的性能。如果队列过长,将会增加任务的等待时间,从而影响系统的响应时间。如果队列过短,可能会因为队列满而拒绝新的任务。
  3. 设置合适的拒绝策略:当任务队列满而无法接受新的任务时,线程池会执行拒绝策略。选择合适的拒绝策略可以更好地处理这种情况,防止系统过载。

以下是一个示例,展示如何根据CPU的数量来设置线程池的大小,并调整任务队列的长度:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// CPU的数量
int cpuCount = Runtime.getRuntime().availableProcessors();
// 线程池的大小
int poolSize = cpuCount * 2;
// 任务队列的长度
int queueCapacity = 100;

ThreadPoolExecutor executor = new ThreadPoolExecutor(
poolSize,
poolSize,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(queueCapacity),
new ThreadPoolExecutor.CallerRunsPolicy()
);

在这个示例中,我们将线程池的大小设置为 CPU 数量的两倍,这是一个常用的设置,对于 CPU 密集型的任务来说比较合适。我们还将任务队列的长度设置为 100,这个值可以根据实际情况进行调整。如果任务队列满,我们选择了 CallerRunsPolicy 策略,这意味着将任务在调用者的线程中运行,从而为系统提供一种自我保护的机制。

6. 线程池的常见问题与解决方案

6.1. 如何处理线程池的异常

在多线程编程中,线程池异常处理是一个重要的话题。在处理线程池的异常时,我们需要理解:在 Runnable 对象的 run() 方法中,异常不能跨线程传播回主线程。这是因为每个线程都有自己的堆栈,互相之间不会影响。这就意味着如果你没有在你的 Runnable 类中正确处理异常,那么这个异常会被吞掉,并且你可能永远也不知道这个异常的存在。

Java 提供了 Thread.UncaughtExceptionHandler 接口来处理未捕获的异常。在我们的 Runnable 对象中,如果 run() 方法抛出一个未被捕获的异常,JVM 会把这个异常传递给线程的 UncaughtExceptionHandler。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class TaskWithExceptionHandler implements Runnable {
@Override
public void run() {
throw new RuntimeException("Exception from thread");
}
}

Thread thread = new Thread(new TaskWithExceptionHandler());
thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println("Caught " + e);
}
});
thread.start();

这样,任何从 run() 方法抛出的未捕获的异常都会被这个 UncaughtExceptionHandler 捕获。

然而,线程池有自己的工作线程,我们无法直接设置 UncaughtExceptionHandler。在这种情况下,我们可以使用 ThreadFactory 创建自己的工作线程,并设置 UncaughtExceptionHandler。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class HandlerThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println("Caught " + e);
}
});
return t;
}
}

ExecutorService executor = Executors.newFixedThreadPool(5, new HandlerThreadFactory());

在上述代码中,我们创建了一个 HandlerThreadFactory,在每次创建新线程时都设置了自己的 UncaughtExceptionHandler。

对于使用 Callable 的情况,我们可以使用 Future 的 get() 方法来获取执行结果,该方法会抛出 ExecutionException 异常,如果在 call() 方法中抛出异常,可以通过这种方式捕获。

请记住,适当的异常处理能够帮助我们更好地理解程序的运行情况,便于问题的定位和解决。

6.2. 如何处理任务的结果

在 Java 并发编程中,我们经常使用线程池来执行异步任务。有时,我们需要获取这些任务的执行结果。Java 提供了 Future 和 Callable 来帮助我们做到这一点。

使用 Future 获取结果

Future 表示异步计算的结果。它提供了检查计算是否完成的方法,以及获取计算的结果和取消计算的方法。当我们将一个 Callable 任务提交给 ExecutorService,我们会得到一个 Future 对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
ExecutorService executor = Executors.newFixedThreadPool(5);
Callable<String> task = () -> {
Thread.sleep(1000);
return "Result from callable";
};

Future<String> future = executor.submit(task);

try {
// 调用 get() 方法获取结果,可能会阻塞等待结果
String result = future.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}

处理 FutureTask 的结果

FutureTask 是 Future 的一个实现,它同时实现了 Runnable 接口,所以可以直接提交给 ExecutorService。FutureTask 也可以用来包装 Callable 或 Runnable 对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Callable<String> task = () -> {
Thread.sleep(1000);
return "Result from callable";
};

FutureTask<String> futureTask = new FutureTask<>(task);
executor.submit(futureTask);

try {
String result = futureTask.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}

使用 CompletionService 处理任务的结果

如果你有很多的并发任务需要执行,并且需要处理这些任务的结果,ExecutorCompletionService 是一个好的选择。ExecutorCompletionService 是 CompletionService 的一个实现,它可以将 Executor 和 BlockingQueue 功能结合在一起。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
ExecutorService executor = Executors.newFixedThreadPool(5);
CompletionService<String> completionService = new ExecutorCompletionService<>(executor);

for (int i = 0; i < 5; i++) {
int finalI = i;
completionService.submit(() -> {
Thread.sleep(finalI * 1000);
return "Result from task " + finalI;
});
}

for (int i = 0; i < 5; i++) {
try {
String result = completionService.take().get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}

上述代码中,我们提交了 5 个任务给 CompletionService,然后我们调用 take().get() 方法获取结果。这个方法会阻塞等待第一个完成的任务的结果,不像 Future.get() 那样按照任务提交的顺序。

以上就是在 Java 中处理线程池任务结果的一些常用方式,选择哪种方式取决于你的具体需求。希望这些例子能帮助你理解和使用这些技术。

6.3. 如何控制任务的排队和执行

线程池中的任务排队和执行,是通过 Executor 和 ExecutorService 这两个接口来管理的。根据任务的性质和系统的需求,我们可以选择不同的队列和线程池策略来控制任务的排队和执行。

线程池的选择

请查阅 3.1 不同线程池的区别

任务的排队

Java 的线程池通过 BlockingQueue 来存放等待执行的任务。根据任务的性质和系统的需求,我们可以选择不同的阻塞队列。

  1. ArrayBlockingQueue:一个基于数组的有界阻塞队列。这个队列按 FIFO(先进先出)原则对元素进行排序。
  2. LinkedBlockingQueue:一个基于链表的可选容量(默认 Integer.MAX_VALUE)阻塞队列。这个队列按 FIFO 原则排序元素。
  3. PriorityBlockingQueue:一个支持优先级的无界阻塞队列。
  4. SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,反之亦然。

代码示例

以下是一个使用 FixedThreadPool 和 LinkedBlockingQueue 的例子:

1
2
3
4
5
ExecutorService executor = new ThreadPoolExecutor(5, 5,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());

executor.submit(new Task());

以上代码创建了一个固定大小为 5 的线程池,使用的是 LinkedBlockingQueue 作为任务队列。

在实际应用中,线程池的选择和任务队列的选择需要根据任务的特性(如任务的执行时间、是否是 IO 密集型任务、是否需要按照顺序执行等)和系统的需求(如系统的负载、可用的系统资源等)来决定。

7. 更深入的主题:自定义线程池

7.1. 为何需要自定义线程池

Java 的 java.util.concurrent 包提供了多种类型的内置线程池,例如 FixedThreadPool、CachedThreadPool、ScheduledThreadPool 和 SingleThreadExecutor。这些内置线程池在大多数情况下可以满足我们的需求,但在某些特殊情况下,我们可能需要自定义线程池。

以下是一些可能需要自定义线程池的情况:

  1. 任务的特性:如果你的任务有一些特殊的需求,例如需要按照优先级执行,或者需要在特定的时间执行,那么你可能需要自定义线程池。例如,PriorityBlockingQueue 可以用来实现优先级任务的线程池。
  2. 资源的限制:如果你的系统有特定的资源限制,例如内存、CPU 或者 IO,那么你可能需要自定义线程池来适应这些限制。例如,你可以通过自定义 ThreadPoolExecutor 来限制线程池的大小和任务队列的长度。
  3. 性能的优化:如果你需要对线程池的性能进行细致的优化,例如减少任务的延迟,提高吞吐量,或者降低资源的使用,那么你可能需要自定义线程池。例如,你可以通过自定义 ThreadFactory 来控制线程的创建和销毁,或者通过自定义 RejectedExecutionHandler 来处理任务的拒绝。

7.2. 如何自定义线程池

自定义线程池允许我们精确控制线程池的行为,以满足特殊的需求。在 Java 中,我们可以使用 ThreadPoolExecutor 类来创建自定义的线程池。这个类提供了多个构造函数,让我们可以设定线程池的各种参数。

以下是创建自定义线程池的步骤:

1. 创建 ThreadPoolExecutor 实例

ThreadPoolExecutor 的构造函数需要以下参数:

  • corePoolSize:核心线程数。这是线程池始终保持运行的线程数量。
  • maximumPoolSize:最大线程数。这是线程池在高负载时可以达到的最大线程数量。
  • keepAliveTime 和 unit:空闲线程的存活时间。这是非核心线程在空闲时会被保持运行的时间。
  • workQueue:任务队列。这是存放待处理任务的队列。
  • threadFactory:线程工厂。这是用于创建新线程的工厂。
  • handler:拒绝策略。这是当任务无法提交到线程池时的处理方式。

2. 提交任务

使用 submit 或 execute 方法来提交任务。这些方法会把任务提交到线程池的任务队列,然后线程池会按照任务的顺序来执行它们。

3. 关闭线程池

使用 shutdown 或 shutdownNow 方法来关闭线程池。这些方法会停止线程池的所有线程,然后释放线程池所占用的资源。

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 创建一个线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // 核心线程数
10, // 最大线程数
60L, // 空闲线程存活时间
TimeUnit.SECONDS, // 时间单位
new LinkedBlockingQueue<>(100), // 任务队列
new ThreadFactory() { // 线程工厂
private final AtomicInteger threadNumber = new AtomicInteger(1);

public Thread newThread(Runnable r) {
return new Thread(r, "myThreadPool-" + threadNumber.getAndIncrement());
}
},
new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
);

// 提交一个任务
executor.submit(() -> {
// 任务内容
});

// 关闭线程池
executor.shutdown();

以上代码创建了一个自定义的线程池,然后提交了一个任务,最后关闭了线程池。你可以根据自己的需求来调整这个例子,以满足你的特殊需求。

7.3. 自定义线程池的案例

我们将实现一个自定义的线程池,用于处理大量的 HTTP 请求。我们的目标是优化线程的使用,以便在处理高并发请求时,线程池能够有效地利用系统资源。

首先,我们需要一个具有大量核心线程和最大线程的线程池,以便在高并发环境下可以创建更多的线程来处理任务。同时,我们希望线程池在不忙的时候能够释放空闲线程,以节省资源。因此,我们设置了核心线程数为 10,最大线程数为 100,并设置空闲线程的存活时间为 60 秒。

另外,我们希望当任务队列满时,线程池能够抛出异常,以便我们知道系统过载并采取适当的措施。因此,我们选择了 AbortPolicy 作为拒绝策略。

最后,我们希望能够跟踪线程池中的每个线程,所以我们创建了一个自定义的线程工厂,为每个新建的线程命名。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
10, // 核心线程数
100, // 最大线程数
60L, // 空闲线程存活时间
TimeUnit.SECONDS, // 时间单位
new LinkedBlockingQueue<>(500), // 任务队列
new ThreadFactory() { // 线程工厂
private final AtomicInteger threadNumber = new AtomicInteger(1);

public Thread newThread(Runnable r) {
return new Thread(r, "httpRequestPool-" + threadNumber.getAndIncrement());
}
},
new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
);

// 模拟提交 HTTP 请求任务
for (int i = 0; i < 1000; i++) {
final int requestId = i;
executor.submit(() -> {
// 模拟处理 HTTP 请求
System.out.println("Processing request " + requestId + " in thread " + Thread.currentThread().getName());
});
}

// 关闭线程池
executor.shutdown();

在上述代码中,当我们向线程池提交 1000 个任务时,线程池会创建更多的线程来处理这些任务,而不是让任务在队列中等待。如果超过了线程池的最大线程数和队列容量,线程池会抛出 RejectedExecutionException 异常,提示我们系统过载。

这个案例展示了如何根据特定应用场景自定义线程池的各种参数,以实现对系统资源的高效利用和对系统负载的有效控制。

8. 总结

8.1. 使用和优化线程池的关键点

使用线程池可以优化系统性能,减少资源消耗,但是想要充分利用线程池并实现其优化,有几个关键点需要注意:

  1. 选择合适的线程池大小:核心线程数和最大线程数的设定需要考虑系统资源和任务特性。如果设置得过小,可能会导致任务等待时间过长;设置得过大,可能会消耗过多的系统资源。一般来说,可以通过压力测试来确定合适的线程池大小。
  2. 选择合适的任务队列:根据任务的特性和处理速度,选择合适的任务队列可以提高线程池的处理效率。例如,如果处理速度快,任务数量多,可以使用 LinkedBlockingQueue;如果处理速度慢,任务数量少,可以使用 ArrayBlockingQueue。
  3. 选择合适的拒绝策略:当任务队列满时,需要有合理的拒绝策略。例如,可以选择抛出异常来告知调用者,或者将任务交给一个备用线程池处理。
  4. 监控和调优:通过监控线程池的运行状态,例如队列长度、活动线程数、任务完成数等,可以及时发现问题并进行调优。

8.2. 书籍和资源推荐

  1. 《Java 并发编程实战》:这本书是由 Brian Goetz 和其他多位并发编程专家共同编写的,全面而深入地介绍了 Java 并发编程的各种概念和技术,包括线程安全性、同步、线程池、非阻塞算法等。
  2. 《Java 并发编程的艺术》:作者是多线程并发编程方面的资深专家方勇,本书深入浅出地介绍了 Java 多线程编程的原理和实践,包括线程的创建和销毁、同步机制、线程池、阻塞队列、并发容器、锁优化等。
  3. Oracle 官方文档:Java 平台的官方文档包含了丰富的并发和多线程编程相关内容,包括 java.util.concurrent 包中的类和接口的详细说明。Java 并发编程(Concurrency)是非常好的入门和参考资料。
  4. Doug Lea 的并发编程网站:Doug Lea 是 Java 并发编程的权威,他的 并发编程网站收集了大量的并发编程资源,包括文章、演讲、研究论文等,非常值得深入研读。