java线程相关内容

15

1.什么是线程和进程

进程:正在运行的程序,操作系统支持多进程,PID:进程ID

线程

  • 进程中一条执行路径,也是CPU的基本调度单位
  • 进程由多个线程组成,彼此间完成不同的工作,交替执行,称为多线程

如迅雷是一个进程,下载多个任务是多个线程

java虚拟机会自动创建一个主线程,主线程执行main方法

区别

  • 进程是操作系统分配资源的基本单位,而线程是CPU的基本调度单位
  • 一个程序运行后至少一个进程
  • 一个进程可以包含多个线程,但是至少需要一个线程
  • 进程间不能共享数据段地址,但同进程的线程之间可以

2.线程的组成

  • CPU时间片:操作系统会为每个线程分配执行时间
  • 运行数据
    • 堆空间:存储线程需使用的对象,多个线程可以共享堆中的对象。
    • 存储线程需使用的局部变量,每个线程都拥有独立的栈。
    • 线程的栈与main方法的栈是各自独立的。
  • 线程的逻辑代码

执行特点

  • 抢占式执行
    • 效率高
    • 可防止单一线程长时间独占CPU
  • 在单核CPU中,宏观上同时执行,微观上顺序执行
  • 在多核CPU中,每个核都可以独立执行一个线程,真正的并发执行

3.创建多线程的方式

3.1 继承Thread类,重写run()方法

启动线程,使用start方法,不能直接调用start方法

//1.继承Thread,重写run方法
public class MyThread extends Thread{
    //4.设置线程名通过构造方法
    public MyThread(String name){
        //调用父类的构造方法
        super(name);
    }
    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            //2.获取线程ID和名称
            //两种方式
            //方式一:this.getId() 获取线程ID this.getName() 获取线程名字
            System.out.println(this.getId()+"。。。"+this.getName()+"。。。"+i);
            //方式二:使用Thread.currentThread() 获取当前线程(推荐,灵活性高,不一定要继承Thread类)
            System.out.println(Thread.currentThread().getId()+"。。。"+Thread.currentThread().getName()+"。。。"+i);
        }
    }
}

class TestMyThread{
    //java虚拟机会自动创建一个主线程,主线程执行main方法
    public static void main(String[] args) throws InterruptedException {
        //创建子线程对象
        MyThread thread = new MyThread("t1");
        //3.设置线程的名称 -> 4进行简化
        //thread.setName("t1");
        MyThread thread2 = new MyThread("t2");
        //thread2.setName("t2");
        //启动线程,使用start方法,不能直接调用start方法
        thread.start();
        thread2.start();
        //在mian主线程中
        for (int i = 0; i < 100; i++) {
            System.out.println("主线程"+i);
        }
    }
}

案例:买卖票

//实现卖票功能
public class TicketWin extends Thread {
    public TicketWin(String name) {
        super(name);
    }
    private int ticket  = 100;

    @Override
    public void run() {
        while (true) {
            if (ticket <= 0) {
                break;
            }
            System.out.println(Thread.currentThread().getName()+"买了第"+ticket+"张");
            ticket--;
        }
    }
}

//测试类
class TestTicket{
    public static void main(String[] args) {
        TicketWin t1 = new TicketWin("t1");
        TicketWin t2 = new TicketWin("t2");
        TicketWin t3 = new TicketWin("t3");
        TicketWin t4 = new TicketWin("t4");

        t1.start();
        t2.start();
        t3.start();
        t4.start();
    }
}

内存图:

[wppay]

3.2 获取线程的ID和名字

获取线程名称:

  • 方式一:this.getId() 获取线程ID this.getName() 获取线程名字
  • 方式二:使用Thread.currentThread() 获取当前线程(推荐,灵活性高,不一定要继承Thread类)
    • Thread.currentThread().getName()

设置线程名称:在创建线程的时候设置

MyThread thread = new MyThread();
thread.setName("t1");

设置线程名简化版(构造方法):

//4.设置线程名通过线程对象的构造方法
public MyThread(String name){
//调用父类的构造方法
super(name);
}
//在调用的地方
MyThread thread = new MyThread("t1");

3.3.实现Runnable接口,实现run()方法

简单使用:

//可运行类
public class MyRunnable implements Runnable{
    @Override
    public void run() {
        for (int i = 0; i < 50; i++) {
            System.out.println(Thread.currentThread().getName()+" 子线程"+i);
        }
    }
}
class TestRunnable{
    public static void main(String[] args) {
        MyRunnable myRunnable = new MyRunnable();
        //可以直接设置线程名
        new Thread(myRunnable,"t1").start();
        new Thread(myRunnable,"t2").start();
    }
}

案例:售票窗口类

//主要是共享同一资源
    //ticket设置成静态,整个内存只有一份:再添加线程卖100张票就不行
    //ticket不是静态的,再new一个Ticket就可以
    //更符合面向对象的特征

//共享的资源类 可运行对象
public class Ticket implements Runnable {
    private int ticket = 100;
    //卖票
    @Override
    public void run() {
        while (true) {
            if (ticket <= 0) {
                break;
            }
            System.out.println(Thread.currentThread().getName() + "买了第" + ticket + "张");
            ticket--;
        }
    }
}

//测试类
class TestTicket {
    public static void main(String[] args) {
        //创建票对象:四个窗口用的这一个票
        Ticket runnable = new Ticket();
        //创建线程
        Thread thread = new Thread(runnable);
        Thread thread2 = new Thread(runnable);
        Thread thread3 = new Thread(runnable);
        Thread thread4 = new Thread(runnable);
        //启动线程
        thread.start();
        thread2.start();
        thread3.start();
        thread4.start();
    }
}

内存图:

为什么共享资源不能设置成静态:

ticket设置成静态,整个内存只有一份:再添加线程卖100张票就不行,ticket设置成不是静态的,再new一个Ticket就可以,更符合面向对象的特征

对内存这样的源码分析:

private Runnable target;

Thread thread = new Thread(runnable);

    public Thread(Runnable target) {
        init(null, target, "Thread-" + nextThreadNum(), 0);
    }
    private void init(ThreadGroup g, Runnable target, String name,
                      long stackSize) {
        init(g, target, name, stackSize, null, true);
    }
    private void init(ThreadGroup g, Runnable target, String name,
                      long stackSize, AccessControlContext acc,
                      boolean inheritThreadLocals) {
      //省略源码
        this.target = target;
        setPriority(priority);
     //省略源码
    }
    public void run() {
        if (target != null) {
            target.run();
        }
    }

可以看到因为传入的Runnable对象相同,所以调用的run方法也相同。都是对一个ticket进行操作。

3.4 两个方法的使用场景

  • 继承Thread类创建线程
    • 没有资源共享使用
  • 实现Runnable接口
    • 有资源共享
    • 线程操作相同,共享资源类实现Runnable接口
    • 线程操作不同,使用操作类来分开

案例:存取取钱

public class BankCard {
    //余额
    private double money;

    public double getMoney() {
        return money;
    }

    public void setMoney(double money) {
        this.money = money;
    }
}

//存钱
class AddMoney implements Runnable {
    private BankCard bank1;

    public AddMoney(BankCard bank1) {
        this.bank1 = bank1;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            bank1.setMoney(bank1.getMoney() + 1000);
            System.out.println(Thread.currentThread().getName() + " 余额是:" + bank1.getMoney());
        }
    }
}

//取钱
class SubMoney implements Runnable {
    private BankCard bank1;

    public SubMoney(BankCard bank1) {
        this.bank1 = bank1;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            if (bank1.getMoney() >= 1000) {
                bank1.setMoney(bank1.getMoney() - 1000);
                System.out.println(Thread.currentThread().getName() + " 余额是:" + bank1.getMoney());
            } else {
                System.out.println("没钱了!");
                //保证钱能取完
                i--;
            }
        }
    }
}

class TestBankCard {
    public static void main(String[] args) {
        BankCard bankCard = new BankCard();
        AddMoney addMoney = new AddMoney(bankCard);
        new Thread(addMoney,"t1").start();

        SubMoney subMoney = new SubMoney(bankCard);
        new Thread(subMoney,"t2").start();
    }
}

存取钱和买卖票案例优化

采用匿名内部类的方式。

买卖票

public class TestTicket {
    public static void main(String[] args) {
        Runnable runnable = new Runnable(){
            private int ticket = 100;
            @Override
            public void run() {
                while (true){
                    if (ticket<=0){
                        break;
                    }
                    System.out.println(Thread.currentThread().getName());
                    ticket--;
                }
            }
        };
        new Thread(runnable,"t1").start();
        new Thread(runnable,"t2").start();
        new Thread(runnable,"t3").start();
        new Thread(runnable,"t4").start();
    }
}

存取钱

