image-20240309202706043

在JAVA中,当我们启动main函数时,实际上会启动一个JVM进程,main函数所在的线程就是这个进程的主线程

并发编程的三个重要特性:原子性、可见性、有序性

  • 原子性:使用sychronized、各种lock保证原子性

  • 可见性:当一个线程对共享变量修改后,另外的线程都是可以立即看到修改后的最新值。使用sychronizedvolatilelock实现可见性。

  • 有序性:指令重排序可以保证串行语义一致,但是没有义务保证多线程间的语义也一致,所以在多线程下,指令重排序可能会导致一些问题,我们可以使用volatile关键字来禁止指令重排序。

多线程

进程和线程的区别

image-20240309204137587

image-20240309204253422

image-20240309204402331

并发和并行

image-20240309204819989

创建线程的方式

image-20240309205219858 image-20240309205830743 image-20240309205943877
public class test {
public static void main(String[] args) {
//匿名内部类,在Thread()中传入实现了Rannable接口的实例
new Thread(()->{
System.out.println("hello");
}).start();
}
}
image-20240309210310343 image-20240309210624669

Runnable和Callable有什么区别

image-20240309210918913

run() 和 start()的区别

image-20240309211337601

线程的状态及转化

image-20240309211842592

image-20240309212837104

image-20240309212920842

如何控制线程的运行顺序 join()

image-20240309213722393

notify()和notifyall()

image-20240309214010249

sleep()和wait() ⭐

image-20240310140600006

停止线程

image-20240310142855985
//使用标志退出
public class MyInterrupt1 extends Thread {

volatile boolean flag = false ; // 线程执行的退出标记

@Override
public void run() {
while(!flag) {
System.out.println("MyThread...run...");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) throws InterruptedException {

// 创建MyThread对象
MyInterrupt1 t1 = new MyInterrupt1() ;
t1.start();

// 主线程休眠6秒
Thread.sleep(6000);

// 更改标记为true
t1.flag = true ;

}
}
//使用interrupt方法中断线程
package com.itheima.basic;

public class MyInterrupt3 {

public static void main(String[] args) throws InterruptedException {

//1.打断阻塞的线程
/*Thread t1 = new Thread(()->{
System.out.println("t1 正在运行...");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1");
t1.start();
Thread.sleep(500);
t1.interrupt();
System.out.println(t1.isInterrupted());*/


//2.打断正常的线程
Thread t2 = new Thread(()->{
while(true) {
Thread current = Thread.currentThread();
boolean interrupted = current.isInterrupted();
if(interrupted) {
System.out.println("打断状态:"+interrupted);
break;
}
}
}, "t2");
t2.start();
Thread.sleep(500);
// t2.interrupt();

}
}

synchronized底层原理

基本使用

注意:sychronized只能满足单个jvm下的锁,多个jvm下需要分布式锁,如同一个服务,但是做了集群,他们就位于不同的jvm中

Synchronized【对象锁】采用互斥的方式让同一时刻至多只有一个线程能持有【对象锁】,其它线程再想获取这个【对象锁】时就会阻塞住

image-20240310143751088

使用方式

  • 修饰实例方法(锁当前对象实例)
    给当前对象实例加锁,进入同步代码前要先获得当前对象的实例锁

  • 修饰静态方法(锁当前类)
    给当前类加锁,会作用于类的所有实例对象,进入同步代码前要先获得当前class的锁,因为静态成员变量不属于任何一个实例,而是属于整个类,被类的所用实例共享。

    静态 synchronized 方法和非静态 synchronized 方法之间的调用互斥么?不互斥!如果一个线程 A 调用一个实例对象的非静态 synchronized 方法,而线程 B 需要调用这个实例对象所属类的静态 synchronized 方法,是允许的,不会发生互斥现象,因为访问静态 synchronized 方法占用的锁是当前类的锁,而访问非静态 synchronized 方法占用的锁是当前实例对象锁。

  • 修饰代码块(锁指定对象/类)
    对括号里的 类/对象 加锁

    synchronized(object) 表示进入同步代码库前要获得 给定对象的锁

    synchronized(类.class) 表示进入同步代码前要获得 给定 Class 的锁

