XhstormR


On a dark desert highway Cool wind in my hair


Java Concurrency Lock

Updated on 2017-08-05

https://docs.oracle.com/javase/10/docs/api/java/util/concurrent | 中文

Concept

  • 阻塞 -> Unsafe 类 -> park 操作

Code

LockSupport

  • 内部封装了 Unsafe 类的 park 操作。
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

public class Main {
    public static void main(String[] args) {
        Thread a = new Thread(() -> {
            System.out.println("a 开始阻塞 5s");
            LockSupport.parkUntil(System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(5, TimeUnit.SECONDS));     绝对时间
            或
            LockSupport.parkNanos(TimeUnit.NANOSECONDS.convert(5, TimeUnit.SECONDS));     相对时间
            System.out.println("a 停止阻塞");
        });
        Thread b = new Thread(() -> {
            System.out.println("b 开始阻塞 无期限");
            LockSupport.park();
            System.out.println("b 停止阻塞");
        });
        Thread c = new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
                LockSupport.unpark(b);     线程停止阻塞
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        a.start();
        b.start();
        c.start();
    }
}
----
输出:
a 开始阻塞 5s
b 开始阻塞 无期限
b 停止阻塞
a 停止阻塞

ReentrantLock

  • 独占锁(排它锁)
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class Main {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();

        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int n = 0; n < 2; n++) {
            executorService.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " 争夺锁");
                lock.lock();     在 try 块外加锁
                try {
                    System.out.println(Thread.currentThread().getName() + " 获得锁");
                    for (int i = 0; i < 2; i++) {
                        System.out.println(Thread.currentThread().getName() + " 运行中...");
                        TimeUnit.SECONDS.sleep(1);
                    }
                } catch (InterruptedException e) {     处理中断异常
                    e.printStackTrace();
                } finally {
                    lock.unlock();     在 finally 块中解锁
                    System.out.println(Thread.currentThread().getName() + " 释放锁");
                }
            });
        }
        executorService.shutdown();
    }
}
----
输出:
pool-1-thread-1 争夺锁
pool-1-thread-2 争夺锁
pool-1-thread-1 获得锁
pool-1-thread-1 运行中...
pool-1-thread-1 运行中...
pool-1-thread-1 释放锁
pool-1-thread-2 获得锁
pool-1-thread-2 运行中...
pool-1-thread-2 运行中...
pool-1-thread-2 释放锁

ReentrantReadWriteLock

  • 读写锁
    • 读锁:共享锁,阻塞写锁   ,同时只能有 多个读
    • 写锁:独占锁,阻塞写锁和读锁,同时只能有 一个写
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class Main {
    public static void main(String[] args) {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int n = 0; n < 2; n++) {
            executorService.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " 争夺写锁");
                lock.writeLock().lock();
                try {
                    System.out.println(Thread.currentThread().getName() + " 获得写锁");
                    for (int i = 0; i < 2; i++) {
                        System.out.println(Thread.currentThread().getName() + " 运行中...");
                        TimeUnit.SECONDS.sleep(1);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.writeLock().unlock();
                    System.out.println(Thread.currentThread().getName() + " 释放写锁");
                }
            });
        }
        for (int n = 0; n < 2; n++) {
            executorService.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " 争夺读锁");
                lock.readLock().lock();
                try {
                    System.out.println(Thread.currentThread().getName() + " 获得读锁");
                    for (int i = 0; i < 2; i++) {
                        System.out.println(Thread.currentThread().getName() + " 运行中...");
                        TimeUnit.SECONDS.sleep(1);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.readLock().unlock();
                    System.out.println(Thread.currentThread().getName() + " 释放读锁");
                }
            });
        }
        executorService.shutdown();
    }
}
----
输出:
pool-1-thread-1 争夺写锁
pool-1-thread-2 争夺写锁
pool-1-thread-1 获得写锁
pool-1-thread-1 运行中...
pool-1-thread-3 争夺读锁
pool-1-thread-4 争夺读锁
pool-1-thread-1 运行中...
pool-1-thread-1 释放写锁
pool-1-thread-2 获得写锁
pool-1-thread-2 运行中...
pool-1-thread-2 运行中...
pool-1-thread-2 释放写锁
pool-1-thread-3 获得读锁
pool-1-thread-3 运行中...
pool-1-thread-4 获得读锁
pool-1-thread-4 运行中...
pool-1-thread-3 运行中...
pool-1-thread-4 运行中...
pool-1-thread-3 释放读锁
pool-1-thread-4 释放读锁