public class TestBankCard {
    public static void main(String[] args) {
        BankCard card = new BankCard();
        Runnable add = new Runnable(){
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    card.setMoney(card.getMoney() + 1000);
                    System.out.println(Thread.currentThread().getName() + " 余额是:" + card.getMoney());
                }
            }
        };
        Runnable sub = new Runnable(){
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    if (card.getMoney() >= 1000) {
                        card.setMoney(card.getMoney() - 1000);
                        System.out.println(Thread.currentThread().getName() + " 余额是:" + card.getMoney());
                    } else {
                        System.out.println("没钱了!");
                        //保证钱能取完
                        i--;
                    }
                }
            }
        };
        new Thread(add,"t1").start();
        new Thread(sub,"t2").start();
    }
}

4.线程的状态

4.1 四个基本+三个特殊状态

  • 初始状态:线程对象被创建,即为初始状态,只在堆中开辟内存,与常规对象无异。
  • 就绪状态:调用start()之后,进入就绪状态,等待OS选中,并分配时间片
  • 运行状态:获得时间片之后,进入运行状态,如果时间片到期,则回到就绪状态
  • 终止状态:主线程main()或独立线程run()结束进入终止状态,并释放持有的时间片
  • 等待状态:
    • 在运行状态 执行sleep(x) 进入限期等待 Timed Waiting
    • 在运行状态 执行join() 进入无限期等待 Waiting
  • 阻塞状态:没有获得锁,加了同步操作synchronized

就绪状态与运行状态:JDK5之后就绪、运行统称Runnable

4.2 常见方法

  • sleep()
    • Thread.sleep();当前线程主动休眠 毫秒。释放CPU,不再争抢CPU
  • yield()
    • Thread.yield();当前线程主动放弃时间片,回到就绪状态,竞争下一次时间片
  • join()
    • Thread.join();先start()再join()允许其他线程加入到当前线程中,当前线程会阻塞,直到加入线程执行完毕。
  • 设置优先级proprity()
    • 线程对象.setPriority(int)
    • 线程优先级1-10,默认为5,优先级越高,表示获取CPU机会越多
  • 打断interrupt()
    • 线程对象.interrupt():打断线程,被打断的线程抛出InterruptedException异常
  • 守护线程
    • 如:垃圾回收器线程
    • thread.setDaemon(true); thread.start();
    • 线程的分类
      • 1.用户线程(前台线程)
      • 2.守护线程(后台线程)
    • 如果程序中所有前台线程都执行完毕了,后台线程会自动结束。
  • 获取线程当前状态
    • 线程对象.getState()

测试代码:

sleep()与yield()

public class TestSleep {
    public static void main(String[] args) {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    System.out.println(Thread.currentThread().getName()+" "+i);
                    //1.休眠 父类没有向上抛出异常,只能try_catch
                    /*try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }*/
                    //2.yield() 主动释放时间片,只让给优先级等于高于当前的,下次继续争抢CPU
                    Thread.yield();
                }
            }
        };
        new Thread(runnable,"t1").start();
        new Thread(runnable,"t2").start();
    }
}

join()

public class JoinThread_2 {
    public static void main(String[] args) throws InterruptedException {
        Runnable runnable = new Runnable() {

            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    System.out.println(Thread.currentThread().getName()+" "+i);
                }
            }
        };
        Thread thread = new Thread(runnable, "子线程");
        thread.start();
        //thread加入主线程,主线程阻塞,直到加入线程执行完毕
        thread.join();
        for (int i = 0; i < 20; i++) {
            System.out.println("主线程 "+ i);
        }
    }
}

interrupt()

public class InterruptTest {
    public static void main(String[] args) throws IOException {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                System.out.println("子程序开始休眠");
                try {
                    Thread.sleep(2000);
                    System.out.println("自然醒了");
                } catch (InterruptedException e) {
                    System.out.println("被打醒了");
                }
                System.out.println("子程序结束");
            }
        };
        System.out.println("主程序开始");
        Thread thread = new Thread(runnable, "子线程");
        thread.start();
        System.out.println("10秒内输入任意字符可以打断子线程");
        //读取一个字节
        System.in.read();
        thread.interrupt();
        System.out.println("主线程结束");
        //输出的顺序为什么和看到的不同:与休眠时间相关
    }
}

守护线程

public class Daemon {
    public static void main(String[] args) {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 20; i++) {
                    System.out.println(Thread.currentThread().getName() + " " + i);
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        Thread thread = new Thread(runnable, "子线程");
        //设置为守护线程
        thread.setDaemon(true);
        thread.start();
        for (int i = 0; i < 10; i++) {
            System.out.println("主线程" + " " + i);
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Priority()

public class Priority {
    public static void main(String[] args) {
        Runnable runnable = new Runnable() {

            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    System.out.println(Thread.currentThread().getName()+" "+i);
                }
            }
        };
        Thread thread = new Thread(runnable, "子线程");
        thread.start();
        thread.setPriority(1);
        Thread thread2 = new Thread(runnable, "子线程");
        thread.start();
        Thread thread3 = new Thread(runnable, "子线程");
        thread.start();
        thread3.setPriority(10);
    }
}

4.3 线程安全问题

复原问题:

public class ThreadArrayDemo {
    static int size = 0;

    public static void main(String[] args) throws InterruptedException {
        String[] strings = new String[5];
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                strings[size++] = "hello";
            }
        });
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                strings[size++] = "world";
            }
        });

        t1.start();
        t2.start();
        //阻塞主线程
        t1.join();
        //阻塞主线程
        t2.join();
        System.out.println(Arrays.toString(strings));
    }
}

线程不安全
当多线程并发访问临界资源时,如果破坏原子操作,可能会造成数据不一致。
临界资源:共享资源(同一对象),一次仅允许一个线程使用,才可保证其正确性
原子操作:不可分割的多步操作,被视作一个整体,其顺序和步骤不可打乱或缺省

4.4 线程同步

线程同步伴随着等待,同一时间只有一个线程能进入

没有等待,你执行你的,我执行我的,叫异步。

同步代码块

synchronized(锁){//对临界资源对象加锁
	//代码 原子操作
}
  • 一般使用临界资源或唯一引用类型对象作为锁。
  • 每个对象都有一个互斥锁标记,用来分配给线程的,只有拥有对象互斥锁标记的线程,才能进入对该对象枷锁的同步代码块。
  • 线程退出同步代码块时,会释放相应的互斥锁标记。

同步代码块优化存储字符数组和买卖票

字符数组的存储

public class ThreadArrayDemo2 {
    static int size = 0;

    public static void main(String[] args) throws InterruptedException {
        String[] strings = new String[5];
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (strings){
                    strings[size++] = "hello";
                }
            }
        });
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (strings){
                    strings[size++] = "world";
                }
            }
        });

        t1.start();
        t2.start();
        //阻塞主线程
        t1.join();
        //阻塞主线程
        t2.join();
        System.out.println(Arrays.toString(strings));
    }
}

买卖票问题

public class Ticket {
    public static void main(String[] args) {
        Runnable r = new Runnable() {
            private int ticket = 100;
            @Override
            public void run() {
                while (true) {
                    //锁只要是个对象就可以
                    //不能直接在里面创建锁,每次不一样
                    synchronized (this){
                        if (ticket <= 0) {
                            break;
                        }
                        System.out.println(Thread.currentThread().getName() + " 卖的票" + ticket);
                        ticket--;
                    }
                }
            }
        };
        new Thread(r, "t1").start();
        new Thread(r, "t2").start();
        new Thread(r, "t3").start();
        new Thread(r, "t4").start();
    }
}

存钱取钱问题

public class BankCard {
    private double money;

    public BankCard() {
    }

    public double getMoney() {
        return money;
    }

    public void setMoney(double money) {
        this.money = money;
    }
}

class Test {
    public static void main(String[] args) {
        BankCard card = new BankCard();
        //存钱
        new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (card) {
                    for (int i = 0; i < 10; i++) {
                        card.setMoney(card.getMoney() + 1000);
                        System.out.println(Thread.currentThread().getName() + " " + card.getMoney());
                    }
                }
            }
        }, "t1").start();
        //取钱
        new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (card) {
                    for (int i = 0; i < 10; i++) {
                        if (card.getMoney() >= 1000) {
                            card.setMoney(card.getMoney() - 1000);
                            System.out.println(Thread.currentThread().getName() + " " + card.getMoney());
                        } else {
                            System.out.println("余额不足");
                            i--;
                        }
                    }
                }
            }
        }, "t2").start();
    }
}

同步方法

public synchronized 返回值类型 方法名称(形参列表) //对当前对象(this)加锁
{ 
               //代码(原子操作)
}

只有拥有对象互斥锁标记的线程,才能进入该对象加锁的同步方法中。
线程退出同步方法时,会释放相应的互斥锁标记。
如果是静态方法,锁是【类名.class】

实例方法加锁

public class Ticket {
    public static void main(String[] args) {
        Runnable a = new Runnable() {
            private int ticket = 100;

            @Override
            public void run() {
                while (true) {
                    if (!sellTicket()) {
                        break;
                    }
                }
            }

            //买票方法(同步方法) 默认锁是this
            public synchronized boolean sellTicket() {
                if (ticket <= 0) {
                    return false;
                }
                System.out.println(
                    Thread.currentThread().getName() + " 第" + ticket + "章"
                );
                ticket--;
                return true;
            }
        };
        new Thread(a, "t1").start();
        new Thread(a, "t2").start();
        new Thread(a, "t3").start();
        new Thread(a, "t4").start();
    }
}