  • 构造方法能用 synchronized 修饰吗?
    不能,但构造方法内可以使用 synchronized 代码块另外,构造方法本身是线程安全的,但如果在构造方法中涉及到共享资源的操作,就需要采取适当的同步措施来保证整个构造过程的线程安全

底层原理

synchronized 关键字底层原理属于 JVM 层面的东西。

Monitor 被翻译为监视器,由jvm提供,c++语言实现

修饰同步代码块的情况

public class SynchronizedDemo {
public void method() {
synchronized (this) {
System.out.println("synchronized 代码块");
}
}
}

以下为上述代码字节码文件的一部分,可以看到,包含两个monitorexit,这是为了保证同步代码块正常执行以及出现异常时,同步锁可以正常释放。

当执行monitorenter指令时,线程会试图获取锁,也就是获取对象监视器monitor的特有权。

image-20240503193419346

在 Java 虚拟机(HotSpot)中,Monitor 是基于 C++实现的,由ObjectMonitor实现的。每个对象中都内置了一个 ObjectMonitor对象。

另外,wait/notify等方法也依赖于monitor对象,这就是为什么只有在同步的块或者方法中才能调用wait/notify等方法,否则会抛出java.lang.IllegalMonitorStateException的异常的原因。

image-20240310144605392

具体的流程:

  • 代码进入synchorized代码块,先让lock(对象锁)关联monitor,然后判断Owner是否有线程持有

  • 如果没有线程持有,则让当前线程持有,表示该线程获取锁成功

  • 如果有线程持有,则让当前线程进入entryList进行阻塞,如果Owner持有的线程已经释放了锁,在EntryList中的线程去竞争锁的持有权==(非公平)==

  • 如果代码块中调用了wait()方法,则会进去WaitSet中进行等待

参考回答:

  • Synchronized【对象锁】采用互斥的方式让同一时刻至多只有一个线程能持有【对象锁】

  • 它的底层由monitor实现的,monitor是jvm级别的对象( C++实现),线程获得锁需要使用对象(锁)关联monitor

  • 在monitor内部有三个属性,分别是owner、entrylist、waitset

  • 其中owner是关联的获得锁的线程,并且只能关联一个线程;entrylist关联的是处于阻塞状态的线程;waitset关联的是处于Waiting状态的线程

修饰方法的情况

public class SynchronizedDemo2 {
public synchronized void method() {
System.out.println("synchronized 方法");
}
}
image-20240606183136051

synchronized 修饰的方法并没有 monitorenter 指令和 monitorexit 指令,取得代之的确实是 ACC_SYNCHRONIZED 标识,该标识指明了该方法是一个同步方法。JVM 通过该 ACC_SYNCHRONIZED 访问标志来辨别一个方法是否声明为同步方法,从而执行相应的同步调用。

如果是实例方法,JVM 会尝试获取实例对象的锁。如果是静态方法,JVM 会尝试获取当前 class 的锁

总结

  • synchronized 同步语句块的实现使用的是 monitorentermonitorexit 指令,其中 monitorenter 指令指向同步代码块的开始位置,monitorexit 指令则指明同步代码块的结束位置。

  • synchronized 修饰的方法并没有 monitorenter 指令和 monitorexit 指令,取得代之的确实是 ACC_SYNCHRONIZED 标识,该标识指明了该方法是一个同步方法。

锁升级

首先,锁的状态有4中:无锁状态偏向锁轻量级锁重量级锁。这些都记录在锁对象的Mark Word中。随着锁竞争的激烈程度上升,锁会升级,但不会降级。

  • 偏向锁
    偏向于第一个请求锁的线程。

    • 如果在运行过程中,同步锁只有一个线程请求,不存在锁竞争的情况,则会给该线程加一个偏向锁(将线程id记录在锁对象mark word中),当该线程下次执行同步代码块时,会判断当前持有锁的线程是否是自己。如果自始至终都没有锁竞争,那么偏向锁不会有额外的开销,效率很高。
    • 如果运行过程中发生了锁竞争,则持有偏向锁的线程会被挂起,JVM会消除他的偏向锁,将锁升级为轻量级锁,撤销轻量级锁时会导致STW。
  • 轻量级锁(自旋锁)

    • 升级为轻量级锁后,竞争失败的线程会导致锁进入锁膨胀状态,会让竞争失败的线程自旋,自旋会导致忙等问题,当自旋达到一定次数后,轻量级锁升级为重量级锁。
    • 自旋的好处是,减少了线程状态切换带来的开销,缺点是可能会占用CPU资源。
  • 重量级锁

    • 当后续线程尝试获取锁时,发现被占用的锁是重量级锁,则自己会直接进入阻塞状态。
image-20240606185236557

volatile

保证线程间的可见性

如果我们将变量声明为 volatile ,这就指示 JVM,这个变量是共享且不稳定的,每次使用它都到主存中进行读取

image-20240310154147808

image-20240310154540702

其实是加了volatile后,每次读写这个变量,都要到共享内存中读取

禁止指令重排序

image-20240310154929359

image-20240310155651807

image-20240310155852881

双重校验锁 实现对象单例(线程安全)

public class Singleton {

private volatile static Singleton uniqueInstance;
//私有化构造函数,不能 new 对象
private Singleton() {
}

public static Singleton getUniqueInstance() {
//先判断对象是否已经实例过,没有实例化过才进入加锁代码
if (uniqueInstance == null) {
synchronized (Singleton.class) {
if (uniqueInstance == null) {
uniqueInstance = new Singleton();
}
}
}
return uniqueInstance;
}
}

uniqueInstance 采用 volatile 关键字修饰也是很有必要的, uniqueInstance = new Singleton(); 这段代码其实是分为三步执行:

  1. uniqueInstance 分配内存空间

  2. 初始化 uniqueInstance

  3. uniqueInstance 指向分配的内存地址

但是由于 JVM 具有指令重排的特性,执行顺序有可能变成 1->3->2。指令重排在单线程环境下不会出现问题,但是在多线程环境下会导致一个线程获得还没有初始化的实例。例如,线程 T1 执行了 1 和 3,此时 T2 调用 getUniqueInstance() 后发现 uniqueInstance 不为空,因此返回 uniqueInstance,但此时 uniqueInstance 还未被初始化。

**volatile 关键字能保证变量的可见性,但不能保证对变量的操作是原子性的。**通常,对一个变量的修改包括三部:读取变量值,修改变量值,将修改后的值保存

synchronized和volatile

synchronized 关键字和 volatile 关键字是两个互补的存在,而不是对立的存在!

  • volatile 关键字是线程同步的轻量级实现,所以 volatile性能肯定比synchronized关键字要好 。但是 volatile 关键字只能用于变量而 synchronized 关键字可以修饰方法以及代码块 。

  • volatile 关键字能保证数据的可见性,但不能保证数据的原子性。synchronized 关键字两者都能保证。

  • volatile关键字主要用于解决变量在多个线程之间的可见性,和防止指令重排,而 synchronized 关键字解决的是多个线程之间访问资源的同步性

JMM(Java内存模型)

JMM 定义了共享内存中多线程程序读写操作的行为规范。

image-20240310151130003 image-20240310151147324

什么是主内存?什么是本地内存?

  • 主内存:所有线程创建的实例对象都存放在主内存中,不管该实例对象是成员变量,还是局部变量,类信息、常量、静态变量都是放在主内存中。为了获取更好的运行速度,虚拟机及硬件系统可能会让工作内存优先存储于寄存器和高速缓存中。

  • 本地内存:每个线程都有一个私有的本地内存,本地内存存储了该线程以读 / 写==共享变量的副本==。**每个线程只能操作自己本地内存中的变量,无法直接访问其他线程的本地内存。如果线程间需要通信,必须通过主内存来进行。**本地内存是 JMM 抽象出来的一个概念,并不真实存在,它涵盖了缓存、写缓冲区、寄存器以及其他的硬件和编译器优化。

JMM为共享变量提供了可见性的保障。

主内存存放共享变量,工作内存存放共享变量副本。

image-20240310153143150

悲观锁

  • 悲观锁的基本思想是认为数据很可能会发生冲突,因此在访问数据之前先获取锁。在使用悲观锁的情况下,线程在访问共享资源之前会先尝试获取锁,如果获取不到锁就会进入阻塞状态,直到获取到锁为止。

  • 悲观锁常常使用 synchronized 关键字或者是显式锁(如 ReentrantLock)来实现,它可以确保在同一时刻只有一个线程能够访问共享资源,从而保证数据的一致性。

AQS(公平的)

AbstractQueueSynchronizer ,抽象队列同步器,是构建锁或其他同步组件的基础框架

AQS 就是一个抽象类,主要用来构建锁和同步器。

常见的实现类:ReentrantLock、Semaphore、CountDownLatch

image-20240606203510042 image-20240310162708708

ReentrantLock

基于JDK实现的锁,与synchronized相比具有以下特点:

  • 可中断

  • 可重入(synchronized也可重入)

  • 可设置公平锁

  • 可设置超时时间

  • 支持多个条件变量

可中断锁:获取锁的过程中可以被中断,不需要一直等到获取锁之后才能进行其他逻辑处理。ReentrantLock 就属于是可中断锁。

通过 lock.lockInterruptibly() 来实现这个机制。也就是说正在等待的线程可以选择放弃等待,改为处理其他事情。

不可中断锁:一旦线程申请了锁,就只能等到拿到锁以后才能进行其他的逻辑处理。 synchronized 就属于是不可中断锁。

可重入锁 也叫递归锁,指的是同一个线程可以再次获取自己的内部锁。比如一个线程获得了某个对象的锁,此时这个对象锁还没有释放,当其再次想要获取这个对象的锁的时候还是可以获取的,如果是不可重入锁的话,就会造成死锁。

底层原理

AQS+CAS

image-20240310164627977

image-20240310165253810

image-20240310165419183

synchronized 与 Lock的区别

语法层面