同步器

CountDownLatch

  • 一次性栅栏
import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) {
        int n = 3;
        CountDownLatch latch = new CountDownLatch(n);     初始阈值为 3

        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) {
            executorService.execute(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + " 等待中...");
                    latch.await();     等待阈值减为 0
                    System.out.println(Thread.currentThread().getName() + " 运行中...");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        for (int i = 0; i < n; i++) {
            executorService.execute(() -> {
                try {
                    TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5));
                    latch.countDown();     阈值减 1
                    System.out.println(Thread.currentThread().getName() + " 运行...");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executorService.shutdown();
    }
}
----
输出:
pool-1-thread-1 等待中...
pool-1-thread-2 等待中...
pool-1-thread-3 等待中...
pool-1-thread-4 等待中...
pool-1-thread-5 等待中...
pool-1-thread-7 运行...
pool-1-thread-6 运行...
pool-1-thread-8 运行...
pool-1-thread-1 运行中...
pool-1-thread-5 运行中...
pool-1-thread-4 运行中...
pool-1-thread-2 运行中...
pool-1-thread-3 运行中...

CyclicBarrier

  • 循环同步栅栏
import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) {
        int n = 3;
        CyclicBarrier barrier = new CyclicBarrier(n, () -> System.out.println("阶段性完成"));     初始阈值为 3

        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < n; i++) {
            executorService.execute(() -> {
                try {
                    for (int j = 1; j <= 4; j++) {
                        TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5));
                        System.out.println(Thread.currentThread().getName() + " 完成任务 " + j);
                        barrier.await();     阈值减 1,并等待阈值减为 0;同步后,重置阈值,并调用回调事件
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
        }
        executorService.shutdown();
    }
}
----
输出:
pool-1-thread-1 完成任务 1
pool-1-thread-3 完成任务 1
pool-1-thread-2 完成任务 1
阶段性完成
pool-1-thread-1 完成任务 2
pool-1-thread-3 完成任务 2
pool-1-thread-2 完成任务 2
阶段性完成
pool-1-thread-3 完成任务 3
pool-1-thread-2 完成任务 3
pool-1-thread-1 完成任务 3
阶段性完成
pool-1-thread-3 完成任务 4
pool-1-thread-1 完成任务 4
pool-1-thread-2 完成任务 4
阶段性完成

Semaphore

  • 计数信号量
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3);     信号量(最大并发量)为 3

        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int n = 0; n < 5; n++) {
            executorService.execute(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + " 等待信号...");
                    semaphore.acquire();     获得信号(应该在 try 块外获得信号,以防止不必要的释放信号)
                    System.out.println(Thread.currentThread().getName() + " 获得信号...");
                    for (int i = 0; i < 2; i++) {
                        System.out.println(Thread.currentThread().getName() + " 运行中...");
                        TimeUnit.SECONDS.sleep(1);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();     释放信号
                    System.out.println(Thread.currentThread().getName() + " 释放信号...");
                }
            });
        }
        executorService.shutdown();
    }
}
----
输出:
pool-1-thread-1 等待信号...
pool-1-thread-2 等待信号...
pool-1-thread-3 等待信号...
pool-1-thread-1 获得信号...
pool-1-thread-2 获得信号...
pool-1-thread-1 运行中...
pool-1-thread-3 获得信号...
pool-1-thread-3 运行中...
pool-1-thread-2 运行中...
pool-1-thread-4 等待信号...
pool-1-thread-5 等待信号...
pool-1-thread-1 运行中...
pool-1-thread-2 运行中...
pool-1-thread-3 运行中...
pool-1-thread-2 释放信号...
pool-1-thread-4 获得信号...
pool-1-thread-4 运行中...
pool-1-thread-1 释放信号...
pool-1-thread-3 释放信号...
pool-1-thread-5 获得信号...
pool-1-thread-5 运行中...
pool-1-thread-4 运行中...
pool-1-thread-5 运行中...
pool-1-thread-5 释放信号...
pool-1-thread-4 释放信号...

Exchanger

  • 用于在 成对 的线程之间 同步交换数据
import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();

        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(() -> {
            try {
                String s = " A";
                System.out.println(Thread.currentThread().getName() + s);
                TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5));
                s = exchanger.exchange(s);     等待另一个线程调用 exchange 方法,同步交换数据
                System.out.println(Thread.currentThread().getName() + s);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        executorService.execute(() -> {
            try {
                String s = " B";
                System.out.println(Thread.currentThread().getName() + s);
                TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5));
                s = exchanger.exchange(s);
                System.out.println(Thread.currentThread().getName() + s);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        executorService.shutdown();
    }
}
----
输出:
pool-1-thread-1 A
pool-1-thread-2 B
pool-1-thread-1 B
pool-1-thread-2 A