静态方法加锁

public class Ticket_2 {
    public static void main(String[] args) {
        Ticket2 a = new Ticket2();
        new Thread(a, "t1").start();
        new Thread(a, "t2").start();
        new Thread(a, "t3").start();
        new Thread(a, "t4").start();
    }
}


class Ticket2 implements Runnable {
//要求共享资源也是静态的
    private static int ticket = 100;
    @Override
    public void run() {
        while (true) {
            if (!sellTicket()) {
                break;
            }
        }
    }

    //买票方法 静态方法时,要求共享资源也是静态的,默认锁是类.class
    public static synchronized boolean sellTicket() {
        if (ticket <= 0) {
            return false;
        }
        System.out.println(
            Thread.currentThread().getName() + " 第" + ticket + "章"
        );
        ticket--;
        return true;
    }
}

面试题:不能使用synchronized 关键字修饰run()方法

  • 这里加锁的是每个线程对象本身,其实并没有并发控制。
  • synchronized关键字有有两种用法,一种在方法上使用,一种是在方法内部使用。无论哪种用法都需要对一个对象上锁。对非静态方法的修饰则是对对象自身上锁,对静态方法的修饰则是对class上锁。所以要看你究竟是对谁上锁

5.死锁

当第一个线程拥有A对象锁标记,并等待B对象锁标记,同时第二个线程拥有B对象锁标记,并等待A对象锁标记时,产生死锁。

一个线程可以同时拥有多个对象的锁标记,当线程阻塞时,不会释放已经拥有的锁标记,由此可能造成死锁。

public class MyLock {
    public static Object locka = new Object();
    public static Object lockb = new Object();
}

class Boy extends Thread {
    @Override
    public void run() {
        synchronized (MyLock.locka) {
            System.out.println(Thread.currentThread().getName() + "  拿到A锁");
            synchronized ((MyLock.lockb)) {
                System.out.println(Thread.currentThread().getName() + "  拿到B锁");
                System.out.println("可以吃了!");
            }
        }
    }
}

class Girl extends Thread {
    @Override
    public void run() {
        synchronized (MyLock.lockb) {
            System.out.println(Thread.currentThread().getName() + "  拿到A锁");
            synchronized ((MyLock.locka)) {
                System.out.println(Thread.currentThread().getName() + "  拿到B锁");
                System.out.println("可以吃了!");
            }
        }
    }
}

class TestLock {
    public static void main(String[] args) {
        while (true) {
            new Thread(new Boy(), "Boy").start();
            new Thread(new Girl(), "Girl").start();
        }
    }
}

6.线程通信

6.1 三个方法

等待:

  • public final void wait()
  • public final void wait(long timeout)
  • 必须在对obj加锁的同步代码块中,在一个线程中,调用obj.wait()时,此线程会释放其拥有的所有锁标记。同时此线程 阻塞在锁的等待队列中。释放锁,进入等待队列

通知:

  • public final void notify():从等待队列中随机唤醒一个
  • public final void notifyAll():唤醒所有等待线程
  • 必须在对obj加锁的同步代码块中。从obj的Waiting中释放一个或全部线程。对自身没有任何影响

需求:存钱取钱的过程控制谁先执行 , 要求实现存一次取一次,有顺序的同步

让谁抢到CPU我,我们是控制不到,但是可以控制代码的执行顺序

案例可能存在的问题(处理方式看代码):

  • 多存多取的情况 if->while
  • 全部进入等待队列 notify->notifyAll
public class BankCard {
    private double money;
    //false 没钱可以执行存钱
    //true 有钱不能在存可以取钱
    private boolean flag = false;
    //两个线程通信的前提必须有同步

    //存钱 锁是this
    public synchronized void save(double m) {
        //控制代码执行的时机
        while (flag) {
            //有钱则等待  锁.wait() 会进入等待队列
            //释放CPU和锁
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        this.money += m;
        System.out.println(Thread.currentThread().getName() + " 存了:" + m);
        //修改标记,表示存成功,可以取了。
        flag = true;
        this.notifyAll();
    }

    //取钱 锁是this
    public synchronized void take(double m) {
        //控制代码执行的时机
        while (!flag) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        this.money -= m;
        System.out.println(Thread.currentThread().getName() + " 取了:" + m);
        flag = false;
        this.notifyAll();
    }
}

class AddMoney implements Runnable {
    private BankCard card;

    public AddMoney(BankCard card) {
        this.card = card;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            card.save(1000);
        }
    }
}

class SubMoney implements Runnable {
    private BankCard card;

    public SubMoney(BankCard card) {
        this.card = card;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            card.take(1000);
        }
    }
}

class Test {
    public static void main(String[] args) {
        BankCard bankCard = new BankCard();
        AddMoney addMoney = new AddMoney(bankCard);
        SubMoney subMoney = new SubMoney(bankCard);
        new Thread(addMoney, "存钱").start();
        new Thread(subMoney, "取钱").start();

        //TODO 1.存在的问题:再加线程时出现问题,如何解决?
        //多存多取的情况:把判断的if换成while
        new Thread(addMoney, "存钱2").start();
        new Thread(subMoney, "取钱2").start();
        //TODO 2.又出现新的问题,全部进入等待队列里面出现死锁
        //唤醒线程的问题:全部唤醒 notify()换成notifyAll()

    }
}

6.2 生产者消费者问题

自己写的:

public class Cleck {
    private int bread = 0;

    //生产
    public synchronized void produce() {
        if (bread >= 20) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        bread++;
        System.out.println(Thread.currentThread().getName() + " " + bread);
        this.notify();
    }

    //消费
    public synchronized void comsume() {
        if (bread <= 0) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        bread--;
        System.out.println(Thread.currentThread().getName() + " " + bread);
        this.notify();
    }
}

class Consumer2 implements Runnable {
    private Cleck cleck;

    public Consumer2(Cleck cleck) {
        this.cleck = cleck;
    }

    @Override
    public void run() {
        for (int i = 0; i < 20; i++) {
            cleck.comsume();
        }
    }
}

class Producer implements Runnable {
    private Cleck cleck;

    public Producer(Cleck cleck) {
        this.cleck = cleck;
    }

    @Override
    public void run() {
        for (int i = 0; i < 20; i++) {
            cleck.produce();
        }
    }
}

class Test {
    public static void main(String[] args) {
        Cleck con = new Cleck();
        Producer produce = new Producer(con);
        Consumer2 consume = new Consumer2(con);
        new Thread(produce, "produce").start();
        new Thread(consume, "consume").start();
    }
}

其他的:

class Bread {
    private int id;
    private String production;

    public Bread() {
    }

    public Bread(int id, String production) {
        this.id = id;
        this.production = production;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getProduction() {
        return production;
    }

    public void setProduction(String production) {
        this.production = production;
    }
}

class BreadCon {
    private Bread[] breads = new Bread[6];
    private int size;