  • synchronized 是关键字,由 jvm 实现,用cpp实现

  • Lock 是接口,由 JDK 提供,用 java 语言实现

  • synchornized 退出同步块会自动释放锁,Lock 要手动 unlock

功能层面

  • 都是悲观锁,具有基本的互斥、同步、重入功能

  • Lock 可打断、可公平、可设置超时时间、多条件变量

  • Lock 由适合不同场景的实现,如 ReentrantLock、ReentrantReadWriteLock

性能层面:

  • 没有竞争时,synchronized 做了很多优化,如偏向锁、轻量级锁,性能不错

  • 竞争激烈时,Lock 性能更好

Semaphore

synchronizedReentrantLock 都是一次只允许一个线程访问某个资源,而Semaphore(信号量)可以用来控制同时访问特定资源的线程数量

// 初始共享资源数量
final Semaphore semaphore = new Semaphore(5);
// 获取1个许可
semaphore.acquire();
// 释放1个许可
semaphore.release();

Semaphore 通常用于那些资源有明确访问数量限制的场景比如限流(仅限于单机模式,实际项目中推荐使用 Redis +Lua 来做限流)。

原理

Semaphore 是共享锁的一种实现,它默认构造 AQS 的 state 值为 permits,你可以将 permits 的值理解为许可证的数量,只有拿到许可证的线程才能执行。

调用semaphore.acquire() ,线程尝试获取许可证,如果 state >= 0 的话,则表示可以获取成功。如果获取成功的话,使用 CAS 操作去修改 state 的值 state=state-1。如果 state<0 的话,则表示许可证数量不足。此时会创建一个 Node 节点加入阻塞队列,挂起当前线程。

死锁

死锁的四个必要条件

  • 互斥条件 :该资源任意一个时刻只由一个线程占用。

  • 请求与保持条件 :一个线程因请求资源而阻塞时,对已获得的资源保持不放。

  • **不剥夺条件 **: 线程已获得的资源在未使用完之前不能被其他线程强行剥夺,只有自己使用完毕后才释放资源。

  • **循环等待条件 **: 若干线程之间形成一种头尾相接的循环等待资源关系。

public class DeadLockDemo {
private static Object resource1 = new Object();//资源 1
private static Object resource2 = new Object();//资源 2

public static void main(String[] args) {
new Thread(() -> {
synchronized (resource1) {
System.out.println(Thread.currentThread() + "get resource1");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread() + "waiting get resource2");
synchronized (resource2) {
System.out.println(Thread.currentThread() + "get resource2");
}
}
}, "线程 1").start();

new Thread(() -> {
synchronized (resource2) {
System.out.println(Thread.currentThread() + "get resource2");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread() + "waiting get resource1");
synchronized (resource1) {
System.out.println(Thread.currentThread() + "get resource1");
}
}
}, "线程 2").start();
}

}

诊断死锁

image-20240310185626352 image-20240310185932837 image-20240310185656573

乐观锁

  • 乐观锁的基本思想是认为数据在一般情况下不会发生冲突,因此在访问数据时不加锁,而是在更新数据时检查是否发生了冲突。如果发现冲突,会进行相应的处理(通常是回滚操作),然后重新尝试。

  • 乐观锁的典型实现是版本号机制CAS算法(是一个原子操作)

    • 在 Java 中java.util.concurrent.atomic包下面的原子变量类(比如AtomicIntegerLongAdder)就是使用了乐观锁的一种实现方式 CAS 实现的。

悲观锁乐观锁如何选择

悲观锁适用于并发写入较多的场景,能够确保数据的一致性;而乐观锁适用于并发读取较多、写入较少的场景,可以提高系统的并发性能。

高并发的场景下,乐观锁相比悲观锁来说,不存在锁竞争造成线程阻塞,也不会有死锁的问题,在性能上往往会更胜一筹。但是,如果冲突频繁发生(写占比非常多的情况),会频繁失败和重试(黑马点评,超卖问题的解决),这样同样会非常影响性能,导致 CPU 飙升。

悲观锁通常多用于写比较多的情况(多写场景,竞争激烈),这样可以避免频繁失败和重试影响性能,悲观锁的开销是固定的。不过,如果乐观锁解决了频繁失败和重试这个问题的话(比如LongAdder),也是可以考虑使用乐观锁的,要视实际情况而定。

乐观锁通常多用于写比较少的情况(多读场景,竞争较少),这样可以避免频繁加锁影响性能。不过,乐观锁主要针对的对象是单个共享变量(参考java.util.concurrent.atomic包下面的原子变量类)

CAS(Compare And Swap)

image-20240310152254760 image-20240310153314915

