死锁
死锁
发生在并发中,发生在两个线程或多个线程之间。
互不相让:当两个(或更多)线程(或进程)相互持有对方所需要的资源,又不主动释放,导致所有人都无法继续前进,导致程序陷入无尽的阻塞(等待状态),这就是死锁。

多个线程造成死锁的情况
如果多个线程之间的依赖关系是环形,存在环路的锁的依赖关系,那么也可能会发生死锁。

死锁的影响
- 死锁的影响在不同系统中是不一样的,这取决于系统对死锁的处理能力。
* 数据库中:检测并放弃事务(事务之间发生死锁)
* JVM中:无法自动处理
几率不高但危害大
- 不一定发生,但是遵循“墨菲定律”。
- 一旦发生,多是高并发场景,影响用户多。
- 整个系统崩溃,子系统崩溃、性能降低。
- 压力测试无法找出所有潜在的死锁。(死锁和并发量只是一个正相关的关系,并不是必然的关系,所以无法通过增加并发量来模拟出死锁,也无法保证高并发的情况下可以 100% 的模拟出死锁。)
发生死锁的例子
最简单的情况
/**
* 描述: 必定发生死锁的情况
*/
public class MustDeadLock implements Runnable {
int flag = 1;
static Object o1 = new Object();
static Object o2 = new Object();
public static void main(String[] args) {
MustDeadLock r1 = new MustDeadLock();
MustDeadLock r2 = new MustDeadLock();
r1.flag = 1;
r2.flag = 0;
Thread t1 = new Thread(r1);
Thread t2 = new Thread(r2);
t1.start();
t2.start();
}
@Override
public void run() {
System.out.println("flag = " + flag);
if (flag == 1) {
synchronized (o1) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (o2) {
System.out.println("线程1成功拿到两把锁");
}
}
}
if (flag == 0) {
synchronized (o2) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (o1) {
System.out.println("线程2成功拿到两把锁");
}
}
}
}
}
flag = 1
flag = 0
实际生产中的例子:转账
- 需要两把锁
- 获取两把锁成功,且余额大于 0,则扣除转出人,增加收款人的余额,是原子操作。
- 顺序相反导致死锁
/**
* 描述: 转账时候遇到死锁,一旦打开注释,便会发生死锁
*/
public class TransferMoneyDemo implements Runnable {
int flag = 1;
static Account a = new Account(500);
static Account b = new Account(500);
static Object lock = new Object();
public static void main(String[] args) throws InterruptedException {
TransferMoneyDemo r1 = new TransferMoneyDemo();
TransferMoneyDemo r2 = new TransferMoneyDemo();
r1.flag = 1;
r2.flag = 0;
Thread t1 = new Thread(r1);
Thread t2 = new Thread(r2);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("a的余额" + a.balance);
System.out.println("b的余额" + b.balance);
}
@Override
public void run() {
if (flag == 1) {
transferMoney(a, b, 200);
}
if (flag == 0) {
transferMoney(b, a, 200);
}
}
public static void transferMoney(Account from, Account to, int amount) {
synchronized (from) {
try {
Thread.sleep(500);//发生死锁的
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (to) {//卡在这里
if (from.balance - amount < 0) {
System.out.println("余额不足,转账失败");
}
from.balance -= amount;
to.balance += amount;
System.out.println("成功转账" + amount + "元");
}
}
}
static class Account {
public Account(int balance) {
this.balance = balance;
}
int balance;
}
}
模拟多人随机转账
import java.util.Random;
/**
* 描述: 多人同时转账,依然很危险
*/
public class MultiTransferMoney {
//账户数
private static final int NUM_ACCOUNTS = 500;
//金额
private static final int NUM_MONEY = 1000;
//重复次数
private static final int NUM_ITERATIONS = 1000000;
//线程数
private static final int NUM_THREADS = 20;
//随机工具类
public static Random rnd = new Random();
//初始化Account数组
public static Account[] accounts = new Account[NUM_ACCOUNTS];
/**
* 转账方法
* @param from
* @param to
* @param amount
*/
public static void transferMoney(Account from, Account to, int amount) {
synchronized (from) {
/* try {
Thread.sleep(500);//发生死锁的
} catch (InterruptedException e) {
e.printStackTrace();
}*/
synchronized (to) {//卡在这里
if (from.balance - amount < 0) {
System.out.println("余额不足,转账失败");
}
from.balance -= amount;
to.balance += amount;
System.out.println("成功转账" + amount + "元");
}
}
}
//初始化账户
static {
for (int i = 0; i < accounts.length; i++) {
accounts[i] = new Account(NUM_MONEY);
}
}
//Account类
static class Account {
public Account(int balance) {
this.balance = balance;
}
int balance;
}
//线程类
static class TransferThread extends Thread {
@Override
public void run() {
for (int i = 0; i < NUM_ITERATIONS; i++) {
//生成随机账户、 转账金额
int fromAcct = rnd.nextInt(NUM_ACCOUNTS);
int toAcct = rnd.nextInt(NUM_ACCOUNTS);
int amount = rnd.nextInt(NUM_MONEY);
//调用转账方法
transferMoney(accounts[fromAcct], accounts[toAcct], amount);
}
System.out.println("运行结束");
}
}
public static void main(String[] args) {
//开启20个线程
for (int i = 0; i < NUM_THREADS; i++) {
new TransferThread().start();
}
}
}
死锁的 4 个必要条件
- 互斥条件:一把锁同一时间只能被一个线程占用。
- 请求与保持条件:阻塞在请求第二把锁的同时还不愿意释放自己持有的锁。
- 不剥夺条件:无权终止任意一个发生死锁的线程。
- 循环等待条件:a 等 b、b 等 c、 c 等 a。形成环路。
案例分析:分别把四个必要条件标注出来
/**
* 描述: 必定发生死锁的情况
*/
public class MustDeadLock implements Runnable {
int flag = 1;
static Object o1 = new Object();
static Object o2 = new Object();
public static void main(String[] args) {
MustDeadLock r1 = new MustDeadLock();
MustDeadLock r2 = new MustDeadLock();
r1.flag = 1;
r2.flag = 0;
Thread t1 = new Thread(r1);//互斥条件
Thread t2 = new Thread(r2);
t1.start();
t2.start();
}
@Override
public void run() {
System.out.println("flag = " + flag);
if (flag == 1) {
synchronized (o1) {//请求与保持条件
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (o2) { //循环等待条件
System.out.println("线程1成功拿到两把锁");
}
}
}
if (flag == 0) {
synchronized (o2) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (o1) {
System.out.println("线程2成功拿到两把锁");
}
}
}
}
}
如何定位死锁
jps (JVM Process Status Tool)是其中的典型 jvm 工具。除了名字像 UNIX 的 ps 命令之外,它的功能也和 ps 命令类似:可以列出正在运行的虚拟机进程,并显示虚拟机执行主类(Main Class, main()函数所在的类)名称以及这些进程的本地虚拟机唯- ID (Local Virtual Machine Identifier, LVMID),虽然功能比较单一,但它是使用频率最高的 JDK 命令行工具
-l #展示类全限定名,如果进程执行的是 Jar 包则输出 Jar 路径。
-v #输出虚拟机进程启动时 JVM 参数
C:\Users\Ryzen-7-3800X>jps -l
25364 org.jetbrains.jps.cmdline.Launcher
25652 com.xdclass.couponapp.test.deadlock.MustDeadLock
33912 sun.tools.jps.Jps
5912 org.jetbrains.kotlin.daemon.KotlinCompileDaemon
18396 sun.tools.jconsole.JConsole
9276
Jstack (Stack Trace for Java)命令用于生成虚拟机当前时刻的线程快照(-般称为 threaddump 或者 javacore 文件)。
线程快照就是当前虚拟机内每一条线程正在执行的方法堆栈的集合,生成线程快照的主要目的是定位线程出现长时间停顿的原因,如线程间死锁、死循环、请求外部资源导致的长时间等待等都是导致线程长时间停顿的常见原因。线程出现停顿的时候通过 jstack 来查看各个线程的调用堆栈,就可以知道没有响应的线程到底在后台做些什么事情,或者等待着什么资源。
C:\Users\soulboy>jstack -l 14820
Found one Java-level deadlock:
=============================
"Thread-1":
waiting to lock monitor 0x0000000026159938 (object 0x0000000715d6e230, a java.lang.Object),
which is held by "Thread-0"
"Thread-0":
waiting to lock monitor 0x0000000026156f48 (object 0x0000000715d6e240, a java.lang.Object),
which is held by "Thread-1"
Java stack information for the threads listed above:
===================================================
"Thread-1":
at com.xdclass.couponapp.test.deadlock.MustDeadLock.run(MustDeadLock.java:47)
- waiting to lock <0x0000000715d6e230> (a java.lang.Object)
- locked <0x0000000715d6e240> (a java.lang.Object)
at java.lang.Thread.run(Thread.java:748)
"Thread-0":
at com.xdclass.couponapp.test.deadlock.MustDeadLock.run(MustDeadLock.java:35)
- waiting to lock <0x0000000715d6e240> (a java.lang.Object)
- locked <0x0000000715d6e230> (a java.lang.Object)
at java.lang.Thread.run(Thread.java:748)
Found 1 deadlock.
ThreadMXBean 代码演示:用于让程序发生死锁之后的处理能力
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
/**
* 描述: 用ThreadMXBean检测死锁
*/
public class ThreadMXBeanDetection implements Runnable {
int flag = 1;
static Object o1 = new Object();
static Object o2 = new Object();
public static void main(String[] args) throws InterruptedException {
ThreadMXBeanDetection r1 = new ThreadMXBeanDetection();
ThreadMXBeanDetection r2 = new ThreadMXBeanDetection();
r1.flag = 1;
r2.flag = 0;
Thread t1 = new Thread(r1);
Thread t2 = new Thread(r2);
t1.start();
t2.start();
Thread.sleep(1000);
//解决死锁
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
//数组中的元素为发生死锁的线程
long[] deadlockedThreads = threadMXBean.findDeadlockedThreads();
if (deadlockedThreads != null && deadlockedThreads.length > 0) {
for (int i = 0; i < deadlockedThreads.length; i++) {
ThreadInfo threadInfo = threadMXBean.getThreadInfo(deadlockedThreads[i]);
System.out.println("发现死锁" + threadInfo.getThreadName());
//报警
//日志记录
//重启程序
}
}
}
@Override
public void run() {
System.out.println("flag = " + flag);
if (flag == 1) {
synchronized (o1) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (o2) {
System.out.println("线程1成功拿到两把锁");
}
}
}
if (flag == 0) {
synchronized (o2) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (o1) {
System.out.println("线程2成功拿到两把锁");
}
}
}
}
}
修复死锁的策略
线上发生死锁的情况
- 线上问题都要防患于未然,不造成损失地扑灭几乎已经是不可能了。
- 保存案发现场然后立刻重启服务器
- 暂时保证线上服务的安全,然后在利用刚才保存的信息,排查死锁,修改代码,重新发布。
常见修复策略
- 避免策略:哲学家就餐的换手方案、转账换序案。
思路:避免相反的获取锁的顺序发生,实际开发中可以利用主键进行获取锁的排序,就不需要hashcode了。
- 检测与恢复策略:一段时间检测是否有死锁,如果有就剥夺某一个资源,来打开死锁。
- 鸵鸟策略:诺鸟这种动物在遇到危险的时候,通常就会把头埋在地上,这样一来它就看不到危险了。鸵鸟策略的意思就是说,如果发生死锁的概率极其低,那么可以直接忽略它,直到死锁发生的时候,再人工修复。几个月死锁一次的情况下才会考虑。
案例:避免相反的获取锁的顺序发生(避免策略)
package com.xdclass.couponapp.test.deadlock;
/**
* 描述: 转账时候遇到死锁,一旦打开注释,便会发生死锁
*/
public class TransferMoney implements Runnable {
int flag = 1;
static Account a = new Account(500);
static Account b = new Account(500);
static Object lock = new Object();
public static void main(String[] args) throws InterruptedException {
TransferMoney r1 = new TransferMoney();
TransferMoney r2 = new TransferMoney();
r1.flag = 1;
r2.flag = 0;
Thread t1 = new Thread(r1);
Thread t2 = new Thread(r2);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("a的余额" + a.balance);
System.out.println("b的余额" + b.balance);
}
@Override
public void run() {
if (flag == 1) {
transferMoney(a, b, 200);
}
if (flag == 0) {
transferMoney(b, a, 200);
}
}
public static void transferMoney(Account from, Account to, int amount) {
//抽取转账的业务逻辑代码
class Helper {
public void transfer() {
if (from.balance - amount < 0) {
System.out.println("余额不足,转账失败。");
return;
}
from.balance -= amount;
to.balance = to.balance + amount;
System.out.println("成功转账" + amount + "元");
}
}
//利用哈希值:统一获取锁的顺序
int fromHash = System.identityHashCode(from);
int toHash = System.identityHashCode(to);
//根据对象的hashcode作为获取所的顺序条件
//保证两个相互获取锁的线程,获取锁的顺序始终一致。
if (fromHash < toHash) {
synchronized (from) {
synchronized (to) {
new Helper().transfer();
}
}
}
else if (fromHash > toHash) {
synchronized (to) {
synchronized (from) {
new Helper().transfer();
}
}
}else {
synchronized (lock) {// 发生hashcode碰撞,随便选择一种获取锁的顺序即可
synchronized (to) {
synchronized (from) {
new Helper().transfer();
}
}
}
}
}
static class Account {
public Account(int balance) {
this.balance = balance;
}
int balance;
}
}
示例:哲学家就餐
# 多种解决方案
* 服务员检查(避免策略)
* 改变一个哲学家拿叉子的顺序(避免策略)
* 餐票(避免策略)
* 领导调节(检测与恢复策略)
改变一个哲学家拿叉子的顺序(避免策略)
/**
* 描述: 演示哲学家就餐问题导致的死锁
*/
public class DiningPhilosophers {
public static class Philosopher implements Runnable {
private Object leftChopstick;
private Object rightChopstick;
public Philosopher(Object leftChopstick, Object rightChopstick) {
this.leftChopstick = leftChopstick;
this.rightChopstick = rightChopstick;
}
@Override
public void run() {
try {
while (true) {
doAction("Thinking");
synchronized (leftChopstick) {
doAction("Picked up left chopstick");
synchronized (rightChopstick) {
doAction("Picked up right chopstick - eating");
doAction("Put down right chopstick");
}
doAction("Put down left chopstick");
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void doAction(String action) throws InterruptedException {
System.out.println(Thread.currentThread().getName() + " " + action);
Thread.sleep((long) (Math.random() * 10));
}
}
public static void main(String[] args) {
Philosopher[] philosophers = new Philosopher[5];
Object[] chopsticks = new Object[philosophers.length];
for (int i = 0; i < chopsticks.length; i++) {
chopsticks[i] = new Object();
}
for (int i = 0; i < philosophers.length; i++) {
Object leftChopstick = chopsticks[i];
Object rightChopstick = chopsticks[(i + 1) % chopsticks.length];
//最后一位哲学家改变拿筷子的顺序(避免策略)
if (i == philosophers.length - 1) {
philosophers[i] = new Philosopher(rightChopstick, leftChopstick);
} else {
philosophers[i] = new Philosopher(leftChopstick, rightChopstick);
}
new Thread(philosophers[i], "哲学家" + (i + 1) + "号").start();
}
}
}
领导调节(检测与恢复策略)
* 允许发生死锁
* 每次调用锁都记录
* 定期检查 “锁的调用链路图” 中是否存在环路
* 一旦发生死锁,就用死锁恢复机制进行恢复
1.进程终止:逐个终止线程,知道死锁消除
2.终止顺序: 优先级(对于程序重要性的优先级)、已占用资源、还需要的资源作为终止进程的判断依据。或是已运行时间。
3.资源抢占:不把已经发放出去的锁给收回来。 缺点:造成饥饿
实际工程中如何避免死锁
- 获取锁的时候设置超时时间
* Lock 的 tryLock(long timeout,TimeUnit unit)
* synchronized 不具备尝试锁的能力
* 造成超时的可能性多:发生了死锁、线程陷入死循环、线程执行很慢等……
* 获取锁失败:打日志、发邮件、重启等
示例:用 tryLock 来避免死锁
package com.xdclass.couponapp.test.deadlock;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 描述: 用tryLock来避免死锁
*/
public class TryLockDeadlock implements Runnable {
int flag = 1;
static Lock lock1 = new ReentrantLock();
static Lock lock2 = new ReentrantLock();
public static void main(String[] args) {
TryLockDeadlock r1 = new TryLockDeadlock();
TryLockDeadlock r2 = new TryLockDeadlock();
r1.flag = 1;
r2.flag = 0;
new Thread(r1).start();
new Thread(r2).start();
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
if (flag == 1) {
try {
if (lock1.tryLock(800, TimeUnit.MILLISECONDS)) {
System.out.println("线程1获取到了锁1");
Thread.sleep(new Random().nextInt(1000));
if (lock2.tryLock(800, TimeUnit.MILLISECONDS)) {
System.out.println("线程1获取到了锁2");
System.out.println("线程1成功获取到了两把锁");
lock2.unlock();
lock1.unlock();
break;
} else {
System.out.println("线程1尝试获取锁2失败,已重试");
lock1.unlock();
Thread.sleep(new Random().nextInt(1000));
}
} else {
System.out.println("线程1获取锁1失败,已重试");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (flag == 0) {
try {
if (lock2.tryLock(3000, TimeUnit.MILLISECONDS)) {
System.out.println("线程2获取到了锁2");
Thread.sleep(new Random().nextInt(1000));
if (lock1.tryLock(3000, TimeUnit.MILLISECONDS)) {
System.out.println("线程2获取到了锁1");
System.out.println("线程2成功获取到了两把锁");
lock1.unlock();
lock2.unlock();
break;
} else {
System.out.println("线程2尝试获取锁1失败,已重试");
lock2.unlock();
Thread.sleep(new Random().nextInt(1000));
}
} else {
System.out.println("线程2获取锁2失败,已重试");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
- 多使用并发类而不是自己设计的锁。
* ConcurrentHashMap、ConcurrentLinkedQueue、AtomicBoolean等…
* 实际应用中 java.util.concurrent.atomic 十分有用,简单方便且效率比使用 Lock 更高。
* 多使用并发集合少使用同步集合,并发集合比同步集合的可扩展性更好。
* 并发场景需要用到map,首先想到用ConcurrentHashMap
- 尽量减低锁的使用粒度:用不同的锁而不是一个锁。
- 如果能使用同步代码块,就不使用同步方法:自己指定锁对象。
- 给你的线程起个有意义的名字:debug和排查时事半功倍,框架和JDK都遵循这个最佳实践。
- 避免锁的嵌套:MustDeadLock类
- 分配资源前先看能不能收回来:银行家算法。
- 尽量避免几个功能用同一把锁:专锁专用
其他活性故障
死锁是最常见的活跃性问题,除了死锁之外,还有一些类似的问题,会导致程序无法顺利执行,统称为活跃性问题。
- 活锁(LiveLock)
- 饥饿
活锁成因
虽然线程并没有阻塞,也始终在运行(所以叫做"活"锁,线程是"活"的),但是程序却得不到进展,因为线程始终重复做同样的事。
死锁会陷入阻塞(不消耗CPU时间片),而活锁会消耗CPU资源(不断的运行)。
# 哲学家问题
同时拿起左边的筷子,同时等待一段时间,同时放下手中的筷子,再等 5 分钟,再同时拿起筷子……repeat。
如果这里死锁,那么就是这里两个人都始终一动不动,直到对方先抬头,他们之间不再说话了,只是等待。

活锁示例:吃饭
成因:重试机制不变,消息队列始终重试,吃饭始终谦让。
解决方案:以太网的指数退避算法。
工程中活锁实例:消息队列
成因:由于依赖服务出了问题,处理该消息一直失败,一直重试(试图处理该消息)。消息如果处理失败,就放在队列开头重试
解决方案:放在队尾、重试限制(阈值触发,持久化到数据库中,从队列中踢出该消息)
import java.util.Random;
/**
* 描述: 演示活锁问题
*/
public class LiveLock {
static class Spoon {
private Diner owner;
public Spoon(Diner owner) {
this.owner = owner;
}
public Diner getOwner() {
return owner;
}
public void setOwner(Diner owner) {
this.owner = owner;
}
public synchronized void use() {
System.out.printf("%s吃完了!", owner.name);
}
}
static class Diner {
private String name;
private boolean isHungry;
public Diner(String name) {
this.name = name;
isHungry = true;
}
public void eatWith(Spoon spoon, Diner spouse) {
while (isHungry) {
if (spoon.owner != this) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
continue;
}
Random random = new Random();
// if (spouse.isHungry) //活锁
if (spouse.isHungry && random.nextInt(10) < 9) { //大部分情况都会小于9 (0~9)
System.out.println(name + ": 亲爱的" + spouse.name + "你先吃吧");
spoon.setOwner(spouse);
continue;
}
spoon.use();
isHungry = false;
System.out.println(name + ": 我吃完了");
spoon.setOwner(spouse);
}
}
}
public static void main(String[] args) {
Diner husband = new Diner("牛郎");
Diner wife = new Diner("织女");
Spoon spoon = new Spoon(husband);
new Thread(new Runnable() {
@Override
public void run() {
husband.eatWith(spoon, wife);
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
wife.eatWith(spoon, husband);
}
}).start();
}
}
饥饿成因
当线程需要某些资源(CPU),但是却始终得不到。
线程的优先级设置得过于低,或者有某线程持有锁同时又无限循环从而不释放锁,或者某些程序始终占用某文件的写锁。
饥饿可能会导致响应性差:比如,我们的浏览器有一个线程负责处理前台响应(打开收藏夹等动作),另外的后台线程负责下载图片、文件、计算渲染等。在这种情况下,如果后台线程把CPU资源都占用了,那么前台线程将无法得到很好地执行,这会导致用户的体验很差。