1. 线程间通信是什么?
多个线程同时运行时,不同线程之间信息的传递交互,这就是所谓的通信。
2. 为什么要做线程间通信?
在某些场景下需要线程间的协作,而某些场景需要线程间信息的交互。
- 例如,微服务下通过traceId全链路跟踪日志,但是涉及很多线程。
- 例如,实现多个线程有序运行
- 例如,当多个线程操作同个对象累加,如何确保累加的准确性
3.如何实现线程间通信?
- 通过 volatile 关键字
- 通过 Object类的 wait/notify 方法
- 通过 condition 的 await/signal 方法
- 通过 join 的方式
- 通过TreadLocal传递信息
4. 示例
(1) 通过 volatile 关键字
volatile主要针对于内存可见性和有序性,不能保证原子性。当写一个volatile变量时,JMM会将该线程对应的工作内存中的共享变量立即刷新到主存;当读一个volatile变量时,JMM会把该线程对应的工作内存中的值置为无效,然后从主存中进行读取,但是如果没有线程对该共享变量进行修改,则不会触发该操作。
public class ThreadA {
private static volatile int num = 0;
public static void main(String[] args) throws InterruptedException {
for(int i=0; i<100; i++){
new Thread(()->{
System.out.println("当前线程:"+Thread.currentThread().getId()+" num="+(++num));
}).start();
}
Thread.sleep(1000L);
System.out.println("最后输出:num="+num);
}
}
输出结果如下图,可以看到线程乱序,但是每次累加都是正常的,最后结果是100,但是如果去掉 volatile 关键词会发现输出不是100,这是因多线程累加是先查出来数据,再累加,再写回,可能出现并发查询到相同的值,累加结果一样,依次写回内存。
(2) 通过 Object类的 wait/notify 方法
public class ThreadWaitNotify {
public static void main(String[] args) {
//new一个锁对象
Object lock = new Object();
new Thread(()->{
for(int i=0; i<5; i++){
synchronized (lock){
System.out.println(Thread.currentThread().getName()+" i="+i);
if(i%2==0){
System.out.println(Thread.currentThread().getName()+" Notify: "+i);
lock.notify(); //回调触发后 后续流程走完
try {
lock.wait(); //wait后 直接释放锁
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
},"Thread_A").start();
new Thread(()->{
for(int i=0; i<5; i++){
synchronized (lock){
System.out.println(Thread.currentThread().getName()+" i="+i);
if(i%2==1){
System.out.println(Thread.currentThread().getName()+" Notify: "+i);
lock.notify();
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
},"Thread_B").start();
}
}
输出结果如下图, Thread_A和 Thread_B两个线程竞争锁对象lock, Thread_A 先拿到锁,循环一次i=0[偶数时]后回调并wait释放锁; Thread_B拿到锁,循环两次输出i=0,1 [奇数时] 回调并wait释放锁 ; 以此类推。
(3) 通过 condition 的 await/signal 方法
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ThreadAwaitSignal {
public static void main(String[] args) {
//new一个锁对象
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
AtomicInteger cntA = new AtomicInteger(0);
AtomicInteger cntB = new AtomicInteger(0);
new Thread(()->{
while(cntA.get()<3){
lock.lock(); //加锁
try{
cntA.getAndIncrement();
System.out.println(Thread.currentThread().getName()+" cntA="+cntA+" 拿到锁");
if(cntA.get()%2==1){
System.out.println(Thread.currentThread().getName()+" cntA="+cntA+" 释放锁");
condition.await(); //为奇数时释放锁,进入阻塞状态等待
}
condition.signal(); //接收Thread_B释放锁的信号
}catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
},"Thread_A").start();
new Thread(()->{
while(cntB.get()<3){
lock.lock();
try{
cntB.getAndIncrement();
System.out.println(Thread.currentThread().getName()+" cntB="+cntB+" 拿到锁");
if(cntB.get()%2==0){
System.out.println(Thread.currentThread().getName()+" cntB="+cntB+" 释放锁");
condition.await();
}
condition.signal(); //接收Thread_A释放锁的信号
}catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
},"Thread_B").start();
}
}
输出结果如下图, Thread_A和 Thread_B两个线程竞争锁对象lock, Thread_A 先拿到锁,累加一次i=1[奇数时]await()阻塞释放锁; Thread_B拿到锁,循环两次输出i=0,1 [偶数时] await()阻塞释放锁 ;以此类推。
(4) 通过 join 的方式
public class ThreadJoin {
public static void main(String[] args) throws InterruptedException {
Thread threadA = new Thread(()->{
System.out.println("当前执行线程:"+Thread.currentThread().getName());
},"Thread_A");
Thread threadB = new Thread(()->{
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("当前执行线程:"+Thread.currentThread().getName());
},"Thread_B");
Thread threadC = new Thread(()->{
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("当前执行线程:"+Thread.currentThread().getName());
},"Thread_C");
threadA.start();
threadA.join();
threadB.start();
threadB.join();
threadC.start();
threadC.join();
System.out.println("主线程");
}
}
输出结果如下图。 其中,主线程执行main方法,在没使用join()前,可以看到先输出了“主线程“,其余线程后续处理完;当使用join()后,几个子线程加入到主线程执行,主线程阻塞,待所有线程处理完后再继续执行主线程任务,输出”主线程”。
注:join方法的本质调用的是Object中的wait方法实现线程的阻塞
(5) 通过TreadLocal传递信息
这里使用InheritableThreadLocal 解决父子线程数据共享,可以看到利用ThreadLocal对象使多个线程之间进行了值的传递。
注:ThreadLocal 是一个线程的本地对象。当某个变量在使用 ThreadLocal 进行维护时,ThreadLocal 为使用该变量的每个线程分配了一个独立的变量副本,每个线程可以自行操作自己对应的变量副本,而不会影响其他线程的变量副本。
public class ThreadLocalTest {
public static void main(String[] args) {
System.out.println("当前执行线程:"+Thread.currentThread().getName());
//new ThreadLocal对象
ThreadLocal<String> threadLocal = new InheritableThreadLocal<>();
threadLocal.set("onezero");
//启动线程A
Thread threadA = new Thread(()->{
System.out.println("当前执行线程:"+Thread.currentThread().getName()+" threadLocal: "+threadLocal.get());
},"Thread_A");
threadA.start();
//启动线程B
threadLocal.set("onezero");
Thread threadB = new Thread(()->{
try {
Thread.sleep(10L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("当前执行线程:"+Thread.currentThread().getName()+" threadLocal: "+threadLocal.get());
},"Thread_B");
threadB.start();
//启动线程C
threadLocal.set("Thread_C");
Thread threadC = new Thread(()->{
try {
Thread.sleep(20L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("当前执行线程:"+Thread.currentThread().getName()+" threadLocal: "+threadLocal.get());
},"Thread_C");
threadC.start();
try {
Thread.sleep(50L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("当前执行线程:"+Thread.currentThread().getName());
}
}
输出结果如下图 :
补充:ThreadLocal+MDC结合可以实现链路日志ID传递,如下图所示,SpringCloud的Sleuth组件通过实现Runnable接口,进入线程时,将Trace信息存储在ThreadLocal变量中,出线程时,从ThreadLocal变量中取出Trace信息,作为参数传递到下一个线程。