CAS 的全称是 Compare And Swap(比较与交换) ,用于实现乐观锁,被广泛应用于各大框架中。CAS 的思想很简单,就是用一个预期值和要更新的变量值进行比较,两值相等才会进行更新。

CAS 是一个原子操作,底层依赖于一条 CPU 的原子指令。

原子操作 即最小不可拆分的操作,也就是说操作一旦开始,就不能被打断,直到操作完成。

CAS 涉及到三个操作数:

  • V:要更新的变量值(Var)

  • E:预期值(Expected)

  • N:拟写入的新值(New)

当且仅当 V 的值等于 E 时,CAS 通过原子方式用新值 N 来更新 V 的值。如果不等,说明已经有其它线程更新了 V,则当前线程放弃更新。

CAS存在的问题

  • ABA问题,利用版本号机制可以解决

  • 自旋问题,循环时间长开销大:当操作不成功的时候会一直自旋,直到操作成功,会占用过多CPU资源

  • 只能保证一个共享变量的原子操作:只对单个共享变量有效,当涉及到多个共享变量CAS无效

多线程的执行安全

要确保三大特性:原子性、可见性、有序性

原子性

一个线程在CPU中操作不可暂停,也不可中断,要不执行完成,要不不执行

实现方案: 加锁

  1. synchronized:同步加锁

  2. JUC里面的lock:加锁

image-20240310193152560

可见性

让一个线程对共享变量的修改对另一个线程可见

image-20240310193255054

有序性

指令重排:处理器为了提高程序运行效率,可能会对输入代码进行优化,它不保证程序中各个语句的执行先后顺序同代码中的顺序一致,但是它会保证程序最终执行结果和代码顺序执行的结果是一致的

要避免指令重排:volatile

线程池

线程池参数、原理 ⭐

什么是线程池?

顾名思义,线程池就是管理一系列线程的资源池。当有任务要处理时,直接从线程池中获取线程来处理,**处理完之后线程并不会立即被销毁,而是等待下一个任务。**池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。

使用线程池的好处

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。

  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

image-20240310193949527

image-20240310194453585

1,任务在提交的时候,首先判断核心线程数是否已满,如果没有满则直接添加到工作线程执行

2,如果核心线程数满了,则判断阻塞队列是否已满,如果没有满,当前任务存入阻塞队列

3,如果阻塞队列也满了,则判断线程数是否小于最大线程数,如果满足条件,则使用临时线程执行任务

核心或临时线程执行完成任务后会检查阻塞队列中是否有需要执行的线程,如果有,则使用非核心线程执行任务

4,如果所有线程都在忙着(核心线程+临时线程),则走拒绝策略

拒绝策略

  • AbortPolicy:抛出RejectedExecutionException异常来拒绝新任务

  • CallerRunsPolicy:调用执行自己的线程运行任务,也就是在调用execute()方法的线程中运行被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。该策略会降低对于新任务的提交速度,影响程序的整体性能。

  • DiscardPolicy:不处理新任务,直接丢弃掉

  • DiscardOldestPolicy:丢弃掉最早未处理的任务

线程池常用的阻塞队列

比较常见的有4个,用的最多是ArrayBlockingQueue和LinkedBlockingQueue

1.ArrayBlockingQueue:基于数组结构的有界阻塞队列,FIFO。

2.LinkedBlockingQueue:基于链表结构的有界阻塞队列,FIFO。

3.DelayedWorkQueue :延时队列,是一个优先级队列,可以实现定时任务,它可以保证每次出队的任务都是当前队列中执行时间最靠前的

4.SynchronousQueue:不存储元素的阻塞队列,每个插入操作都必须等待一个移出操作。

ArrayBlockingQueue的LinkedBlockingQueue区别

LinkedBlockingQueue ArrayBlockingQueue
默认无界,支持有界 强制有界
底层是链表 底层是数组
是懒惰的,创建节点的时候添加数据 提前初始化 Node 数组
入队会生成新 Node Node需要是提前创建好的
两把锁(头尾) 一把锁

左边是LinkedBlockingQueue加锁的方式,右边是ArrayBlockingQueue加锁的方式

  • LinkedBlockingQueue读和写各有一把锁,性能相对较好

  • ArrayBlockingQueue只有一把锁,读和写公用,性能相对于LinkedBlockingQueue差一些

image-20240310200853628

如何确定核心线程数?

在设置核心线程数之前,需要先熟悉一些执行线程池执行任务的类型

  • IO密集型任务

一般来说:文件读写、DB读写、网络请求等

推荐:核心线程数大小设置为 2N+1 (N为计算机的CPU核数)

  • CPU密集型任务

一般来说:计算型代码、Bitmap转换、Gson转换等

推荐:核心线程数大小设置为 N+1 (N为计算机的CPU核数)

