0%

线程池————建服务器应用程序的有效方法

简介

当需要频繁地创建多个线程进行耗时操作,每次都通过new Thread的方式去创建显然不是一种好的方式,因为每次new Thread新建和销毁对象性能较差,线程缺乏统一地管理,可能线程之间会竞争,可能会占用过多系统资源导致死锁,并且缺乏定时执行、线程中断等功能。

Java提供了 4 种线程池,它能够有效地管理、调度线程,避免过多的资源消耗。

它的优点如下:

  • 重用存在的线程,减少对象的创建、销毁的开销;
  • 可有效地控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免阻塞;
  • 可以提供定时执行、定期执行、单线程、并发数控制等功能;

线程池都实现了ExecutorService接口,该接口定义了线程池需要实现的接口,比如submitexecuteshutdown等方法。它的实现有ThreadPoolExecutor以及ScheduledThreadPoolExecutor等等。JDK 还提供了一个Executors的工厂类来简化创建线程池的流程。

组成部分

一个线程包括以下四个基本组成部分:

  • 线程池管理器(ThreadPool):用于创建并管理线程池,包括创建线程池销毁线程池添加新任务等;
  • 工作线程(WorkThread):线程池中线程,在没有任务时是处于等待状态,可以循环地执行任务;
  • 任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行。它规定了任务的入口,任务执行完成后的收尾工作,任务的执行状态等等;
  • 任务队列(TaskQueue):用于存放没有处理的任务,提供一个缓冲机制。

设置线程数

  1. 高并发、任务执行时间短的业务怎么使用线程池

    线程池线程数可以设置为 CPU 核心数 + 1,减少线程上下文的切换

  2. 并发不高、任务执行时间长的业务怎样使用线程池

    • 如果业务时间是集中在 IO 操作上,就是 IO 密集型的任务。因为 IO 操作并不占用 CPU,所以不要让所有的 CPU 闲下来,可以适当加大线程池中的线程数目(2 * CPU 核心数),让 CPU 可以处理更多的业务
    • 如果是业务时间集中在计算操作上,那么就是 CPU 密集型的任务。所以同样设置为 CPU 核心数 + 1,减少线程上下文的切换即可。
  3. 并发高、业务执行时间长的业务怎样使用线程池

    如果既是并发程度高,又是业务执行时间长的话,解决的关键在于整体架构的设计而不是线程池。

启动指定数量的线程——ThreadPoolExecutor

ThreadPoolExecutor 是线程池的主要实现之一,功能是启动指定数量的线程以及将任务添加到一个队列中,并且将任务分发给空闲的线程。Executors的生命周期包括 3 种状态:运行、空闲、终止。

ThreadPoolExecutor 的构造函数如下:

1
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

workingQueue

workingQueue 有下列几个常用的实现:

ArrayBlockingQueue:基于数组结构实现的有界队列,按照 FIFO 原则对任务进行排序。如果队列满了还有任务进来,就调用拒绝策略。 LinkedBlockingQueue:基于链表结构实现的无界队列,按照 FIFO 原则对任务进行排序。因为是无界,所以肯定不会满,所以采用此队列会忽略拒绝策略。 SynchronousQueue:直接将任务提交给线程而不是将它加入到该队列中,所以实际上这个队列是空的。当新任务来了,但是线程池中不存在任何可以被调用的线程时,就会调用拒绝策略。 PriorityBlockingQueue:具有优先级的有界队列,可以自定义优先级。

当线程池与工作队列都满了的情况,对于新添加任务也有一些默认实现的处理策略,例如拒绝策略等。

  • AbortPolicy:拒绝任务,会抛出异常,是线程池默认的策略
  • CallerRunsPolicy:拒绝新任务进入,如果该线程池还没有被关闭,那么新任务执行在调用线程中。
  • DiscardOldestPolicy:如果执行程序尚未关闭,那么工作队列头部的任务将被删除,然后重试执行程序(如果失败,继续反复重试)。这样的结果会导致最后加入的任务反而可能被执行到,而先前加入的任务都被抛弃了。
  • DiscardPolicy:加不进的任务都会被抛弃,不会抛出异常。

可缓存线程池——CachedThreadPool