    //存放面包
    public synchronized void put(Bread b) {
        //判断
        if (size >= 6) {
            try {
                this.wait();//等待,释放cpu和锁
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        breads[size] = b;
        size++;
        System.out.println(Thread.currentThread().getName() + "生产了" + b.getId() + "号面包");
        //唤醒
        this.notify();
    }

    //消费面包
    public synchronized void take() {
        if (size <= 0) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        size--;
        Bread b = breads[size];
        breads[size] = null;
        System.out.println(Thread.currentThread().getName() + "消费了" + b.getId() + "号面包");
        this.notify();
    }
}

class Consume implements Runnable {
    private BreadCon con;

    public Consume(BreadCon con) {
        this.con = con;
    }

    @Override
    public void run() {
        for (int i = 0; i < 30; i++) {
            con.take();
        }
    }
}

class Produce implements Runnable {
    private BreadCon con;

    public Produce(BreadCon con) {
        this.con = con;
    }

    @Override
    public void run() {
        for (int i = 0; i < 30; i++) {
            con.put(new Bread(i, Thread.currentThread().getName()));
        }
    }
}

public class TestProduceConsume {
    public static void main(String[] args) {
        //创建容器
        BreadCon con = new BreadCon();
        //生产
        Produce produce = new Produce(con);
        //消费
        Consume consume = new Consume(con);
        //创建进程
        new Thread(produce, "produce").start();
        new Thread(consume, "consume").start();
    }
}

面试题:sleep()与wait()方法的异同

sleep是休眠,休眠时释放CPU,不会释放资源(锁),自动唤醒
wait()是等待,wait()等待时释放cpu和资源,一般需要别的线程唤醒
方法的声明位置不同:Thread类sleep(),Object类wait()
调用:sleep()可以在任何需要的场景下调用 wait()必须是同步代码块或同步方法

7.多线程高级部分

7.1 线程池

现有问题:

  • 线程是宝贵的内存资源,单个线程约占1MB,过多分配易造成内存溢出
  • 频繁的创建及销毁线程会增加虚拟机回收频率、资源开销、造成程序性能下降

线程池:

  • 线程容器,可设定线程分配的数量上限
  • 将预先创建的线程对象存入池中,并重用线程池中的线程对象
  • 避免频繁的创建和销毁

线程池原理

  • 将任务提交给线程池,由线程池分配线程、运行任务,并在当前任务结束后复用线程

7.2 创建线程池的四种方法

常用的线程池接口和类(所在包java.util.concurrent)

  • Executor:线程池的顶级接口。
  • ExecutorService:线程池接口, 可通过submit(Runnable task)提交任务代码。
    • void shutdown()
      • 线程全部执行完毕后关闭线程池
    • boolean isTerminated()
      • 判断线程池中所有线程是否都执行完毕
    • <T>Future<T> submit(Runnable task);
      • 提交任务

Executors工厂类:通过此类可以获得一个线程池。

创建线程池的四种方法:

  • newFixedThreadPool(int nThreads)
    • 获取固定线程数量的线程池
  • newCachedThreadPool()
    • 获取动态线程数量的线程池,不够就创建新的
  • newSingleThreadExecutor()
    • 获取单线程池
  • newScheduledThreadPool(int corePoolSize)(特殊,创建线程池与提交任务均与其他线程池不同)
    • 调度线性池,实现周期执行或延迟执行
    • 周期执行
      • 固定频率:es.scheduleAtFixedRate(command,initialDelay,period,unit);//提交任务
      • 固定延迟:es.scheduleWithFixedDelay(command,initialDelay,period,unit);//提交任务
    • 延迟执行:es.schedule(command,delay,unit);//提交任务

固定大小的线程池

public class TestThreadPool {
    public static void main(String[] args) {
        //创建线程池,四种线程池
        //(1) 创建固定大小的线程池  4个线程
        ExecutorService es = Executors.newFixedThreadPool(4);
        //(2)创建任务
        Runnable runnable = new Runnable() {
            private int ticket = 100;

            @Override
            public void run() {
                while (true) {
                    synchronized (this) {
                        if (ticket <= 0) {
                            break;
                        }
                        System.out.println(Thread.currentThread().getName() + " " + ticket);
                        ticket--;
                    }
                }
            }
        };
        //(3)提交四个任务
        for (int i = 0; i < 4; i++) {
            es.submit(runnable);
        }
        //(4)关闭线程池:全部线程结束才关闭
        es.shutdown();
    }
}

动态大小的线程池

public class TestThreadPool_2 {
    public static void main(String[] args) {
        //(1) 创建动态大小的线程池  个数不确定由提交的任务确定
        ExecutorService es = Executors.newCachedThreadPool();
        //(2)创建任务
        Runnable runnable = new Runnable() {
            private int ticket = 1000;

            @Override
            public void run() {
                while (true) {
                    synchronized (this) {
                        if (ticket <= 0) {
                            break;
                        }
                        System.out.println(Thread.currentThread().getName() + " " + ticket);
                        ticket--;
                    }
                }
            }
        };
        //(3)提交四个任务
        for (int i = 0; i < 4; i++) {
            es.submit(runnable);
        }
        //(4)关闭线程池:全部线程结束才关闭
        es.shutdown();
    }
}

单线程池

public class TestThreadPool_3 {
    public static void main(String[] args) {
        //(1) 创建单线程池  只有一个线程
        ExecutorService es = Executors.newSingleThreadExecutor();
        //(2)创建任务
        Runnable runnable = new Runnable() {
            private int ticket = 100;

            @Override
            public void run() {
                while (true) {
                    synchronized (this) {
                        if (ticket <= 0) {
                            break;
                        }
                        System.out.println(Thread.currentThread().getName() + " " + ticket);
                        ticket--;
                    }
                }
            }
        };
        //(3)提交四个任务
        for (int i = 0; i < 4; i++) {
            es.submit(runnable);
        }
        //(4)关闭线程池:全部线程结束才关闭
        es.shutdown();
    }
}

调度线程池

public class TestThreadPool_4 {
    static int count = 0;
    public static void main(String[] args) {
        //创建调度线程池  实现周期执行或者延迟执行
        //(1)周期执行
        ScheduledExecutorService es = Executors.newScheduledThreadPool(1);
        //固定频率:30天->一天一个程序  11-15天没写 16天补上  30
        /*es.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                //控制什么时候关闭
                count++;
                if (count == 100) {
                    es.shutdown();
                }
                //固定频率演示
                if (count == 11) {
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                System.out.println(new Date().toLocaleString());
            }
            //一秒钟执行一次
        }, 0, 1, TimeUnit.SECONDS);*/
        //固定延迟:30天->一天一个程序  11-15天没写 16天不补延迟日子  35
        /*es.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                //控制什么时候关闭
                count++;
                if (count == 100) {
                    es.shutdown();
                }
                //固定频率演示
                if (count == 11) {
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                System.out.println(new Date().toLocaleString());
            }
            //一秒钟执行一次
        }, 0, 1, TimeUnit.SECONDS);*/

        //(2)延迟执行
        es.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println(new Date().toLocaleString());
            }
        }, 1000, TimeUnit.SECONDS);
    }
}

7.3 ThreadPoolExecutor类的参数

  • corePoolSize:核心线程数
  • maximumPoolSize:最大线程数
  • keepAliveTime:非核心线程的存活时间
  • unit:时间单位
  • workQueue:工作队列
  • threadFactory:线程工厂
  • handler:拒绝策略
    • AbortPolicy:中断, 抛出异常(核心业务, 使用最多)
    • DiscardPolicy:直接抛弃, 不抛出异常(非核心业务)
    • DiscardOldestPolicy:把旧的抛弃, 加入新的(喜新厌旧)
    • CallerRunsPolicy:线程池创建者执行。

使用顺序:核心线程->工作队列->最大线程

public class TestThreadPool_5 {
    static int count = 0;

    public static void main(String[] args) {
        //ThreadPoolExecutor的七个参数
        //核心线程数、最大线程数、存活时间(非核心线程数的)、时间单位、
        //工作队列、线程工厂、线程的拒绝策略(4个)(重要)
    /*public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)*/
        LinkedBlockingQueue queue = new LinkedBlockingQueue(1);
        //此时最多4个线程,最大三个,队列一个
        ThreadPoolExecutor tp = new ThreadPoolExecutor(
            2,
            2,
            60L,
            TimeUnit.SECONDS,
            queue,
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.CallerRunsPolicy());

        /*handler 拒绝策略  ThreadPoolExecutor.xxx
        1.AbortPolicy:中断拒绝,抛出异常(核心业务,使用最多)
        2.DiscardPolicy 抛弃,不抛出异常(非核心业务)
        3.DiscardOldestPolicy:把旧的扔了,加入新的
        4.CallerRunsPolicy:由线程池创建者执行
        */

        //提交任务
        for (int i = 0; i < 4; i++) {
            int n = count++;
            tp.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + "执行完毕" + n);
                }
            });
        }
        tp.shutdown();
        //new ThreadPoolExecutor.AbortPolicy()
        //先打印三个,在打印一个。 线程3个,队列一个。加入5个抛出异常
        //new ThreadPoolExecutor.DiscardPolicy()
        //只打印三个,加入5个线程,最后一个抛弃
        //new ThreadPoolExecutor.DiscardOldestPolicy()
        //加入count输出显示 注意count++的位置,不要写在线程里面,没用
        //new ThreadPoolExecutor.CallerRunsPolicy()
        //主线程调用超出的线程
    }
}

7.4 Callable接口创建线程

Callable接口概念

public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

JDK5加入,与Runnable接口类似,实现之后代表一个线程任务。

Callable具有泛型返回值,可以声明异常。

面试题:Callable与Runnable的区别

Callable规定的方法是call(),而Runnable规定的方法是run()
Callable的任务执行后可返回值,而Runnable的任务是不能返回值的。
call()方法可抛出异常,而run()方法是不能抛出异常的。
运行Callable任务可拿到一个Future对象, Future表示异步计算的结果。 它提供了检查计算是否完成的方法,以等待计算的完成,并检索计算的结果。
通过Future对象可了解任务执行情况,可取消任务的执行,还可获取任务执行的结果。
Callable是类似于Runnable的接口,实现Callable接口的类和实现Runnable的类都是可被其它线程执行的任务。

Future接口

概念:异步接收ExecutorService.submit()所返回的状态结果,当中包含了call ()的返回值。

方法: V get()以阻塞形式等待Future中的异步处理结果(call()的返回值)。

简单创建实现

public class CallableDemo1 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1.创建可调用对象
        Callable<Integer> callable = new Callable<Integer>() {

            @Override
            public Integer call() throws Exception {
                int sum = 0;
                for (int i = 0; i < 10; i++) {
                    sum += i;
                    System.out.println(i);
                    Thread.sleep(1000);
                }
                return sum;
            }
        };
        //2.可调用对象转为将要执行的任务(代表线程执行的结果)
        FutureTask<Integer> task = new FutureTask<>(callable);
        //3.创建线程对象
        Thread thread = new Thread(task);
        //4.启动线程
        thread.start();
        //5.获取结果(阻塞方法,等待线程的执行结果), 结果输出
        System.out.println(task.get());
    }
}