有一个简单并且适用面比较广的公式:

  • CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1。比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。

  • I/O 密集型任务(2N): 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。

如何判断是 CPU 密集任务还是 IO 密集任务?

CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。但凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。

参考回答:

① 高并发、任务执行时间短 -->( CPU核数+1 ),减少线程上下文的切换

② 并发不高、任务执行时间长

  • IO密集型的任务 --> (CPU核数 * 2 + 1)

  • 计算密集型任务 --> ( CPU核数+1 )

③ 并发高、业务执行时间长,解决这种类型任务的关键不在于线程池而在于整体架构的设计,看看这些业务里面某些数据是否能做缓存是第一步,增加服务器是第二步,至于线程池的设置,设置参考(2)

线程池的种类

在java.util.concurrent.Executors类中提供了大量创建连接池的静态方法,常见就有四种

newFixedThreadPool

创建使用固定线程数的线程池

源码:

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
  • 核心线程数与最大线程数一样,没有救急线程

  • 阻塞队列是LinkedBlockingQueue,最大容量为Integer.MAX_VALUE

  • 适用场景:适用于任务量已知,相对耗时的任务

即使 maximumPoolSize 的值比 corePoolSize 大,也至多只会创建 corePoolSize 个线程。这是因为FixedThreadPool 使用的是容量为 Integer.MAX_VALUELinkedBlockingQueue(无界队列),队列永远不会被放满。

public class FixedThreadPoolCase {

static class FixedThreadDemo implements Runnable{
@Override
public void run() {
String name = Thread.currentThread().getName();
for (int i = 0; i < 2; i++) {
System.out.println(name + ":" + i);
}
}
}

public static void main(String[] args) throws InterruptedException {
//创建一个固定大小的线程池,核心线程数和最大线程数都是3
ExecutorService executorService = Executors.newFixedThreadPool(3);

for (int i = 0; i < 5; i++) {
executorService.submit(new FixedThreadDemo());
Thread.sleep(10);
}

executorService.shutdown();
}

}

newSingleThreadExecutor

单线程化的线程池,它只会用唯一的工作线程来执行任 务,保证所有任务按照指定顺序(FIFO)执行

源码

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
  • 核心线程数和最大线程数都是1

  • 阻塞队列是LinkedBlockingQueue,最大容量为Integer.MAX_VALUE

  • 适用场景:适用于按照顺序执行的任务

SingleThreadExecutorFixedThreadPool 一样,使用的都是容量为 Integer.MAX_VALUELinkedBlockingQueue(无界队列)作为线程池的工作队列。SingleThreadExecutor 使用无界队列作为线程池的工作队列会对线程池带来的影响与 FixedThreadPool 相同。说简单点,就是可能会导致 OOM

public class NewSingleThreadCase {

static int count = 0;

static class Demo implements Runnable {
@Override
public void run() {
count++;
System.out.println(Thread.currentThread().getName() + ":" + count);
}
}

public static void main(String[] args) throws InterruptedException {
//单个线程池,核心线程数和最大线程数都是1
ExecutorService exec = Executors.newSingleThreadExecutor();

for (int i = 0; i < 10; i++) {
exec.execute(new Demo());
Thread.sleep(5);
}
exec.shutdown();
}

}

newCachedThreadPool

可缓存线程池

源码

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
  • 核心线程数为0

  • 最大线程数是Integer.MAX_VALUE

  • 阻塞队列为SynchronousQueue:不存储元素的阻塞队列,每个插入操作都必须等待一个移出操作。

  • 适用场景:适合任务数比较密集,但每个任务执行时间较短的情况

CachedThreadPoolcorePoolSize 被设置为空(0),maximumPoolSize被设置为 Integer.MAX.VALUE,即它是无界的,这也就意味着如果主线程提交任务的速度高于 maximumPool 中线程处理任务的速度时,CachedThreadPool 会不断创建新的线程。极端情况下,这样会导致耗尽 cpu 和内存资源。

public class CachedThreadPoolCase {

static class Demo implements Runnable {
@Override
public void run() {
String name = Thread.currentThread().getName();
try {
//修改睡眠时间,模拟线程执行需要花费的时间
Thread.sleep(100);

System.out.println(name + "执行完了");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) throws InterruptedException {
//创建一个缓存的线程,没有核心线程数,最大线程数为Integer.MAX_VALUE
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
exec.execute(new Demo());
Thread.sleep(1);
}
exec.shutdown();
}

}

newScheduledThreadPool

提供了“延迟”和“周期执行”功能的ThreadPoolExecutor。

image-20230505222203615

