【多线程系列】01.线程间的通信

By | 5月 24, 2023

1. 线程间通信是什么?

多个线程同时运行时,不同线程之间信息的传递交互,这就是所谓的通信。

2. 为什么要做线程间通信?

在某些场景下需要线程间的协作,而某些场景需要线程间信息的交互。

  • 例如,微服务下通过traceId全链路跟踪日志,但是涉及很多线程。
  • 例如,实现多个线程有序运行
  • 例如,当多个线程操作同个对象累加,如何确保累加的准确性

3.如何实现线程间通信?

  • 通过 volatile 关键字
  • 通过 Object类的 wait/notify 方法
  • 通过 condition 的 await/signal 方法
  • 通过 join 的方式
  • 通过TreadLocal传递信息

4. 示例

(1) 通过 volatile 关键字

volatile主要针对于内存可见性和有序性,不能保证原子性。当写一个volatile变量时,JMM会将该线程对应的工作内存中的共享变量立即刷新到主存;当读一个volatile变量时,JMM会把该线程对应的工作内存中的值置为无效,然后从主存中进行读取,但是如果没有线程对该共享变量进行修改,则不会触发该操作。

public class ThreadA {

    private static volatile int num = 0;
    public static void main(String[] args) throws InterruptedException {
        for(int i=0; i<100; i++){
            new Thread(()->{
                System.out.println("当前线程:"+Thread.currentThread().getId()+" num="+(++num));
            }).start();
        }
        Thread.sleep(1000L);
        System.out.println("最后输出:num="+num);
    }
}

输出结果如下图,可以看到线程乱序,但是每次累加都是正常的,最后结果是100,但是如果去掉 volatile 关键词会发现输出不是100,这是因多线程累加是先查出来数据,再累加,再写回,可能出现并发查询到相同的值,累加结果一样,依次写回内存。

(2) 通过 Object类的 wait/notify 方法