Callable与线程池配合使用

可以把大任务分成小任务

Callable对象是可以直接传给线程池的(不需要转换成FutureTask)

线程池.submit (线程池的提交)有返回值,为future : 表示将要执行完任务的结果,就是call()的返回值

Future:表示将要完成任务的结果

方法:V get() 以阻塞形式等待Future中的异步处理结果(call()的返回值)

//把大任务分成小任务
public class CallableDemo2 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1.创建线程池
        ExecutorService pool = Executors.newFixedThreadPool(2);
        //2.提交任务
        //计算0-9
        Future<Integer> submit1 = pool.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                int sum = 0;
                for (int i = 0; i < 10; i++) {
                    sum += i;
                    System.out.println(Thread.currentThread().getName() + " " + i);
                    Thread.sleep(1000);
                }
                return sum;
            }
        });
        //计算11-19
        Future<Integer> submit2 = pool.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                int sum = 0;
                for (int i = 11; i < 20; i++) {
                    sum += i;
                    System.out.println(Thread.currentThread().getName() + " " + i);
                    Thread.sleep(1000);
                }
                return sum;
            }
        });
        //注意 pool.submit()是有返回值的
        //Future类型 表示线程将要完成任务的结果
        //从Future获取结果  Future对象.get()
        System.out.println(submit1.get() + submit2.get());
        pool.shutdown();
    }
}

7.5 Lock接口

JDK5加入,相当于synchronized,显示定义,结构更灵活

常用方法:

  • void lock()//获取锁,如锁被占用,则等待
  • boolean tryLock() //尝试获取锁(true/false,不阻塞)
  • void unlock() //释放锁

锁的分类

可重入的概念

public class ReentrantLock1 {
    public static void main(String[] args) {
        //锁
        Object object = new Object();
        //可重入
        //(OpenJDK可看到)ObjectMonitor(监视器,进入recursions+1)
        synchronized (object) {
            System.out.println("第一层");
            //再进入recursions再加1
            synchronized (object) {
                System.out.println("第二层");
            }
        }
    }
}

可重入锁的实例1

public class ReentrantLock2 {
    public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(4);
        Runnable runnable = new Runnable() {
            private int ticket = 100;
            //1.创建Lock接口 : 可重入锁
            private Lock lock = new ReentrantLock();

            @Override
            public void run() {
                while (true) {
                    //2.上锁
                    lock.lock();
                    try {
                        if (ticket <= 0) {
                            break;
                        }
                        System.out.println(Thread.currentThread().getName() + " " + ticket);
                        ticket--;
                    } finally {
                        //3.解锁
                        //保证一定能解锁,防止出现异常时,无法解锁
                        lock.unlock();
                    }
                }
            }
        };
        es.submit(runnable);
        es.shutdown();
    }
}

可重入锁的实例2-存钱取钱

public class ReentrantLock3 {
    public static void main(String[] args) {
        BankCard bankCard = new BankCard();
        Lock lock = new ReentrantLock();
        Runnable add = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 20; i++) {
                    lock.lock();
                    try {
                        bankCard.setMoney(bankCard.getMoney() + 1000);
                        System.out.println(Thread.currentThread().getName() + " " + bankCard.getMoney());
                    } finally {
                        lock.unlock();
                    }
                }
            }
        };
        Runnable get = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 20; i++) {
                    if (bankCard.getMoney() >= 1000) {
                        bankCard.setMoney(bankCard.getMoney() - 1000);
                        System.out.println(Thread.currentThread().getName() + " " + bankCard.getMoney());
                    } else {
                        System.out.println("没钱了");
                        i--;
                    }
                }
            }
        };

        ExecutorService service = Executors.newFixedThreadPool(2);
        service.submit(add);
        service.submit(get);
        service.shutdown();

    }
}

读写锁的概念

ReentrantReadWriteLock:

  • 一种支持一写多读的同步锁,读写分离,可分别分配读锁、写锁。
  • 支持多次分配读锁,使多个读操作可以并发执行。

互斥规则

  • 写-写:互斥,阻塞
  • 读-写:互斥,读阻塞写,写阻塞读
  • 读-读:不互斥,不阻塞
  • 在读操作远远高于写操作的环境中,可在保障线程安全的情况下,提高运行效率。

ReentrantReadWriteLock案例

public class ReadWriterDemo {
    //创建读写锁
    private ReentrantReadWriteLock readWriteLock=new ReentrantReadWriteLock();
    //变量
    private int value;