  • 适用场景:有定时和延迟执行的任务

public class ScheduledThreadPoolCase {

static class Task implements Runnable {
@Override
public void run() {
try {
String name = Thread.currentThread().getName();

System.out.println(name + ", 开始:" + new Date());
Thread.sleep(1000);
System.out.println(name + ", 结束:" + new Date());

} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) throws InterruptedException {
//按照周期执行的线程池,核心线程数为2,最大线程数为Integer.MAX_VALUE
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);
System.out.println("程序开始:" + new Date());

/**
* schedule 提交任务到线程池中
* 第一个参数:提交的任务
* 第二个参数:任务执行的延迟时间
* 第三个参数:时间单位
*/
scheduledThreadPool.schedule(new Task(), 0, TimeUnit.SECONDS);
scheduledThreadPool.schedule(new Task(), 1, TimeUnit.SECONDS);
scheduledThreadPool.schedule(new Task(), 5, TimeUnit.SECONDS);

Thread.sleep(5000);

// 关闭线程池
scheduledThreadPool.shutdown();

}

}

为什么不建议用Executors创建线程池?

Executors创建的线程池,最大线程数等于核心线程数,并且请求队列无界,可能会造成OOM

image-20240310204839377

execute() vs submit() ⭐

  • execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;

  • submit()方法用于提交需要返回值的任务。线程池会返回一个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执行成功,并且可以通过 Futureget()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit)方法的话,如果在 timeout 时间内任务还没有执行完,就会抛出 java.util.concurrent.TimeoutException

这里只是为了演示使用,推荐使用 ThreadPoolExecutor 构造方法来创建线程池。

示例 1:使用 get()方法获取返回值。

ExecutorService executorService = Executors.newFixedThreadPool(3);

Future<String> submit = executorService.submit(() -> {
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "abc";
});

String s = submit.get();
System.out.println(s);
executorService.shutdown();

shutdown() VS shutdownNow()

  • shutdown() :关闭线程池,线程池的状态变为 SHUTDOWN线程池不再接受新任务了,但是队列里的任务得执行完毕。

  • shutdownNow() :关闭线程池,线程池的状态变为 STOP。线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的 List

调用完 shutdownNowshuwdown 方法后,并不代表线程池已经完成关闭操作,它只是异步的通知线程池进行关闭处理。如果要同步等待线程池彻底关闭后才继续往下执行,需要调用awaitTermination方法进行同步等待。

在调用 awaitTermination() 方法时,应该设置合理的超时时间,以避免程序长时间阻塞而导致性能问题。另外。由于线程池中的任务可能会被取消或抛出异常,因此在使用 awaitTermination() 方法时还需要进行异常处理。awaitTermination() 方法会抛出 InterruptedException 异常,需要捕获并处理该异常,以避免程序崩溃或者无法正常退出

// ...
// 关闭线程池
executor.shutdown();
try {
// 等待线程池关闭,最多等待5分钟
if (!executor.awaitTermination(5, TimeUnit.MINUTES)) {
// 如果等待超时,则打印日志
System.err.println("线程池未能在5分钟内完全关闭");
}
} catch (InterruptedException e) {
// 异常处理
}

isTerminated() VS isShutdown()

  • isShutDown 当调用 shutdown() 方法后返回为 true。

  • isTerminated 当调用 shutdown() 方法后,并且所有提交的任务完成后返回为 true

线程工厂

默认情况下创建的线程名字类似 pool-1-thread-n 这样的,没有业务含义,不利于我们定位问题。

给线程池里的线程命名通常有下面两种方式:

1、利用 guava 的 ThreadFactoryBuilder

ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(threadNamePrefix + "-%d")
.setDaemon(true).build();
ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory)

2、自己实现 ThreadFactory

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
* 线程工厂,它设置线程名称,有利于我们定位问题。
*/
public final class NamingThreadFactory implements ThreadFactory {

private final AtomicInteger threadNum = new AtomicInteger();
private final String name;

/**
* 创建一个带名字的线程池生产工厂
*/
public NamingThreadFactory(String name) {
this.name = name;
}

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName(name + " [#" + threadNum.incrementAndGet() + "]");
return t;
}
}

ThreadLocal

概述

ThreadLocal是多线程中对于解决线程安全的一个操作类,它会为每个线程都分配一个独立的线程副本从而解决了变量并发访问冲突的问题。ThreadLocal 同时实现了线程内的资源共享

案例:使用JDBC操作数据库时,会将每一个线程的Connection放入各自的ThreadLocal中,从而保证每个线程都在各自的 Connection 上进行数据库的操作,避免A线程关闭了B线程的连接。

image-20230505224057228

ThreadLocal基本使用

三个主要方法:

  • set(value) 设置值

  • get() 获取值

  • remove() 清除值

ThreadLocal的实现原理&源码解析

Thread类源代码入手。

