线程锁

线程锁
  1. 三个count++性能对比
package com.mashibing.juc.c_018_00_AtomicXXX;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
public class T02_AtomicVsSyncVsLongAdder {
    static long count2 = 0L;
    static AtomicLong count1 = new AtomicLong(0L);
    static LongAdder count3 = new LongAdder();
    public static void main(String[] args) throws Exception {
        Thread[] threads = new Thread[1000];
        for(int i=0; i<threads.length; i++) {
            threads[i] =
                    new Thread(()-> {
                        for(int k=0; k<100000; k++) count1.incrementAndGet();
                    });
        }
        long start = System.currentTimeMillis();
        for(Thread t : threads ) t.start();
        for (Thread t : threads) t.join();
        long end = System.currentTimeMillis();
        //TimeUnit.SECONDS.sleep(10);
        System.out.println("Atomic: " + count1.get() + " time " + (end-start));
        //-----------------------------------------------------------
        Object lock = new Object();
        for(int i=0; i<threads.length; i++) {
            threads[i] =
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        for (int k = 0; k < 100000; k++)
                            synchronized (lock) {
                                count2++;
                            }
                    }
                });
        }
        start = System.currentTimeMillis();
        for(Thread t : threads ) t.start();
        for (Thread t : threads) t.join();
        end = System.currentTimeMillis();
        System.out.println("Sync: " + count2 + " time " + (end-start));
        //----------------------------------
        for(int i=0; i<threads.length; i++) {
            threads[i] =
                    new Thread(()-> {
                        for(int k=0; k<100000; k++) count3.increment();
                    });
        }
        start = System.currentTimeMillis();
        for(Thread t : threads ) t.start();
        for (Thread t : threads) t.join();
        end = System.currentTimeMillis();
        //TimeUnit.SECONDS.sleep(10);
        System.out.println("LongAdder: " + count1.longValue() + " time " + (end-start));
    }
    static void microSleep(int m) {
        try {
            TimeUnit.MICROSECONDS.sleep(m);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
线程锁
根据Oracle官方文档的介绍,LongAdder在高并发的场景下会比它的前辈————AtomicLong 具有更好的性能,代价是消耗更多的内存空间在并发量较低的环境下,线程冲突的概率比较小,自旋的次数不会很多。但是,高并发环境下,N个线程同时进行自旋操作,会出现大量失败并不断自旋的情况,此时AtomicLong的自旋会成为瓶颈。
这就是LongAdder引入的初衷——解决高并发环境下AtomicLong的自旋瓶颈问题。
也就是说在并发量比较小的情况下LongAdder与AtomicLong性能差不了多少。
LongAdder的实现原理:详细可参考https://segmentfault.com/a/1190000015865714
LongAdder的基本思路就是分散热点,将value值分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。
这种做法有没有似曾相识的感觉?没错,ConcurrentHashMap中的“分段锁”其实就是类似的思路。
线程锁
 
2ReentrantLock可重入锁(synchronized就是重入锁的一种)
线程锁
/**
 * reentrantlock用于替代synchronized
 * 由于m1锁定this,只有m1执行完毕的时候,m2才能执行
 * 这里是复习synchronized最原始的语义
 *
 * 使用reentrantlock可以完成同样的功能
 * 需要注意的是,必须要必须要必须要手动释放锁(重要的事情说三遍)
 * 使用syn锁定的话如果遇到异常,jvm会自动释放锁,但是lock必须手动释放锁,因此经常在finally中进行锁的释放
 * @author mashibing
 */
package com.mashibing.juc.c_020;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class T02_ReentrantLock2 {
         Lock lock = new ReentrantLock();
         void m1() {
                  try {
                          lock.lock(); //synchronized(this)
                          for (int i = 0; i < 10; i++) {
                                   TimeUnit.SECONDS.sleep(1);
                                   System.out.println(i);
                          }
                  } catch (InterruptedException e) {
                          e.printStackTrace();
                  } finally {
                          lock.unlock();
                  }
         }
         void m2() {
                  try {
                          lock.lock();
                          System.out.println("m2 ...");
                  } finally {
                          lock.unlock();
                  }
         }
         public static void main(String[] args) {
                  T02_ReentrantLock2 rl = new T02_ReentrantLock2();
                  new Thread(rl::m1).start();
                  try {
                          TimeUnit.SECONDS.sleep(1);
                  } catch (InterruptedException e) {
                          e.printStackTrace();
                  }
                  new Thread(rl::m2).start();
         }
}
线程锁
限时等待
tryLock方法来实现,可以选择传入时间参数,表示等待指定的时间,无参则表示立即返回锁申请的结果:true表示获取锁成功,false表示获取锁失败。我们可以将这种方法用来解决死锁问题。
/**
 * reentrantlock用于替代synchronized
 * 由于m1锁定this,只有m1执行完毕的时候,m2才能执行
 * 这里是复习synchronized最原始的语义
 *
 * 使用reentrantlock可以完成同样的功能
 * 需要注意的是,必须要必须要必须要手动释放锁(重要的事情说三遍)
 * 使用syn锁定的话如果遇到异常,jvm会自动释放锁,但是lock必须手动释放锁,因此经常在finally中进行锁的释放
 *
 * 使用reentrantlock可以进行尝试锁定tryLock,这样无法锁定,或者在指定时间内无法锁定,线程可以决定是否继续等待
 * @author mashibing
 */
package com.mashibing.juc.c_020;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class T03_ReentrantLock3 {
         Lock lock = new ReentrantLock();
         void m1() {
                  try {
                          lock.lock();
                          for (int i = 0; i < 3; i++) {
                                   TimeUnit.SECONDS.sleep(1);
                                   System.out.println(i);
                          }
                  } catch (InterruptedException e) {
                          e.printStackTrace();
                  } finally {
                          lock.unlock();
                  }
         }
      /**
 * 使用tryLock进行尝试锁定,不管锁定与否,方法都将继续执行
 * 可以根据tryLock的返回值来判定是否锁定
 * 也可以指定tryLock的时间,由于tryLock(time)抛出异常,所以要注意unclock的处理,必须放到finally中
 */
         void m2() {
                  /*
                  boolean locked = lock.tryLock();
                  System.out.println("m2 ..." + locked);
                  if(locked) lock.unlock();
                  */
                  
                  boolean locked = false;
                  
                  try {
                          locked = lock.tryLock(5, TimeUnit.SECONDS);
                          System.out.println("m2 ..." + locked);
                  } catch (InterruptedException e) {
                          e.printStackTrace();
                  } finally {
                          if(locked) lock.unlock();
                  }
                  
         }
         public static void main(String[] args) {
                  T03_ReentrantLock3 rl = new T03_ReentrantLock3();
                  new Thread(rl::m1).start();
                  try {
                          TimeUnit.SECONDS.sleep(1);
                  } catch (InterruptedException e) {
                          e.printStackTrace();
                  }
                  new Thread(rl::m2).start();
         }
}
线程锁
响应中断
响应中断就是一个线程获取不到锁,不会傻傻的一直等下去,ReentrantLock会给予一个中断回应。
/**
 * reentrantlock用于替代synchronized
 * 由于m1锁定this,只有m1执行完毕的时候,m2才能执行
 * 这里是复习synchronized最原始的语义
 *
 * 使用reentrantlock可以完成同样的功能
 * 需要注意的是,必须要必须要必须要手动释放锁(重要的事情说三遍)
 * 使用syn锁定的话如果遇到异常,jvm会自动释放锁,但是lock必须手动释放锁,因此经常在finally中进行锁的释放
 *
 * 使用reentrantlock可以进行“尝试锁定”tryLock,这样无法锁定,或者在指定时间内无法锁定,线程可以决定是否继续等待
 *
 * 使用ReentrantLock还可以调用lockInterruptibly方法,可以对线程interrupt方法做出响应,
 * 在一个线程等待锁的过程中,可以被打断
 *
 * @author mashibing
 */
package com.mashibing.juc.c_020;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
public class T04_ReentrantLock4 {
                  
         public static void main(String[] args) {
                  Lock lock = new ReentrantLock();
                  
                  
                  Thread t1 = new Thread(()->{
                          try {
                                   lock.lock();
                                   System.out.println("t1 start");
                                   TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
                                   System.out.println("t1 end");
                          } catch (InterruptedException e) {
                                   System.out.println("interrupted!");
                          } finally {
                                   lock.unlock();
                          }
                  });
                  t1.start();
                  
                  Thread t2 = new Thread(()->{
                          try {
                                   //lock.lock();
                                   lock.lockInterruptibly(); //可以对interrupt()方法做出响应
                                   System.out.println("t2 start");
                                   TimeUnit.SECONDS.sleep(5);
                                   System.out.println("t2 end");
                          } catch (InterruptedException e) {
                                   System.out.println("interrupted!");
                          } finally {
                                   lock.unlock();
                          }
                  });
                  t2.start();
                  
                  try {
                          TimeUnit.SECONDS.sleep(1);
                  } catch (InterruptedException e) {
                          e.printStackTrace();
                  }
                  t2.interrupt(); //打断线程2的等待
                  
         }
}
线程锁
公平锁
在公平的锁中,如果有另一个线程持有锁或者有其他线程在等待队列中等待这个所,那么新发出的请求的线程将被放入到队列中。而非公平锁上,只有当锁被某个线程持有时,新发出请求的线程才会被放入队列中(此时和公平锁是一样的)。所以,它们的差别在于非公平锁会有更多的机会去抢占锁。
/**
 * reentrantlock用于替代synchronized
 * 由于m1锁定this,只有m1执行完毕的时候,m2才能执行
 * 这里是复习synchronized最原始的语义
 *
 * 使用reentrantlock可以完成同样的功能
 * 需要注意的是,必须要必须要必须要手动释放锁(重要的事情说三遍)
 * 使用syn锁定的话如果遇到异常,jvm会自动释放锁,但是lock必须手动释放锁,因此经常在finally中进行锁的释放
 *
 * 使用reentrantlock可以进行“尝试锁定”tryLock,这样无法锁定,或者在指定时间内无法锁定,线程可以决定是否继续等待
 *
 * 使用ReentrantLock还可以调用lockInterruptibly方法,可以对线程interrupt方法做出响应,
 * 在一个线程等待锁的过程中,可以被打断
 *
 * ReentrantLock还可以指定为公平锁
 *
 * @author mashibing
 */
package com.mashibing.juc.c_020;
 
import java.util.concurrent.locks.ReentrantLock;
 
public class T05_ReentrantLock5 extends Thread {
 
private static ReentrantLock lock=new ReentrantLock(true); //参数为true表示为公平锁,请对比输出结果
    public void run() {
        for(int i=0; i<100; i++) {
            lock.lock();
            try{
                System.out.println(Thread.currentThread().getName()+"获得锁");
            }finally{
                lock.unlock();
            }
        }
    }
    public static void main(String[] args) {
        T05_ReentrantLock5 rl=new T05_ReentrantLock5();
        Thread th1=new Thread(rl);
        Thread th2=new Thread(rl);
        th1.start();
        th2.start();
    }
}
在公平锁中会挨个挨个输出,而在非公平锁中则会随意输出。
线程锁
2 CountDownLatch
·  countDownLatch这个类使一个线程等待其他线程各自执行完毕后再执行。
·  是通过一个计数器来实现的,计数器的初始值是线程的数量。每当一个线程执行完毕后,计数器的值就-1,当计数器的值为0时,表示所有线程都执行完毕,然后在闭锁上等待的线程就可以恢复工作。
package com.mashibing.juc.c_020;
import java.util.concurrent.CountDownLatch;
public class T06_TestCountDownLatch {
    public static void main(String[] args) {
        usingJoin();
        usingCountDownLatch();
    }
    private static void usingCountDownLatch() {
        Thread[] threads = new Thread[100];
        CountDownLatch latch = new CountDownLatch(threads.length);
        for(int i=0; i<threads.length; i++) {
            threads[i] = new Thread(()->{
                int result = 0;
                for(int j=0; j<10000; j++) result += j;
                latch.countDown();
            });
        }
        for (int i = 0; i < threads.length; i++) {
            threads[i].start();
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("end latch");
    }
    private static void usingJoin() {
        Thread[] threads = new Thread[100];
        for(int i=0; i<threads.length; i++) {
            threads[i] = new Thread(()->{
                int result = 0;
                for(int j=0; j<10000; j++) result += j;
            });
        }
        for (int i = 0; i < threads.length; i++) {
            threads[i].start();
        }
        for (int i = 0; i < threads.length; i++) {
            try {
                threads[i].join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("end join");
    }
}
 
3CyclicBarrier
线程锁
从字面上的意思可以知道,这个类的中文意思是循环栅栏。大概的意思就是一个可循环利用的屏障。
它的作用就是会让所有线程都等待完成后才会继续下一步行动。
举个例子,就像生活中我们会约朋友们到某个餐厅一起吃饭,有些朋友可能会早到,有些朋友可能会晚到,但是这个餐厅规定必须等到所有人到齐之后才会让我们进去。这里的朋友们就是各个线程,餐厅就是 CyclicBarrier。
package com.mashibing.juc.c_020;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class T07_TestCyclicBarrier {
    public static void main(String[] args) {
        //CyclicBarrier barrier = new CyclicBarrier(20);
        CyclicBarrier barrier = new CyclicBarrier(20, () -> System.out.println("满人"));
        /*CyclicBarrier barrier = new CyclicBarrier(20, new Runnable() {
            @Override
            public void run() {
                System.out.println("满人,发车");
            }
        });*/
        for(int i=0; i<100; i++) {
                new Thread(()->{
                    try {
                        barrier.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }).start();
           
        }
    }
}
 
常用方法:
public int await() throws InterruptedException, BrokenBarrierExceptionpublic int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
线程调用 await() 表示自己已经到达栅栏
BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时
区别:
·  CountDownLatch 是一次性的,CyclicBarrier 是可循环利用的
·  CountDownLatch 参与的线程的职责是不一样的,有的在倒计时,有的在等待倒计时结束。CyclicBarrier 参与的线程职责是一样的。
 
 
4phaser锁
phase 是阶段的意思。Phaser是按照不同阶段执行线程的,就像是结合了CountDownLatch和CyclicBarrier,它本身维护着一个叫 phase 的成员变量代表当前执行的阶段。
package com.mashibing.juc.c_020;
import java.util.Random;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
public class T08_TestPhaser {
    static Random r = new Random();
    static MarriagePhaser phaser = new MarriagePhaser();
    static void milliSleep(int milli) {
        try {
            TimeUnit.MILLISECONDS.sleep(milli);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) {
        phaser.bulkRegister(5);
        for(int i=0; i<5; i++) {
            final int nameIndex = i;
            new Thread(()->{
                Person p = new Person("person " + nameIndex);
                p.arrive();
                phaser.arriveAndAwaitAdvance();
                p.eat();
                phaser.arriveAndAwaitAdvance();
                p.leave();
                phaser.arriveAndAwaitAdvance();
            }).start();
        }
    }
    static class MarriagePhaser extends Phaser {
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            switch (phase) {
                case 0:
                    System.out.println("所有人到齐了!");
                    return false;
                case 1:
                    System.out.println("所有人吃完了!");
                    return false;
                case 2:
                    System.out.println("所有人离开了!");
                    System.out.println("婚礼结束!");
                    return true;
                default:
                    return true;
            }
        }
    }
    static class Person {
        String name;
        public Person(String name) {
            this.name = name;
        }
        public void arrive() {
            milliSleep(r.nextInt(1000));
            System.out.printf("%s 到达现场!\n", name);
        }
        public void eat() {
            milliSleep(r.nextInt(1000));
            System.out.printf("%s 吃完!\n", name);
        }
        public void leave() {
            milliSleep(r.nextInt(1000));
            System.out.printf("%s 离开!\n", name);
        }
    }
}
线程锁
4ReadWriteLock
编程想要实现的最好效果是,可以做到读和读互不影响,读和写互斥,写和写互斥,提高读写的效率
ReadWriteLock管理一组锁,一个是只读的锁,一个是写锁。
Java并发库中ReetrantReadWriteLock实现了ReadWriteLock接口并添加了可重入的特性。
·  非公平模式(默认)
当以非公平初始化时,读锁和写锁的获取的顺序是不确定的。非公平锁主张竞争获取,可能会延缓一个或多个读或写线程,但是会比公平锁有更高的吞吐量。
·  公平模式
当以公平模式初始化时,线程将会以队列的顺序获取锁。当当前线程释放锁后,等待时间最长的写锁线程就会被分配写锁;或者有一组读线程组等待时间比写线程长,那么这组读线程组将会被分配读锁。
package com.mashibing.juc.c_020;
import java.util.Random;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class T10_TestReadWriteLock {
    static Lock lock = new ReentrantLock();
    private static int value;
    static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    static Lock readLock = readWriteLock.readLock();
    static Lock writeLock = readWriteLock.writeLock();
    public static void read(Lock lock) {
        try {
            lock.lock();
            Thread.sleep(1000);
            System.out.println("read over!");
            //模拟读取操作
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    public static void write(Lock lock, int v) {
        try {
            lock.lock();
            Thread.sleep(1000);
            value = v;
            System.out.println("write over!");
            //模拟写操作
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    public static void main(String[] args) {
        //Runnable readR = ()-> read(lock);
        Runnable readR = ()-> read(readLock);
        //Runnable writeR = ()->write(lock, new Random().nextInt());
        Runnable writeR = ()->write(writeLock, new Random().nextInt());
        for(int i=0; i<18; i++) new Thread(readR).start();
        for(int i=0; i<2; i++) new Thread(writeR).start();
    }
}
 
 
5semapore
   Semaphore 是 synchronized 的加强版,作用是控制线程的并发数量。就这一点而言,单纯的synchronized 关键字是实现不了的。
package com.mashibing.juc.c_020;
import java.util.concurrent.Semaphore;
public class T11_TestSemaphore {
    public static void main(String[] args) {
        //Semaphore s = new Semaphore(2);
        Semaphore s = new Semaphore(2, true);
        //允许一个线程同时执行
        //Semaphore s = new Semaphore(1);
        new Thread(()->{
            try {
                s.acquire();
                System.out.println("T1 running...");
                Thread.sleep(200);
                System.out.println("T1 running...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                s.release();
            }
        }).start();
        new Thread(()->{
            try {
                s.acquire();
                System.out.println("T2 running...");
                Thread.sleep(200);
                System.out.println("T2 running...");
                s.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}
 
方法 acquire( int permits ) 参数作用,及动态添加 permits 许可数量  
  acquire( int permits ) 中的参数是什么意思呢?可以这么理解, new Semaphore(6) 表示初始化了 6个通路, semaphore.acquire(2) 表示每次线程进入将会占用2个通路,semaphore.release(2) 运行时表示归还2个通路。没有通路,则线程就无法进入代码块。
  而上面的代码中,semaphore.acquire() +  semaphore.release()  在运行的时候,其实和 semaphore.acquire(1) + semaphore.release(1)  效果是一样的。 
6Exchanger
线程锁
package juc.c_020;
 
import java.util.concurrent.Exchanger;
 
public class T12_TestExchanger {
 
    static Exchanger<String> exchanger = new Exchanger<>();
 
    public static void main(String[] args) {
        new Thread(()->{
            String s = "T1";
            try {
                s = exchanger.exchange(s);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " " + s);
 
        }, "t1").start();
 
 
        new Thread(()->{
            String s = "T2";
            try {
                s = exchanger.exchange(s);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " " + s);
 
        }, "t2").start();
 
 
    }
}