    public int getValue() throws InterruptedException {
        //上锁
        readWriteLock.readLock().lock();
        try {
            Thread.sleep(1000);
            return value;
        } finally {
            readWriteLock.readLock().unlock();
        }
    }
    public void setValue(int value) throws InterruptedException {
        //上锁
        readWriteLock.writeLock().lock();
        try {
            Thread.sleep(1000);
            this.value=value;
        } finally {
            readWriteLock.writeLock().unlock();
        }
    }
}
public class TestReadWriteLock {
    public static void main(String[] args) {
        ReadWriterDemo readWriterDemo=new ReadWriterDemo();
        ExecutorService es = Executors.newFixedThreadPool(20);
        Runnable read=new Runnable() {
            @Override
            public void run() {
                try {
                   int r= readWriterDemo.getValue();
                    System.out.println(r);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        Runnable write=new Runnable() {
            @Override
            public void run() {
                try {
                    int i = new Random().nextInt(100);
                    readWriterDemo.setValue(i);
                    System.out.println(i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        //提交任务
        long start=System.currentTimeMillis();
        for(int i=0;i<2;i++){
            es.submit(write);
        }

        for(int i=0;i<18;i++){
            es.submit(read);
        }
        es.shutdown();
        //等待20个线程执行完毕,才继续执行
        while(!es.isTerminated()){}
        long end=System.currentTimeMillis();
        System.out.println("用时:"+(end-start));
    }
}

7.6 Condition接口(条件)

lock == synchronized : 同步

synchronized使用方法wait()等方法。

lock实现synchronizedd的功能使用Condition提供方法,条件队列重要。

Condition的概念

Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式。

Condition可以通俗的理解为条件队列,当一个线程在调用了await方法以后,直到线程等待的某个条件为真的时候才会被唤醒。

Condition中的三个方法

  • await() 当前线程进入等待状态
  • signal() 唤醒一个等待线程
  • signalAll() 唤醒所有等待线程

用Lock实现生产者和消费者案例进行线程通信

直接实现:

public class Bread {
    private int id;
    private String name;

    public Bread(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }
}

class BreadCon {
    private Bread[] breads = new Bread[6];
    private int size;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    //生产
    public void pro(Bread b) {
        lock.lock();
        try {
            if (size >= 6) {
                try {
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            breads[size] = b;
            size++;
            System.out.println(Thread.currentThread().getName() + " 生产:" + b.getId());
            condition.signal();
        } finally {
            lock.unlock();
        }
    }

    //消费
    public void con() {
        lock.lock();
        try {
            if (size <= 0) {
                try {
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            size--;
            System.out.println(Thread.currentThread().getName() + " 消费:" + breads[size].getId());
            condition.signal();
        } finally {
            lock.unlock();
        }
    }
}

class Test {
    public static void main(String[] args) {
        BreadCon con = new BreadCon();
        //生产
        Runnable pro = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    con.pro(new Bread(i, Thread.currentThread().getName()));
                }
            }
        };
        //消费
        Runnable com = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    con.con();
                }
            }
        };

        //创建线程池
        ExecutorService es = Executors.newFixedThreadPool(2);
        es.submit(pro);
        es.submit(com);
        es.shutdown();
    }
}

优化存在的两个问题

public class Bread_2 {
    private int id;
    private String name;

    public Bread_2(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }
}

class BreadCon2 {
    private Bread_2[] breads = new Bread_2[6];
    private int size;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    //生产
    public void pro(Bread_2 b) {
        lock.lock();
        try {
            if (size >= 6) {
                try {
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            breads[size] = b;
            size++;
            System.out.println(Thread.currentThread().getName() + " 生产:" + b.getId());
            condition.signal();
        } finally {
            lock.unlock();
        }
    }

    //消费
    public void con() {
        lock.lock();
        try {
            if (size <= 0) {
                try {
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            size--;
            System.out.println(Thread.currentThread().getName() + " 消费:" + breads[size].getId());
            condition.signal();
        } finally {
            lock.unlock();
        }
    }
}

class Test2 {
    public static void main(String[] args) {
        BreadCon2 con = new BreadCon2();
        //生产
        Runnable pro = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    con.pro(new Bread_2(i, Thread.currentThread().getName()));
                }
            }
        };
        //消费
        Runnable com = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    con.con();
                }
            }
        };

        //创建线程池
        ExecutorService es = Executors.newFixedThreadPool(2);
        es.submit(pro);
        es.submit(com);
        es.shutdown();
    }
}

再优化lock可以创建多个队列:生产与消费队列分离,syn只能创建一个队列

class Bread {
    private int id;
    private String productName;

    public Bread(int id, String productName) {
        this.id = id;
        this.productName = productName;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getProductName() {
        return productName;
    }

    public void setProductName(String productName) {
        this.productName = productName;
    }
}
 class BreadCon {
    private Bread[] breads=new Bread[6];
    private int size;
    //创建锁
    private Lock lock=new ReentrantLock();
    //创建条件队列
    //生产线程队列
    Condition proCondition=lock.newCondition();
    //消费线程队列
    Condition conCondition=lock.newCondition();
    //存放
    public void put(Bread b){
        lock.lock();
        try {
            while(size>=6){
                try {
                    proCondition.await();//进入生产条件队列,释放cpu、锁
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            breads[size]=b;
            size++;
            System.out.println(Thread.currentThread().getName()+"生产了"+b.getId());
            //唤醒消费线程队列中的线程
            conCondition.signal();
        }finally {
            lock.unlock();
        }

    }
    //获取
    public void take(){
        lock.lock();
        try {
            while(size<=0){
                try {
                    conCondition.await();//释放了cpu和锁
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            size--;
            Bread b=breads[size];
            System.out.println(Thread.currentThread().getName()+"消费了"+b.getId()+"生产者:"+b.getProductName());
            proCondition.signal();
        }finally {
            lock.unlock();
        }

    }
}
public class TestProduceConsume {
    public static void main(String[] args) {
        BreadCon con=new BreadCon();
        Runnable produce=new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 30; i++) {
                    con.put(new Bread(i, Thread.currentThread().getName()));
                }
            }
        };
        Runnable consume=new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 30; i++) {
                    con.take();
                }
            }
        };
        ExecutorService es = Executors.newFixedThreadPool(4);
        es.submit(produce);
        es.submit(produce);
        es.submit(consume);
        es.submit(consume);
        es.shutdown();
    }
}

面试题:使用Lock和Condition实现三个线程交替输出20遍“ABC”

多个布尔值多个队列:

public class Test {
    private boolean fa = true, fb, fc;
    private Lock lock = new ReentrantLock();
    private Condition con = lock.newCondition();
    private Condition con2 = lock.newCondition();
    private Condition con3 = lock.newCondition();

    public void A() throws InterruptedException {
        lock.lock();
        try {
            if (!fa) {
                con.await();
            }
            System.out.println(Thread.currentThread().getName()+" A");
            fa = false;
            fb = true;
            con2.signal();
        } finally {
            lock.unlock();
        }
    }

    public void B() throws InterruptedException {
        lock.lock();
        try {
            if (!fb) {
                con2.await();
            }
            System.out.println(Thread.currentThread().getName()+" B");
            fb = false;
            fc = true;
            con3.signal();
        } finally {
            lock.unlock();
        }
    }

    public void C() throws InterruptedException {
        lock.lock();
        try {
            if (!fc) {
                con3.await();
            }
            System.out.println(Thread.currentThread().getName()+" C");
            fc = false;
            fa = true;
            con.signal();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        Test test = new Test();
        Runnable a = new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 0; i < 10; i++) {
                        test.A();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        Runnable b = new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 0; i < 10; i++) {
                        test.B();
                    }

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        Runnable c = new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 0; i < 10; i++) {
                        test.C();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        ExecutorService pool = Executors.newFixedThreadPool(3);
        pool.submit(a);
        pool.submit(b);
        pool.submit(c);
        pool.shutdown();
    }
}

多个布尔值单个队列

public class Test_1 {
    private boolean fa = true, fb, fc;
    private Lock lock = new ReentrantLock();
    private Condition con = lock.newCondition();

    public void A() throws InterruptedException {
        lock.lock();
        try {
            while (!fa) {
                con.await();
            }
            System.out.println(Thread.currentThread().getName()+" A");
            fa = false;
            fb = true;
            con.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public void B() throws InterruptedException {
        lock.lock();
        try {
            while (!fb) {
                con.await();
            }
            System.out.println(Thread.currentThread().getName()+" B");
            fb = false;
            fc = true;
            con.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public void C() throws InterruptedException {
        lock.lock();
        try {
            while (!fc) {
                con.await();
            }
            System.out.println(Thread.currentThread().getName()+" C");
            fc = false;
            fa = true;
            con.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        Test_1 test = new Test_1();
        Runnable a = new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 0; i < 10; i++) {
                        test.A();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        Runnable b = new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 0; i < 10; i++) {
                        test.B();
                    }

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        Runnable c = new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 0; i < 10; i++) {
                        test.C();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        ExecutorService pool = Executors.newFixedThreadPool(3);
        pool.submit(a);
        pool.submit(b);
        pool.submit(c);
        pool.shutdown();
    }
}

synchronized同步方法

public class Test_2 {
    private boolean fa = true, fb, fc;

    public synchronized void A() throws InterruptedException {
        while (!fa) {
            this.wait();
        }
        System.out.println(Thread.currentThread().getName() + " A");
        fa = false;
        fb = true;
        this.notifyAll();
    }

    public synchronized void B() throws InterruptedException {
        while (!fb) {
            this.wait();
        }
        System.out.println(Thread.currentThread().getName() + " B");
        fb = false;
        fc = true;
        this.notifyAll();
    }

    public synchronized void C() throws InterruptedException {
        while (!fc) {
            this.wait();
        }
        System.out.println(Thread.currentThread().getName() + " C");
        fc = false;
        fa = true;
        this.notifyAll();

    }

    public static void main(String[] args) {
        Test_2 test = new Test_2();
        Runnable a = new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 0; i < 10; i++) {
                        test.A();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        Runnable b = new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 0; i < 10; i++) {
                        test.B();
                    }

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        Runnable c = new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 0; i < 10; i++) {
                        test.C();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        ExecutorService pool = Executors.newFixedThreadPool(3);
        pool.submit(a);
        pool.submit(b);
        pool.submit(c);
        pool.shutdown();
    }
}

7.7 synchronized与lock的区别:

非公平:谁抢到谁用,效率更高。

synchronized的实现方式:

原文:https://www.jianshu.com/p/c97227e592e1

ObjectMonitor源码 https://hg.openjdk.java.net/jdk8u/jdk8u/hotspot/file/d975dfffada6/src/share/vm/runtime/objectMonitor.hpp

https://hg.openjdk.java.net/jdk8u/jdk8u/hotspot/file/d975dfffada6/src/share/vm/runtime/objectMonitor.cpp

monitor也是一个对象,在C++中由ObjectMonitor实现,其数据结构如下:

  ObjectMonitor() {
    _header       = NULL;
    _count        = 0;
    _waiters      = 0,
    _recursions   = 0;
    _object       = NULL;
    _owner        = NULL;
    _WaitSet      = NULL;
    _WaitSetLock  = 0 ;
    _Responsible  = NULL ;
    _succ         = NULL ;
    _cxq          = NULL ;
    FreeNext      = NULL ;
    _EntryList    = NULL ;
    _SpinFreq     = 0 ;
    _SpinClock    = 0 ;
    OwnerIsThread = 0 ;
  }

可以关注下几个比较关键的属性:

  • _owner 指向持有ObjectMonitor对象的线程
  • _WaitSet 存放处于wait状态的线程
  • _EntryList 存放处于等待锁block状态的线程队列
  • _recursions 锁的重入次数
  • _count 源码里有但是作用不大

多个线程在竞争共享数据执行到同步代码块时,会在_EntryList中排队,获得对象monitor的线程在进入_Owner区域时会将monitor_owner设为当前线程,同时_recursions加1。若持有mnitor对象的线程调用了wait()方法会释放monitor_ownernull,_recursions减一,进入到_WaitSet集合中等待被唤醒。

JDK1.5之前对ObjectMonitor进行操作比较耗费时间,效率比较低。

在JDK1.6之后,synchronized进行了优化出现了四种状态:

以前:

  • 无锁
  • 重量级锁:涉及用户态和内核态的切换

现在:

  • 无锁:没有线程执行到同步代码块
  • 偏向锁:只有一个线程执行
  • 轻量级锁:有两个线程执行,但是他们之间没有争抢,轻量级的争抢(涉及争抢,自旋等待会)。
  • 重量级锁:两个或者两个以上的线程出现重量级的争抢,效率低
  • 锁的状态只能升级,不能降级。

锁升级过程:https://www.jb51.net/article/183984.htm

优化策略:

  • 自旋锁
    • JDK 1.4 自旋10次,JDK1.6之后优化为自适应自旋锁。
  • 锁粗化
  • 锁清除:运行时发现当前线程不会逃逸当前线程,会进行锁清除。
public class Demo1 {
    public static void main(String[] args) {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                //锁是局部变量,没必要加锁
                //在运行时JVM会进行逃逸分析:
                //obj不会逃出当前线程,会执行锁消除操作
                Object object = new Object();
                synchronized (object) {
                    //
                }
            }
        };
        new Thread(runnable).start();
        new Thread(runnable).start();
        new Thread(runnable).start();
        new Thread(runnable).start();
    }
}

7.8 线程安全的集合

什么是线程安全,线程不安全

    public static void main(String[] args) {
        ArrayList<Object> list = new ArrayList<>();
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    list.add(Thread.currentThread().getName() + "..." + i);
                }
            }
        };
        ExecutorService pool = Executors.newFixedThreadPool(5);
        pool.submit(runnable);
        pool.submit(runnable);
        pool.submit(runnable);
        pool.submit(runnable);
        pool.submit(runnable);
        pool.shutdown();
        //拖时间用的
        while (!pool.isTerminated()) {
        }
        System.out.println(list.size());
    }