任务线程传入时自动分配线程,线程不够时自动创建新的线程

  • 线程数无限制
  • 有空闲线程就复用空闲线程,若无空闲线程则新建线程,一定程度上减少频繁创建/销毁线程,减少系统开销

固定长度的线程池——FixedThreadPool

指定线程池线程的个数,任务线程传入时自动分配线程,线程不够时,剩余的任务线程排队等待线程池中的线程执行完毕

  • 可控制线程最大的并发数(即同时执行的线程数)
  • 超出的线程会在队列中等待

定时执行一些任务——ScheduledThreadPoolExecutor

指定线程池中线程的个数,任务线程传入时自动分配线程,可以设定任务线程第一次执行时的延时时间和之后每次执行的间隔时间。

当需要定时地执行一些任务时,就可以通过ScheduledThreadPoolExecutor来实现,只需要通过Executorsnew ScheduledThreadPool函数就可以创建定时执行任务的线程池。

  • 不要对那些同步等待其他任务结果的任务排队,否则可能导致死锁。在死锁中,所有线程都被一些任务所占用,而这些任务依次排队等待,又无法执行,这样所有的线程都属于忙碌状态。
  • 理解任务————要有效地调整线程池的大小。
  • 避免线程太少或者线程太多。

单线程化的线程池——SingleThreadExecutor

  • 有且仅有一个工作线程在执行任务
  • 所有任务按照指定的顺序执行,即遵循队列的先入队出队的原则

有效的方法——BlockingQueue

阻塞队列提供了一系列有用的特性。例如,当队列满了时再调用put函数添加元素时,调用线程将被堵塞,直到队列不再是填满状态为止。

BlockingQueue 中重要的方法有:

  • add(e)
  • offer(e)
  • offer(e,time,unit)
  • put(e)
  • take()
  • poll(time,unit)
  • element()
  • peek()
  • remove()

BlockingQueue 主要有三种:

  • 基于数组的先进先出队列,有界:ArrayBlockingQueue
  • 基于链表的先进先出队列,无界:LinkedBlockingQueue
  • 无缓冲的等待队列,无界:SynchronousQueue

ArrayBlockingQueue

之前已经介绍了,ArrayBlockingQueue 是一个基于数组结构实现的有界环形队列,按照 FIFO 原则对任务进行排序。如果队列满了还有任务进来,就调用拒绝策略。其原理就是利用了 Lock 锁的 Condition 通知机制进行阻塞控制。

那么,它主要使用ReentrantLock类中有一个Condition,它用于实现线程间的通信,是为了解决Object.wait()notify()notifyAll()难以使用的问题。

  • await():线程等待的方法
  • await(int time, TimeUnit unit):线程等待特定的时间,超过时间则为超时
  • signal():随机唤醒某个等待线程
  • signalAll():唤醒所有等待中的线程