Phaser

  • 把多个线程 协同 执行的任务划分为 多个 阶段。
    • 每个 阶段 可以为其加入 任意 数量的线程。
    • 每个 线程 可以 随时 注册并参与某个阶段。
模拟 CountDownLatch
import java.util.concurrent.*;

public class Main {
    private static final int n = 3;
    private static final Phaser phaser = new Phaser(n);     线程的初始注册数为 3

    public static void main(String[] args) throws InterruptedException {
        System.out.println(show());     //3 = 0 + 3

        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) {
            executorService.execute(() -> {
                try {
                    phaser.register();     注册线程(注册数加 1)
                    System.out.println(show() + Thread.currentThread().getName() + " 等待中...");
                    TimeUnit.SECONDS.sleep(1);
                    phaser.arriveAndAwaitAdvance();     线程到达并等待所有线程到达(注册数 == 到达数);同步后,重置为未到达状态,进入下一个阶段
                    System.out.println(show() + Thread.currentThread().getName() + " 运行中...");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        for (int i = 0; i < n; i++) {
            executorService.execute(() -> {
                try {
                    TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(3, 10));
                    phaser.arriveAndDeregister();     线程到达并进行注销(注册数减 1)
                    System.out.println(show() + Thread.currentThread().getName() + " 运行...");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        TimeUnit.SECONDS.sleep(2);
        System.out.println(show());     //8 = 5 + 3

        executorService.shutdown();
    }

    private static String show() {
        注册数 = 到达数 + 未到达数
        return String.format("%d = %d + %d ",
                phaser.getRegisteredParties(),    当前  注册数
                phaser.get   ArrivedParties(),    当前  到达数
                phaser.get UnarrivedParties());   当前未到达数
              //phaser.get            Phase()     当前阶段
    }
}
----
输出:
3 = 0 + 3
4 = 0 + 4 pool-1-thread-1 等待中...
5 = 0 + 5 pool-1-thread-2 等待中...
6 = 0 + 6 pool-1-thread-3 等待中...
7 = 0 + 7 pool-1-thread-4 等待中...
8 = 0 + 8 pool-1-thread-5 等待中...
8 = 5 + 3
7 = 5 + 2 pool-1-thread-8 运行...
6 = 5 + 1 pool-1-thread-6 运行...
5 = 0 + 5 pool-1-thread-7 运行...
5 = 0 + 5 pool-1-thread-4 运行中...
5 = 0 + 5 pool-1-thread-5 运行中...
5 = 0 + 5 pool-1-thread-2 运行中...
5 = 0 + 5 pool-1-thread-3 运行中...
5 = 0 + 5 pool-1-thread-1 运行中...
模拟 CyclicBarrier
import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) {
        int n = 3;     线程的初始注册数
        int m = 4;     阶段的数量
        Phaser phaser = new Phaser(n) {
            @Override                 //(当前阶段,当前注册数)
            protected boolean onAdvance(int phase, int registeredParties) {     重写回调事件
                System.out.println("阶段性完成");
                return phase >= (m - 1) || registeredParties == 0;     是否进入终止状态(true 终止,false 继续)
            }
        };

        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < n; i++) {
            executorService.execute(() -> {
                try {
                    while (!phaser.isTerminated()) {     是否处于终止状态
                        TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5));
                        System.out.println(Thread.currentThread().getName() + " 完成任务 " + (phaser.getPhase() + 1));     获得当前阶段
                        phaser.arriveAndAwaitAdvance();     线程到达并等待所有线程到达(注册数 == 到达数);同步后,重置为未到达状态,进入下一个阶段
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executorService.shutdown();
    }
}
----
输出:
pool-1-thread-3 完成任务 1
pool-1-thread-2 完成任务 1
pool-1-thread-1 完成任务 1
阶段性完成
pool-1-thread-3 完成任务 2
pool-1-thread-1 完成任务 2
pool-1-thread-2 完成任务 2
阶段性完成
pool-1-thread-3 完成任务 3
pool-1-thread-2 完成任务 3
pool-1-thread-1 完成任务 3
阶段性完成
pool-1-thread-3 完成任务 4
pool-1-thread-2 完成任务 4
pool-1-thread-1 完成任务 4
阶段性完成
TOP