有些可能没有加进去。

如何让线程变安全

使用同步代码块保证集合安全

    public static void main(String[] args) {
        ArrayList<Object> list = new ArrayList<>();
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                //1.使用同步代码块保证集合安全
                synchronized (list) {
                    for (int i = 0; i < 10; i++) {
                        list.add(Thread.currentThread().getName() + "..." + i);
                        System.out.println(list.toString());
                    }
                    System.out.println(list.size());
                }
            }
        };
        for (int i = 0; i < 5; i++) {
            new Thread(runnable).start();
        }
    }

Collections中的工具方法

JDK1.2提供,接口统一,维护性高,但性能没有提升,均以synchonized实现。

    public static void main(String[] args) {
        ArrayList<Object> list = new ArrayList<>();
        //2.Collections.synchronizedList()
        List<Object> list1 = Collections.synchronizedList(list);
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                synchronized (list1) {
                    for (int i = 0; i < 10; i++) {
                        list1.add(Thread.currentThread().getName() + "..." + i);
                        System.out.println(list1.toString());
                    }
                    System.out.println(list1.size());
                }
            }
        };
        for (int i = 0; i < 5; i++) {
            new Thread(runnable).start();
        }
    }

7.9 JUC里面提供的新的集合以及使用、底层实现

CopyOnWriteArrayList

线程安全的ArrayList,加强版读写分离。

写有锁,读无锁,读写之间不阻塞,优于读写锁。

写入时,先copy一个容器副本,再添加新元素,最后替换引用。

使用方式与ArrayList无异。

    //CopyOnWriteArrayList
    public static void main(String[] args) {
        CopyOnWriteArrayList<Object> list = new CopyOnWriteArrayList<>();
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                synchronized (list) {
                    for (int i = 0; i < 10; i++) {
                        //添加 lock锁
                        list.add(Thread.currentThread().getName() + "..." + i);
                        System.out.println(list.toString());
                    }
                    System.out.println(list.size());
                    //读取无锁
                    //System.out.println(list.get(0));
                }
            }
        };
        for (int i = 0; i < 5; i++) {
            new Thread(runnable).start();
        }
    }

CopyOnWriteArraySet

线程安全的Set,底层使用CopyOnWriteArrayList实现。

唯一不同于使用addIdAbsent()添加元素,会遍历数组。

如存在元素,则不添加(扔掉副本)。

重复依据equals方法

//CopyOnWriteArraySet
    public static void main(String[] args) {
        CopyOnWriteArraySet<Object> list = new CopyOnWriteArraySet<>();
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                synchronized (list) {
                    for (int i = 0; i < 10; i++) {
                        list.add(Thread.currentThread().getName() + "..." + i);
                        System.out.println(list.toString());
                    }
                    System.out.println(list.size());
                }
            }
        };
        for (int i = 0; i < 5; i++) {
            new Thread(runnable).start();
        }
    }

ConcurrnetHashMap

初始容量默认16段(Segment),使用分段锁设计

不对整个Map加锁,而是为每个Segment加锁。

当多个对象存入同一个Segment时,才需要互斥。

最理想状态为16个对象分别存入16个Segment,并行数量16.

使用方式与HashMap无异

JDK1.8改为CAS无锁算法。

7.10 CAS算法

CAS:Compare And Swap(比较交换算法)

  • 硬件实现,汇编指令(compxchg),是靠硬件来实现,效率高。
  • 并且比较和交换过程是同步
  • CAS是一种乐观锁

CAS比较交换算法,修改的方法包含三个核心参数(V,E,N)

V:需更新的变量、E:预期值、N:新值。

只有当V==E时,V=N;否则表示已被更新过,则取消当前操作。

乐观锁:

  • 总是认为时线程安全,不怕别人的线程修改量,如果修改了再重新尝试,直到成功。
  • CAS是乐观锁

悲观锁:

  • 总是认为线程不安全,不管什么情况都进行加锁,要是获取锁失败,就阻塞。
  • synchronized是悲观锁。

模拟实现CAS算法

public class CasDemo {
    private int v;

    public synchronized int get() {
        return v;
    }

    public synchronized boolean comAndSwap(int e, int n) {
        if (e == v) {
            v = n;
            return true;
        }
        return false;
    }
}

class Test {
    public static void main(String[] args) {
        CasDemo demo = new CasDemo();
        ExecutorService es = Executors.newFixedThreadPool(100);
        Runnable ru = new Runnable() {
            @Override
            public void run() {
                while (true) {
                    int n = new Random().nextInt(100);
                    //先取旧值比较一下,有没有改变。
                    int e = demo.get();
                    boolean b = demo.comAndSwap(e, n);
                    System.out.println(Thread.currentThread().getName() + " " + b);
                    if (b) {
                        break;
                    }
                }
            }
        };
        for (int i = 0; i < 100; i++) {
            es.submit(ru);
        }
        es.shutdown();
    }
}

ABA问题

模拟解决办法:加版本号、修改次数

public class CasDemo_2 {
    private int v;
    private int version = 0;

    public synchronized int get() {
        return v;
    }

    public synchronized int getVersino() {
        return version;
    }

    public synchronized boolean comAndSwap(int e, int n, int ver) {
        if (e == v && ver == version) {
            v = n;
            version++;
            return true;
        }
        return false;
    }
}

class Test2 {
    public static void main(String[] args) {
        CasDemo_2 demo = new CasDemo_2();
        ExecutorService es = Executors.newFixedThreadPool(100);
        Runnable ru = new Runnable() {
            @Override
            public void run() {
                while (true) {
                    int n = new Random().nextInt(100);
                    int e = demo.get();
                    int ver = demo.getVersino();
                    boolean b = demo.comAndSwap(e, n, ver);
                    System.out.println(Thread.currentThread().getName() + " " + b);
                    if (b) {
                        break;
                    }
                }
            }
        };
        for (int i = 0; i < 100; i++) {
            es.submit(ru);
        }
        es.shutdown();
    }
}

7.11 Queue接口

ConcurrentLinkedQueue

  • 线程安全、可高效读写的队列,高并发下性能最好的队列
  • 无锁、CAS比较交换算法,修改的方法包含三个核心参数(V,E,N)
  • V:要更新的变量、E:预期值、N:新值。
  • 只有当V==E时,V=N;否则表示已被更新过,则取消当前操作。
public class QueueDemo {
    public static void main(String[] args) {
        ConcurrentLinkedQueue queue=new ConcurrentLinkedQueue();
        ExecutorService es = Executors.newFixedThreadPool(5);
        for(int i=0;i<5;i++){
            es.submit(new Runnable() {
                @Override
                public void run() {
                    String s=Thread.currentThread().getName();
                    queue.offer(s);
                    System.out.println(s);
                }
            });
        }
        es.shutdown();
        while(!es.isTerminated()){}
        System.out.println("--------------");
        int count=queue.size();
        for (int i = 0; i < count; i++) {
            System.out.println(queue.poll());
        }
    }
}

BlockingQueue接口(阻塞队列)

Queue的子接口,阻塞的队列,增加了两个线程状态为无限期等待的方法。

方法:

  • void put(E e)//将指定元素插入此队列中,如果没有可用空间,则等待。
  • E take()//获取并移除此队列头部元素,如果没有可用元素,则等待。

BlockingQueue两个实现类

BlockingQueue实现生产者和消费者

public class BolckingQueueDemo {

    public static void main(String[] args) {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(6);
        //放入面包
        Runnable put = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    try {
                        System.out.println(Thread.currentThread().getName() + "生产了" + i + "个面包");
                        queue.put("面包---" + i);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                }
            }
        };
        //取出面包
        Runnable get = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    try {
                        queue.take();
                        System.out.println(Thread.currentThread().getName() + "取出了" + i + "个面包");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                }
            }
        };
        new Thread(put).start();
        new Thread(get).start();
    }
}

7.12 多线程的三个特性

多线程要保证并发线程执行,必须要保证三个特性:

  • 原子性(互斥性)
    • 一个或多个操作不能被分割,要么全部执行,要么就都不执行
  • 可见性
    • 多个线程访问同一个变量,一个线程修改了这个变量,别的线程能立即看到修改的值
    • volatile关键字保证内存可见性
  • 有序性
    • 程序执行的顺序按照代码的先后顺序执行。
    • 但处理器为了提高程序运行效率,可能会对输入代码进行优化,它不保证程序中各个语句的执行顺序和编写顺序—致。但最终结果是一致的。

