java并发编程9-JDK线程池

JDK的线程池

线程池状态,RUNNING,SHUTDOWN(不会再接受新任务了),STOP(立刻停止),TIDYING(任务执行完毕,即将TERMINATED),TERMINATED

构造函数

1
public ThreadPollExecutor(int corePoolsize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
  • 核心线程数量
  • 最大线程数量
  • 就急线程生存时间
  • 时间单位
  • 阻塞队列
  • 线程工厂: 给线程起个名字
  • 拒绝策略

拒绝策略

  • AbortPolicy 让调用者抛出异常
  • CallerRunsPolicy 让调用者运行任务
  • DiscardPolicy 放弃本次任务
  • DIcardOldestPolicy 放弃队列中最先进入的任务
  • Dubbo 抛出异常并记录线程栈信息
  • Netty 创建新的线程来执行
  • ActiveMQ 等待超时
  • PinPoint 拒绝策略链, 比如先用方法A,如果失败就要方法B,...

newFixedThreadPool

固定大小的线程池 阻塞队列无界,没有就急线程,nThreads个核心线程, 是非守护线程 当然我们也可以自己创建线程工厂,自己给线程取名字

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

newCachedThraedPool

不固定大小的线程池 阻塞队列无界,没有核心线程,全是救急线程,但不是无限个,活60秒

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

SynchronousQueue

如果没有人取出东西,放入操作会被阻塞, 如果没有人放入东西,同理拿出会被阻塞,如果有多个同时拿,这时候就像栈一样,后来的人,会先拿到东西

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
void test10_f1(SynchronousQueue<Integer> integers, String string) throws InterruptedException {
Thread.sleep(200);
new Thread(() -> {
try {
logger.debug("begin");
integers.put(1);
logger.debug("end");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, string).start();
}

void test10_f2(SynchronousQueue<Integer> integers, String string) throws InterruptedException {
Thread.sleep(200);
new Thread(() -> {
try {
logger.debug("begin");
integers.take();
logger.debug("end");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, string).start();
}

@Test
public void test10() throws InterruptedException {
SynchronousQueue<Integer> integers = new SynchronousQueue<>();
test10_f1(integers, "1");
test10_f1(integers, "2");
test10_f1(integers, "3");
test10_f1(integers, "4");
test10_f2(integers, "5");
test10_f2(integers, "6");
test10_f2(integers, "7");
test10_f2(integers, "8");
test10_f2(integers, "a");
test10_f2(integers, "b");
test10_f2(integers, "c");
test10_f2(integers, "d");
test10_f1(integers, "e");
test10_f1(integers, "f");
test10_f1(integers, "g");
test10_f1(integers, "h");
}
下面是输出, 可以看到,1234按顺序进入,4321按顺序出来
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
16:33:54.391 [1] DEBUG com.wsx.test.ThreadTest - begin
16:33:54.591 [2] DEBUG com.wsx.test.ThreadTest - begin
16:33:54.792 [3] DEBUG com.wsx.test.ThreadTest - begin
16:33:54.996 [4] DEBUG com.wsx.test.ThreadTest - begin
16:33:55.202 [5] DEBUG com.wsx.test.ThreadTest - begin
16:33:55.202 [5] DEBUG com.wsx.test.ThreadTest - end
16:33:55.202 [4] DEBUG com.wsx.test.ThreadTest - end
16:33:55.407 [6] DEBUG com.wsx.test.ThreadTest - begin
16:33:55.409 [6] DEBUG com.wsx.test.ThreadTest - end
16:33:55.409 [3] DEBUG com.wsx.test.ThreadTest - end
16:33:55.609 [7] DEBUG com.wsx.test.ThreadTest - begin
16:33:55.609 [2] DEBUG com.wsx.test.ThreadTest - end
16:33:55.609 [7] DEBUG com.wsx.test.ThreadTest - end
16:33:55.813 [8] DEBUG com.wsx.test.ThreadTest - begin
16:33:55.814 [8] DEBUG com.wsx.test.ThreadTest - end
16:33:55.814 [1] DEBUG com.wsx.test.ThreadTest - end
16:33:56.017 [a] DEBUG com.wsx.test.ThreadTest - begin
16:33:56.221 [b] DEBUG com.wsx.test.ThreadTest - begin
16:33:56.425 [c] DEBUG com.wsx.test.ThreadTest - begin
16:33:56.630 [d] DEBUG com.wsx.test.ThreadTest - begin
16:33:56.835 [e] DEBUG com.wsx.test.ThreadTest - begin
16:33:56.836 [e] DEBUG com.wsx.test.ThreadTest - end
16:33:56.836 [d] DEBUG com.wsx.test.ThreadTest - end
16:33:57.038 [f] DEBUG com.wsx.test.ThreadTest - begin
16:33:57.039 [f] DEBUG com.wsx.test.ThreadTest - end
16:33:57.039 [c] DEBUG com.wsx.test.ThreadTest - end
16:33:57.244 [g] DEBUG com.wsx.test.ThreadTest - begin
16:33:57.244 [g] DEBUG com.wsx.test.ThreadTest - end
16:33:57.244 [b] DEBUG com.wsx.test.ThreadTest - end
16:33:57.448 [h] DEBUG com.wsx.test.ThreadTest - begin
16:33:57.449 [h] DEBUG com.wsx.test.ThreadTest - end
16:33:57.449 [a] DEBUG com.wsx.test.ThreadTest - end

newSingleThreadExecutor

1个核心线程,0个救急线程,使用无界队列,于是任务可以无数个

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
1个线程的线程池能叫池吗?我干嘛不自己用? 实际上我们自己创建的话如果碰到一些错误的任务,可能线程就退出了,这里不好处理,但是线程池在该线程退出以后会帮我们重新创建线程 FinalizableDelegatedExecutorService 是一个装饰者模式,只暴露部分接口,避免后期被修改容量

TimerTask

这个不重要,他很差,他是串行执行的,如果前面的太耗时会导致后面的被推迟,如果前面发生异常,后面的不会执行

ScheduledExecutorService

用法和TimerTask很像,但是他不会出现上面的异常影响后续任务的情况 ### ScheduledExecutorService.scheduleAtFixedTate() 在初始延迟以后,能够在单位时间内被反复执行 ### ScheduledExecutorService.scheduleWithFixedDelay() 在初始延迟以后,反复执行的两个任务之间隔固定时间

函数

submit

用future来返回,future.get();

invokeAll(tasks)

提交tasks中的所有任务

invokeAll(tasks,timeout,unit)

带一个超时时间

invokeAny

谁最先执行完就返回谁,其他的就不管了

shutdown

无阻塞,不会接受新提交的任务,但已提交的任务后执行完。

shutdownNow

打断所有的线程,并返回队列中的任务,

isShutdown

只要不是running, 就返回true

isTerminated

只有TREMINATED返回真

awaitTermination

就一直等,等到超时或者线程结束

正确处理异常

如果执行过程中没有异常,future.get()正常返回,如果出现异常,future.get()会抛出异常

Fork/Join

fork能创建新的线程来执行,join会阻塞,这就实现了并行,下面是100的阶乘模998244353

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
Logger logger = LoggerFactory.getLogger(RecursiveTaskTest.class);

@Test
public void test() {
class Task extends RecursiveTask<Integer> {
private int begin;
private int end;
private int mod;

Task(int begin, int end, int mod) {
this.begin = begin;
this.end = end;
this.mod = mod;
}

@Override
protected Integer compute() {
if (begin == end) return begin;
int mid = begin + end >> 1;
Task task1 = new Task(begin, mid, mod);
Task task2 = new Task(mid + 1, end, mod);
task1.fork();
task2.fork();
return Math.toIntExact(1L * task1.join() * task2.join() % mod);
}
}

ForkJoinPool forkJoinPool = new ForkJoinPool(3);
logger.debug("{}", forkJoinPool.invoke(new Task(1, 100, 998244353)));

}