public class ThreadWaitNotify {
    public static void main(String[] args) {
        //new一个锁对象
        Object lock = new Object();
        new Thread(()->{
            for(int i=0; i<5; i++){
                synchronized (lock){
                    System.out.println(Thread.currentThread().getName()+" i="+i);
                    if(i%2==0){
                        System.out.println(Thread.currentThread().getName()+" Notify: "+i);
                        lock.notify(); //回调触发后 后续流程走完
                        try {
                            lock.wait(); //wait后 直接释放锁
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        },"Thread_A").start();
        new Thread(()->{
            for(int i=0; i<5; i++){
                synchronized (lock){
                    System.out.println(Thread.currentThread().getName()+" i="+i);
                    if(i%2==1){
                        System.out.println(Thread.currentThread().getName()+" Notify: "+i);
                        lock.notify();
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        },"Thread_B").start();
    }
}

输出结果如下图, Thread_A和 Thread_B两个线程竞争锁对象lock, Thread_A 先拿到锁,循环一次i=0[偶数时]后回调并wait释放锁; Thread_B拿到锁,循环两次输出i=0,1 [奇数时] 回调并wait释放锁 ; 以此类推。

(3) 通过 condition 的 await/signal 方法

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ThreadAwaitSignal {

    public static void main(String[] args) {
        //new一个锁对象
        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        AtomicInteger cntA = new AtomicInteger(0);
        AtomicInteger cntB = new AtomicInteger(0);
        new Thread(()->{
            while(cntA.get()<3){
                lock.lock(); //加锁
                try{
                    cntA.getAndIncrement();
                    System.out.println(Thread.currentThread().getName()+" cntA="+cntA+" 拿到锁");
                    if(cntA.get()%2==1){
                        System.out.println(Thread.currentThread().getName()+" cntA="+cntA+" 释放锁");
                        condition.await(); //为奇数时释放锁,进入阻塞状态等待

                    }
                    condition.signal(); //接收Thread_B释放锁的信号
                }catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    lock.unlock();
                }
            }
        },"Thread_A").start();
        new Thread(()->{
            while(cntB.get()<3){
                lock.lock();
                try{
                    cntB.getAndIncrement();
                    System.out.println(Thread.currentThread().getName()+" cntB="+cntB+" 拿到锁");
                    if(cntB.get()%2==0){
                        System.out.println(Thread.currentThread().getName()+" cntB="+cntB+" 释放锁");
                        condition.await();

                    }
                    condition.signal(); //接收Thread_A释放锁的信号
                }catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    lock.unlock();
                }
            }
        },"Thread_B").start();
    }
}

输出结果如下图, Thread_A和 Thread_B两个线程竞争锁对象lock, Thread_A 先拿到锁,累加一次i=1[奇数时]await()阻塞释放锁; Thread_B拿到锁,循环两次输出i=0,1 [偶数时] await()阻塞释放锁 ;以此类推。

(4) 通过 join 的方式

public class ThreadJoin {
    public static void main(String[] args) throws InterruptedException {
        Thread threadA = new Thread(()->{
            System.out.println("当前执行线程:"+Thread.currentThread().getName());

        },"Thread_A");
        Thread threadB = new Thread(()->{
            try {
                Thread.sleep(500L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("当前执行线程:"+Thread.currentThread().getName());
        },"Thread_B");
        Thread threadC = new Thread(()->{
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("当前执行线程:"+Thread.currentThread().getName());

        },"Thread_C");

        threadA.start();
        threadA.join();
        threadB.start();
        threadB.join();
        threadC.start();
        threadC.join();
        System.out.println("主线程");
    }
}

输出结果如下图。 其中,主线程执行main方法,在没使用join()前,可以看到先输出了“主线程“,其余线程后续处理完;当使用join()后,几个子线程加入到主线程执行,主线程阻塞,待所有线程处理完后再继续执行主线程任务,输出”主线程”。

注:join方法的本质调用的是Object中的wait方法实现线程的阻塞

未使用join()前
使用join()后

(5) 通过TreadLocal传递信息

这里使用InheritableThreadLocal 解决父子线程数据共享,可以看到利用ThreadLocal对象使多个线程之间进行了值的传递。

注:ThreadLocal 是一个线程的本地对象。当某个变量在使用 ThreadLocal 进行维护时,ThreadLocal 为使用该变量的每个线程分配了一个独立的变量副本,每个线程可以自行操作自己对应的变量副本,而不会影响其他线程的变量副本。

public class ThreadLocalTest {
    public static void main(String[] args) {
        System.out.println("当前执行线程:"+Thread.currentThread().getName());
        //new ThreadLocal对象
        ThreadLocal<String> threadLocal = new InheritableThreadLocal<>();
        threadLocal.set("onezero");
        //启动线程A
        Thread threadA = new Thread(()->{
            System.out.println("当前执行线程:"+Thread.currentThread().getName()+" threadLocal: "+threadLocal.get());

        },"Thread_A");
        threadA.start();
        //启动线程B
        threadLocal.set("onezero");
        Thread threadB = new Thread(()->{
            try {
                Thread.sleep(10L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("当前执行线程:"+Thread.currentThread().getName()+" threadLocal: "+threadLocal.get());
        },"Thread_B");
        threadB.start();
        //启动线程C
        threadLocal.set("Thread_C");
        Thread threadC = new Thread(()->{
            try {
                Thread.sleep(20L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("当前执行线程:"+Thread.currentThread().getName()+" threadLocal: "+threadLocal.get());
        },"Thread_C");
        threadC.start();
        try {
            Thread.sleep(50L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("当前执行线程:"+Thread.currentThread().getName());
    }
}

输出结果如下图 :

补充:ThreadLocal+MDC结合可以实现链路日志ID传递,如下图所示,SpringCloud的Sleuth组件通过实现Runnable接口,进入线程时,将Trace信息存储在ThreadLocal变量中,出线程时,从ThreadLocal变量中取出Trace信息,作为参数传递到下一个线程。

发表评论

您的电子邮箱地址不会被公开。