当 MyArrayBlockingQueue 的元素为最大容量时,如果再往该队列中添加元素,就会调用notFull.await()函数使得线程阻塞,直到其它线程调用了take()方法从该队列中取出了元素,才不是已满状态。可以使用notFull.signalAll()方法唤醒所有的等待线程,使得添加元素的操作得以进行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
public class MyArrayBlockingQueue<T> {
/**
* 数据数组
*/
private final T[] items;
/**
* 锁
*/
private final Lock lock = new ReentrantLock();
/**
* 队满的条件
*/
private Condition notFull = lock.newCondition();
/**
* 队空的条件
*/
private Condition notEmpty = lock.newCondition();
/**
* 队列的头部索引
*/
private int head;
/**
* 队列的尾部索引
*/
private int tail;
/**
* 数据的个数
*/
private int count;

public static void main(String[] args) {
MyArrayBlockingQueue<Integer> aQueue = new MyArrayBlockingQueue<>();
aQueue.put(3);
aQueue.put(24);
for (int i = 0; i < 5; i++) {
System.out.println(aQueue.take());
}
}


public MyArrayBlockingQueue(int maxSize) {
items = (T[]) new Object[maxSize];
}

public MyArrayBlockingQueue() {
this(10);
}

public void put(T t) {
lock.lock();
try {
while (count == getCapacity()) {
System.out.println("数据已满,等待");
notFull.await();
}
items[tail] = t;
if (++tail == getCapacity()) {
tail = 0;
}
++count;
// 唤醒等待数据的线程
notEmpty.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public T take() {
lock.lock();
try {
while (count == 0) {
System.out.println("还没有数据,请等待");
notEmpty.await();
}
T ret = items[head];
items[head] = null;
if (++head == getCapacity()) {
head = 0;
}
--count;
// 唤醒添加数据的线程
notFull.signalAll();
return ret;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return null;
}

public int getCapacity() {
return items.length;
}

public int size() {
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
}

put 方法

  1. 拿到了线程竞争 lock 锁,拿到了 lock 锁的线程就进入下一步,没有拿到 lock 锁的线程就自旋竞争锁;
  2. 判断阻塞队列是否已满,如果满了就调用await()方法阻塞这个线程,notFull(生产者)挂起,最后释放 lock 锁,等待被消费者线程唤醒;
  3. 如果没有满,则调用enqueue()方法将元素 put 进阻塞队列;
  4. 唤醒一个标记为notEmpty()(消费者)的线程。

take 方法

  1. 拿到了线程竞争 lock 锁,拿到了 lock 锁的线程就进入下一步,没有拿到 lock 锁的线程就自旋竞争锁;
  2. 判断阻塞队列是否为空,如果为空就调用await()方法阻塞这个线程,notEmpty(生产者)挂起,最后释放 lock 锁,等待被生产者线程唤醒;
  3. 如果没有空,则调用dequeue()方法将元素 take 出阻塞队列;
  4. 唤醒一个标记为notFull()(消费者)的线程。

LinkedBlockingQueue

LinkedBlockingQueue 是基于链表的阻塞队列,其内部维持着一个数据缓冲队列(该队列由一个链表组成),当生产者往队列中放入一个数据时,队列会从生产者中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时,才会阻塞生产者队列,直到消费者队列消费了数据,生产者才能继续工作。反之消费者端也是同样的。

LinkedBlockingQueue 之所以能够高效处理并发数据,是因为对生产者端和消费者端都分别采用了独立的锁来控制数据同步,所以在高并发的情况下,生产者和消费者可以并行地操作队列中的数据,以此提高整个队列的并发性能。

DelayQueue

DelayQueue 中的元素只有当自己指定的延迟时间到了,才可以从队列中获取到该元素。DelayQueue 是一个没有大小限制的队列,因此往队列中插入数据是不会被阻塞的,只有获取数据会被阻塞。

PriorityBlockingQueue

PriorityBlockingQueue 是基于优先级的阻塞队列(优先级的判断是通过构造函数传入的Compator对象来决定的)。PriorityBlockingQueue 不会阻塞生产者,只会当没有可消费的数据时,才会阻塞消费者。所以,生产者生产数据的速度绝对不能快于消费者数据的速度,否则时间长了后会耗尽所有的可用的堆内存空间(因为不会阻塞生产者)。

实现 PriorityBlockingQueue 时,内部控制线程同步的锁采用的是公平锁。

SynchronousQueue

SynchronousQueue 是一种无缓冲的等待队列,类似于无中介的直接交易,生产者直接与消费者交易,少了一个中间经销商的环节(缓冲区)。

声明一个 SynchronousQueue 有两种不同的方式:公平模式和非公平模式。

公平模式:SynchronousQueue 会采用公平锁,并配合一个 FIFO 队列来阻塞多余的生产者和消费者,从而体系整体的公平策略。

非公平模式:SynchronousQueue 会采用非公平锁,同时配合一个 LIFO 的队列来管理多余的生产者和消费者。

四种拒绝策略

AbortPolicy

丢弃任务并抛出 RejectedExecutionException 异常

1
2
3
4
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task" + r.toString()
+ " rejected from " + e.toString());
}

DiscardPolicy

丢弃任务,但是并不抛出异常

1
2
3
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

}

DisCardOldSetPolicy

丢弃队列最前面的任务,然后提交新来的任务

1
2
3
4
5
6
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.executor();
}
}

CallerRunPolicy

由调用线程(提交任务的线程,主线程)处理该任务

1
2
3
4
5
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}