可见性分析:

    //volatile 不稳定的,易变化的
    //1.保证内存可见性,效率低 见内存图示
    //2.禁止指令重排序
    //boolean flag = true;  子线程不停止
    volatile boolean flag = true;

    @Override
    public void run() {
        System.out.println("子线程开始执行");
        while (true) {
            if (!flag) {
                break;
            }
        }
        System.out.println("子线程结束");
    }
}

class Test {
    public static void main(String[] args) throws IOException {
        ThreadVisibility visibility = new ThreadVisibility();
        new Thread(visibility).start();
        System.in.read();
        visibility.flag = false;
        System.out.println("主线程结束..");
    }
}

问题描述:new出来的ThreadVisibility在堆中,初始值为true,子线程会把这个值进行缓存,当在主线程中把它修改为false时,由于CPU为了运行的效率,不让子线程看见,子线程死循环。

解决办法:变量类型前加volatile关键字,直接从堆里面取,进行判断,不要有缓存了。

有序性分析:

弊端:单线程没有问题,多线程会出现问题,指令会重排序。

问题描述:见图。

解决方式:变量类型前加volatile关键字,保证有序性。

synchronized 可以保证原子性和可见性,不能保证有序性

volatile 可以保证可见性和有序性,但不能保证原子性。

  • Lock接口间接借助了volatile关键字间接地实现了可见性和有序性

面试题

i++是原子操作吗? ,不是。

如果是原子操作,输出的数据不会出现重复的

i++包含了三步:(1)读取i (2)改 执行加1 (3)把结果写入

解决方法:

  • 1. i++; 加synchronized同步代码块,效率低
  • 2. Atomic 类处理 有返回值 看JDK
  • 3. 并发特别高得情况下:LongAdder类 用来计数 没有返回值
class PlusDemo {
    private int num = 0;

    public int getNum() {
        return num++;
    }
}
//1.
/*class PlusDemo {
    private int num = 0;

    public synchronized int getNum() {
        return num++;
    }
}*/
//2.
/*class PlusDemo {
    AtomicInteger atomicInteger = new AtomicInteger();
    public int getNum() {
        return atomicInteger.getAndIncrement();
    }
}*/
//3.
/*class PlusDemo {
    LongAdder longA = new LongAdder();
    public int getNum() {
    //非原子性的,调用另外一个方法返回
        longA.add(1);
        return (int) longA.sum();
    }
}*/

class TestAtomic {
    public static void main(String[] args) {
        PlusDemo demo = new PlusDemo();
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    int num = demo.getNum();
                    System.out.println("num = " + num);
                }
            }).start();
        }
    }
}

8.并发(同步)工具类

在Java1.5中,提供了一些非常有用的辅助类来帮助我们进行并发编程,比如CountDownLatch,CyclicBarrier,Semaphore和Exchange。

8.1 CountDownLatch - 闭锁

CountDownLatch是一个同步计数器,初始化的时候传入需要计数的线程等待数。

作用:

  • 闭锁可以用来确保某些操作直到其他活动都完成才能继续执行。
  • 构造方法:传入计数的线程数
  • await():阻止线程执行
  • countDown():递减计数,如果计数到达零,则释放 所有等待的线程

案例1:计算所有子线程执行完毕所用时间

//计算线程的用时  -  解决方法:CountDownLatch
public class CountDowmLathchTest {
    public static void main(String[] args) throws InterruptedException {
        //1.创建闭锁
        CountDownLatch latch = new CountDownLatch(5);
        long start = System.currentTimeMillis();
        for (int i = 0; i < 5; i++) {
            new MyThread(latch).start();
        }
        //3.
        latch.await();
        //必须等待所有的子线程执行完毕才能继续
        long end = System.currentTimeMillis();
        System.out.println("用时:" + (end - start));
    }
    //创建线程
    static class MyThread extends Thread {
        private CountDownLatch countDownLatch;

        public MyThread(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "开始执行");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("执行完毕了");
            //2.减一
            countDownLatch.countDown();
        }
    }
}

案例2:所有员工都到达,老板才开始开会

public class CountDowmLathchTest2 {
    public static void main(String[] args) {
        CountDownLatch countDownLatch = new CountDownLatch(5);
        for (int i = 0; i < 5; i++) {
            new Employee(countDownLatch, 1000 * i).start();
        }
        new Boss(countDownLatch).start();
    }

    static class Boss extends Thread {
        private CountDownLatch countDownLatch;

        public Boss(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            System.out.println("老板要开会了");
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("所有员工都到了,可以开会了");
        }
    }

    static class Employee extends Thread {
        private CountDownLatch countDownLatch;
        private int time;

        public Employee(CountDownLatch countDownLatch, int time) {
            this.countDownLatch = countDownLatch;
            this.time = time;
        }

        @Override
        public void run() {
            System.out.println("员工来了");
            try {
                Thread.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            countDownLatch.countDown();
        }
    }
}

8.2 CyclicBarrier - 屏障

CyclicBarrier(屏障)表面意思理解为可循环使用的屏障。

作用:

  • 让一组线程在到达一个屏障时被阻塞,等到最后一个线程到达屏障点,才会运行被拦截的线程继续运行。
  • 构造函数:CyclicBarrier(int parties)屏障拦截的线程数量。
  • await():调用该方法时表示线程已经到达屏障,随即阻塞。

案例:实现多个线程同时执行

public class CyclicBarrierTest {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
                //全部到达屏障输出
                System.out.println("全部准备好了");
            }
        });
        for (int i = 0; i < 5; i++) {
            new MyThread(cyclicBarrier).start();
        }
    }

    static class MyThread extends Thread {
        private CyclicBarrier cyclicBarrier;

        public MyThread(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + "准备好了");
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "开始跑步了");
        }
    }
}

8.3 Semaphore - 信号量

Semaphore是synchronized 或Lock的加强版,作用是控制线程的并发数量。

作用:

  • 用来控制同时访问特定资源的线程数量,通过协调保证合理的使用公共资源。
  • 比作控制车流的红绿灯,如马路要控制流量,只限制5辆车通行,其他必须在路口处等待,不能行驶在马路上,当其中有5辆离开马路,那么允许后面5辆进入马路。
  • acquire():从信号量获取一个许可。
  • release():释放一个许可,将其返回给信号量。

案例:使用Semaphore信号量控制并发的个数

public class SemaphoreDemo {
//两个两个的线程同时执行
    public static void main(String[] args) {
        Semaphore semaphore=new Semaphore(2);
        for (int i = 0; i <10 ; i++) {
            new MyThread(semaphore).start();
        }
    }
    static class MyThread extends  Thread{
        private Semaphore semaphore;
        public MyThread(Semaphore semaphore){
            this.semaphore=semaphore;
        }
        @Override
        public void run() {
            try {
                //获取许可
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName()+"获取了序可:"+new Date().toLocaleString());
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getName()+"-----释放了序可:"+new Date().toLocaleString());
                //释放许可
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

案例:实现三个线程循环打印A、B、C

public class SemaphoreDemo2 {

    public static void main(String[] args) {
        Alternative alternative=new Alternative();
        new Thread(new Runnable() {
            @Override
            public void run() {
                alternative.printA();
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                alternative.printB();
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                alternative.printC();
            }
        }).start();
    }
    static  class Alternative{
        private Semaphore semaphore1=new Semaphore(0);
        private Semaphore semaphore2=new Semaphore(0);
        public void printA(){
            System.out.println("A");
            semaphore1.release();
        }
        public void printB(){
            try {
                semaphore1.acquire();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("B");
            semaphore2.release();

        }
        public void printC(){
            try {
                semaphore2.acquire();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("C");
        }
    }

}

8.4 Exchanger - 交换器

Exchanger类似于一个交换器,Exchanger类允许在两个线程之间定义同步点。当两个线程都到达同步点时,他们交换数据,因此第一个线程的数据进入到第二个线程中,第二个线程的数据进入到第一个线程中。

案例:实现两个线程交换数据

public class ExchangeDemo {
    public static void main(String[] args) {
        ExecutorService es = Executors.newFixedThreadPool(2);
        //创建交换器

        Exchanger<List<String>> exchanger=new Exchanger<>();
        //提交任务
        es.submit(new Runnable() {
            @Override
            public void run() {
//                String s1="hello";
                List<String> list1=new ArrayList<>();
                list1.add("苹果");
                list1.add("橘子");
                try {
                    list1=exchanger.exchange(list1);
                    System.out.println("线程1:"+list1.toString());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        es.submit(new Runnable() {
            @Override
            public void run() {
                //String s2="world";
                List<String> list2=new ArrayList<>();
                list2.add("北京");
                list2.add("上海");
                try {
                    list2=exchanger.exchange(list2);
                    System.out.println("线程2:"+list2.toString());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        es.shutdown();

    }
}

[/wppay]