public class Thread implements Runnable {
//......
//与此线程有关的ThreadLocal值。由ThreadLocal类维护
ThreadLocal.ThreadLocalMap threadLocals = null;

//与此线程有关的InheritableThreadLocal值。由InheritableThreadLocal类维护
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
//......
}

从上面Thread类 源代码可以看出Thread 类中有一个 threadLocals 和 一个 inheritableThreadLocals 变量,它们都是 ThreadLocalMap 类型的变量,我们可以把 ThreadLocalMap 理解为ThreadLocal 类实现的定制化的 HashMap。默认情况下这两个变量都是 null,只有当前线程调用 ThreadLocal 类的 setget方法时才创建它们,实际上调用这两个方法的时候,我们调用的是ThreadLocalMap类对应的 get()set()方法。

ThreadLocal本质来说就是一个线程内部存储类,从而让多个线程只操作自己内部的值,从而实现线程数据隔离

image-20230505224341410

在ThreadLocal中有一个内部类叫做ThreadLocalMap,类似于HashMap

ThreadLocalMap中有一个属性table数组,这个是真正存储数据的位置

set方法

image-20230505224626253

get方法/remove方法

image-20230505224715087

通过上面这些内容,我们足以通过得出结论:最终的变量是放在了当前线程的 ThreadLocalMap 中,并不是存在 ThreadLocal 上,ThreadLocal 可以理解为只是ThreadLocalMap的封装,传递了变量值。ThrealLocal 类中可以通过Thread.currentThread()获取到当前线程对象后,直接通过getMap(Thread t)可以访问到该线程的ThreadLocalMap对象。

实际上key并不是ThreadLocal本身,而是它的一个弱引用

ThreadLocal-内存泄露问题

Java对象中的四种引用类型:强引用、软引用、弱引用、虚引用

  • 强引用:最为普通的引用方式,表示一个对象处于有用且必须的状态,如果一个对象具有强引用,则GC并不会回收它。即便堆中内存不足了,宁可出现OOM,也不会对其进行回收

image-20230505224755797

  • 弱引用:表示一个对象处于可能有用且非必须的状态。在GC线程扫描内存区域时,一旦发现弱引用,就会回收到弱引用相关联的对象。对于弱引用的回收,无关内存区域是否足够,一旦发现则会被回收

image-20230505224812015

每一个Thread维护一个ThreadLocalMap,在ThreadLocalMap中的Entry对象继承了WeakReference。其中key为使用弱引用的ThreadLocal实例,value为线程变量的副本

ThreadLocalMap 中使用的 key 为 ThreadLocal 的弱引用,而 value 是强引用。所以,如果 ThreadLocal 没有被外部强引用的情况下,在垃圾回收的时候,key 会被清理掉,而 value 不会被清理掉。

这样一来,ThreadLocalMap 中就会出现 key 为 null 的 Entry。假如我们不做任何措施的话,value 永远无法被 GC 回收,这个时候就可能会产生内存泄露。ThreadLocalMap 实现中已经考虑了这种情况,在调用 set()get()remove() 方法的时候,会清理掉 key 为 null 的记录。使用完 ThreadLocal方法后最好手动调用remove()方法

image-20230505224857538

在使用ThreadLocal的时候,强烈建议:务必手动remove

实际上:

ThreadLocal的源码中设计了自动清理key为null的entry的代码逻辑。主要逻辑如下:

调用 set() 方法后,有以下几种情况:

情况1:通过 hash 计算后,Entry 数组对应的槽位为空,这种情况直接插入 key-value

**情况2:**发生了哈希冲突,且槽位数据不为空,但槽位中的 key 与要插入的 key 相同,这种情况直接覆盖元数据。

情况3:发生了哈希冲突,槽位数据不为空,槽位中的key与当前要插入的key不同,则进行线性探测,向后进行查找:

情况3.1:探测过程中,遇到空Entry,则直接插入

**情况3.2:**探测过程中,遇到key相同的Entry,则直接覆盖

情况3.3:探测过程中,遇到 key = null 的槽位,槽位下标为 i,则调用 replaceStaleEntry() 方法,slotToExpunge = staleSlot = i,然后从当前位置向前进行探测,遇到key = null的槽位,则更新slotToExpunge的值,直到Entry为null,然后以 staleSlot 为起始位置向后进行探测

**情况3.3.1:**如果找到key相同的entry则直接覆盖,然后交换 table[i]table[staleSlot] 的数据

**情况3.3.2:**如果找到空槽,则直接插入,然后与table[staleSlot]交换位置

插入后,开始进行过期元素的清理。

清理过期元素的方法有:探测式清理、启发式清理

探测式清理从slotToExpunge开始向后清除过期元素,同时将没有过期的元素进行重新哈希,更新元素的位置,如果发生了哈冲突,则向后进行线性探测