线程锁
线程锁
-
三个count++性能对比
package com.mashibing.juc.c_018_00_AtomicXXX;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
public class T02_AtomicVsSyncVsLongAdder {
static long count2 = 0L;
static AtomicLong count1 = new AtomicLong(0L);
static LongAdder count3 = new LongAdder();
public static void main(String[] args) throws Exception {
Thread[] threads = new Thread[1000];
for(int i=0; i<threads.length; i++) {
threads[i] =
new Thread(()-> {
for(int k=0; k<100000; k++) count1.incrementAndGet();
});
}
long start = System.currentTimeMillis();
for(Thread t : threads ) t.start();
for (Thread t : threads) t.join();
long end = System.currentTimeMillis();
//TimeUnit.SECONDS.sleep(10);
System.out.println("Atomic: " + count1.get() + " time " + (end-start));
//-----------------------------------------------------------
Object lock = new Object();
for(int i=0; i<threads.length; i++) {
threads[i] =
new Thread(new Runnable() {
@Override
public void run() {
for (int k = 0; k < 100000; k++)
synchronized (lock) {
count2++;
}
}
});
}
start = System.currentTimeMillis();
for(Thread t : threads ) t.start();
for (Thread t : threads) t.join();
end = System.currentTimeMillis();
System.out.println("Sync: " + count2 + " time " + (end-start));
//----------------------------------
for(int i=0; i<threads.length; i++) {
threads[i] =
new Thread(()-> {
for(int k=0; k<100000; k++) count3.increment();
});
}
start = System.currentTimeMillis();
for(Thread t : threads ) t.start();
for (Thread t : threads) t.join();
end = System.currentTimeMillis();
//TimeUnit.SECONDS.sleep(10);
System.out.println("LongAdder: " + count1.longValue() + " time " + (end-start));
}
static void microSleep(int m) {
try {
TimeUnit.MICROSECONDS.sleep(m);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
根据Oracle官方文档的介绍,LongAdder在高并发的场景下会比它的前辈————AtomicLong 具有更好的性能,代价是消耗更多的内存空间。在并发量较低的环境下,线程冲突的概率比较小,自旋的次数不会很多。但是,高并发环境下,N个线程同时进行自旋操作,会出现大量失败并不断自旋的情况,此时AtomicLong的自旋会成为瓶颈。
这就是LongAdder引入的初衷——解决高并发环境下AtomicLong的自旋瓶颈问题。
也就是说在并发量比较小的情况下LongAdder与AtomicLong性能差不了多少。
LongAdder的实现原理:详细可参考https://segmentfault.com/a/1190000015865714
LongAdder的基本思路就是分散热点,将value值分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。
|
2ReentrantLock可重入锁(synchronized就是重入锁的一种)
/**
* reentrantlock用于替代synchronized
* 由于m1锁定this,只有m1执行完毕的时候,m2才能执行
* 这里是复习synchronized最原始的语义
*
* 使用reentrantlock可以完成同样的功能
* 需要注意的是,必须要必须要必须要手动释放锁(重要的事情说三遍)
* 使用syn锁定的话如果遇到异常,jvm会自动释放锁,但是lock必须手动释放锁,因此经常在finally中进行锁的释放
* @author mashibing
*/
package com.mashibing.juc.c_020;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class T02_ReentrantLock2 {
Lock lock = new ReentrantLock();
void m1() {
try {
lock.lock(); //synchronized(this)
for (int i = 0; i < 10; i++) {
TimeUnit.SECONDS.sleep(1);
System.out.println(i);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
void m2() {
try {
lock.lock();
System.out.println("m2 ...");
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
T02_ReentrantLock2 rl = new T02_ReentrantLock2();
new Thread(rl::m1).start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(rl::m2).start();
}
}
|
限时等待
tryLock方法来实现,可以选择传入时间参数,表示等待指定的时间,无参则表示立即返回锁申请的结果:true表示获取锁成功,false表示获取锁失败。我们可以将这种方法用来解决死锁问题。
/**
* reentrantlock用于替代synchronized
* 由于m1锁定this,只有m1执行完毕的时候,m2才能执行
* 这里是复习synchronized最原始的语义
*
* 使用reentrantlock可以完成同样的功能
* 需要注意的是,必须要必须要必须要手动释放锁(重要的事情说三遍)
* 使用syn锁定的话如果遇到异常,jvm会自动释放锁,但是lock必须手动释放锁,因此经常在finally中进行锁的释放
*
* 使用reentrantlock可以进行“尝试锁定”tryLock,这样无法锁定,或者在指定时间内无法锁定,线程可以决定是否继续等待
* @author mashibing
*/
package com.mashibing.juc.c_020;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class T03_ReentrantLock3 {
Lock lock = new ReentrantLock();
void m1() {
try {
lock.lock();
for (int i = 0; i < 3; i++) {
TimeUnit.SECONDS.sleep(1);
System.out.println(i);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
/**
* 使用tryLock进行尝试锁定,不管锁定与否,方法都将继续执行
* 可以根据tryLock的返回值来判定是否锁定
* 也可以指定tryLock的时间,由于tryLock(time)抛出异常,所以要注意unclock的处理,必须放到finally中
*/
void m2() {
/*
boolean locked = lock.tryLock();
System.out.println("m2 ..." + locked);
if(locked) lock.unlock();
*/
boolean locked = false;
try {
locked = lock.tryLock(5, TimeUnit.SECONDS);
System.out.println("m2 ..." + locked);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if(locked) lock.unlock();
}
}
public static void main(String[] args) {
T03_ReentrantLock3 rl = new T03_ReentrantLock3();
new Thread(rl::m1).start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(rl::m2).start();
}
}
|
响应中断
响应中断就是一个线程获取不到锁,不会傻傻的一直等下去,ReentrantLock会给予一个中断回应。
/**
* reentrantlock用于替代synchronized
* 由于m1锁定this,只有m1执行完毕的时候,m2才能执行
* 这里是复习synchronized最原始的语义
*
* 使用reentrantlock可以完成同样的功能
* 需要注意的是,必须要必须要必须要手动释放锁(重要的事情说三遍)
* 使用syn锁定的话如果遇到异常,jvm会自动释放锁,但是lock必须手动释放锁,因此经常在finally中进行锁的释放
*
* 使用reentrantlock可以进行“尝试锁定”tryLock,这样无法锁定,或者在指定时间内无法锁定,线程可以决定是否继续等待
*
* 使用ReentrantLock还可以调用lockInterruptibly方法,可以对线程interrupt方法做出响应,
* 在一个线程等待锁的过程中,可以被打断
*
* @author mashibing
*/
package com.mashibing.juc.c_020;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
public class T04_ReentrantLock4 {
public static void main(String[] args) {
Lock lock = new ReentrantLock();
Thread t1 = new Thread(()->{
try {
lock.lock();
System.out.println("t1 start");
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
System.out.println("t1 end");
} catch (InterruptedException e) {
System.out.println("interrupted!");
} finally {
lock.unlock();
}
});
t1.start();
Thread t2 = new Thread(()->{
try {
//lock.lock();
lock.lockInterruptibly(); //可以对interrupt()方法做出响应
System.out.println("t2 start");
TimeUnit.SECONDS.sleep(5);
System.out.println("t2 end");
} catch (InterruptedException e) {
System.out.println("interrupted!");
} finally {
lock.unlock();
}
});
t2.start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
t2.interrupt(); //打断线程2的等待
}
}
|
公平锁
在公平的锁中,如果有另一个线程持有锁或者有其他线程在等待队列中等待这个所,那么新发出的请求的线程将被放入到队列中。而非公平锁上,只有当锁被某个线程持有时,新发出请求的线程才会被放入队列中(此时和公平锁是一样的)。所以,它们的差别在于非公平锁会有更多的机会去抢占锁。
/**
* reentrantlock用于替代synchronized
* 由于m1锁定this,只有m1执行完毕的时候,m2才能执行
* 这里是复习synchronized最原始的语义
*
* 使用reentrantlock可以完成同样的功能
* 需要注意的是,必须要必须要必须要手动释放锁(重要的事情说三遍)
* 使用syn锁定的话如果遇到异常,jvm会自动释放锁,但是lock必须手动释放锁,因此经常在finally中进行锁的释放
*
* 使用reentrantlock可以进行“尝试锁定”tryLock,这样无法锁定,或者在指定时间内无法锁定,线程可以决定是否继续等待
*
* 使用ReentrantLock还可以调用lockInterruptibly方法,可以对线程interrupt方法做出响应,
* 在一个线程等待锁的过程中,可以被打断
*
* ReentrantLock还可以指定为公平锁
*
* @author mashibing
*/
package com.mashibing.juc.c_020;
import java.util.concurrent.locks.ReentrantLock;
public class T05_ReentrantLock5 extends Thread {
private static ReentrantLock lock=new ReentrantLock(true); //参数为true表示为公平锁,请对比输出结果
public void run() {
for(int i=0; i<100; i++) {
lock.lock();
try{
System.out.println(Thread.currentThread().getName()+"获得锁");
}finally{
lock.unlock();
}
}
}
public static void main(String[] args) {
T05_ReentrantLock5 rl=new T05_ReentrantLock5();
Thread th1=new Thread(rl);
Thread th2=new Thread(rl);
th1.start();
th2.start();
}
}
在公平锁中会挨个挨个输出,而在非公平锁中则会随意输出。
|
2 CountDownLatch
· countDownLatch这个类使一个线程等待其他线程各自执行完毕后再执行。
· 是通过一个计数器来实现的,计数器的初始值是线程的数量。每当一个线程执行完毕后,计数器的值就-1,当计数器的值为0时,表示所有线程都执行完毕,然后在闭锁上等待的线程就可以恢复工作。
package com.mashibing.juc.c_020;
import java.util.concurrent.CountDownLatch;
public class T06_TestCountDownLatch {
public static void main(String[] args) {
usingJoin();
usingCountDownLatch();
}
private static void usingCountDownLatch() {
Thread[] threads = new Thread[100];
CountDownLatch latch = new CountDownLatch(threads.length);
for(int i=0; i<threads.length; i++) {
threads[i] = new Thread(()->{
int result = 0;
for(int j=0; j<10000; j++) result += j;
latch.countDown();
});
}
for (int i = 0; i < threads.length; i++) {
threads[i].start();
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end latch");
}
private static void usingJoin() {
Thread[] threads = new Thread[100];
for(int i=0; i<threads.length; i++) {
threads[i] = new Thread(()->{
int result = 0;
for(int j=0; j<10000; j++) result += j;
});
}
for (int i = 0; i < threads.length; i++) {
threads[i].start();
}
for (int i = 0; i < threads.length; i++) {
try {
threads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("end join");
}
}
|
3CyclicBarrier
从字面上的意思可以知道,这个类的中文意思是“循环栅栏”。大概的意思就是一个可循环利用的屏障。
它的作用就是会让所有线程都等待完成后才会继续下一步行动。
举个例子,就像生活中我们会约朋友们到某个餐厅一起吃饭,有些朋友可能会早到,有些朋友可能会晚到,但是这个餐厅规定必须等到所有人到齐之后才会让我们进去。这里的朋友们就是各个线程,餐厅就是 CyclicBarrier。
package com.mashibing.juc.c_020;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class T07_TestCyclicBarrier {
public static void main(String[] args) {
//CyclicBarrier barrier = new CyclicBarrier(20);
CyclicBarrier barrier = new CyclicBarrier(20, () -> System.out.println("满人"));
/*CyclicBarrier barrier = new CyclicBarrier(20, new Runnable() {
@Override
public void run() {
System.out.println("满人,发车");
}
});*/
for(int i=0; i<100; i++) {
new Thread(()->{
try {
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}
|
常用方法:
public int await() throws InterruptedException, BrokenBarrierExceptionpublic int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
线程调用 await() 表示自己已经到达栅栏
BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时
区别:
· CountDownLatch 是一次性的,CyclicBarrier 是可循环利用的
· CountDownLatch 参与的线程的职责是不一样的,有的在倒计时,有的在等待倒计时结束。CyclicBarrier 参与的线程职责是一样的。
4phaser锁
phase 是阶段的意思。Phaser是按照不同阶段执行线程的,就像是结合了CountDownLatch和CyclicBarrier,它本身维护着一个叫 phase 的成员变量代表当前执行的阶段。
package com.mashibing.juc.c_020;
import java.util.Random;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
public class T08_TestPhaser {
static Random r = new Random();
static MarriagePhaser phaser = new MarriagePhaser();
static void milliSleep(int milli) {
try {
TimeUnit.MILLISECONDS.sleep(milli);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
phaser.bulkRegister(5);
for(int i=0; i<5; i++) {
final int nameIndex = i;
new Thread(()->{
Person p = new Person("person " + nameIndex);
p.arrive();
phaser.arriveAndAwaitAdvance();
p.eat();
phaser.arriveAndAwaitAdvance();
p.leave();
phaser.arriveAndAwaitAdvance();
}).start();
}
}
static class MarriagePhaser extends Phaser {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
switch (phase) {
case 0:
System.out.println("所有人到齐了!");
return false;
case 1:
System.out.println("所有人吃完了!");
return false;
case 2:
System.out.println("所有人离开了!");
System.out.println("婚礼结束!");
return true;
default:
return true;
}
}
}
static class Person {
String name;
public Person(String name) {
this.name = name;
}
public void arrive() {
milliSleep(r.nextInt(1000));
System.out.printf("%s 到达现场!\n", name);
}
public void eat() {
milliSleep(r.nextInt(1000));
System.out.printf("%s 吃完!\n", name);
}
public void leave() {
milliSleep(r.nextInt(1000));
System.out.printf("%s 离开!\n", name);
}
}
}
|
4ReadWriteLock
编程想要实现的最好效果是,可以做到读和读互不影响,读和写互斥,写和写互斥,提高读写的效率
ReadWriteLock管理一组锁,一个是只读的锁,一个是写锁。
Java并发库中ReetrantReadWriteLock实现了ReadWriteLock接口并添加了可重入的特性。
· 非公平模式(默认)
当以非公平初始化时,读锁和写锁的获取的顺序是不确定的。非公平锁主张竞争获取,可能会延缓一个或多个读或写线程,但是会比公平锁有更高的吞吐量。
· 公平模式
当以公平模式初始化时,线程将会以队列的顺序获取锁。当当前线程释放锁后,等待时间最长的写锁线程就会被分配写锁;或者有一组读线程组等待时间比写线程长,那么这组读线程组将会被分配读锁。
package com.mashibing.juc.c_020;
import java.util.Random;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class T10_TestReadWriteLock {
static Lock lock = new ReentrantLock();
private static int value;
static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
static Lock readLock = readWriteLock.readLock();
static Lock writeLock = readWriteLock.writeLock();
public static void read(Lock lock) {
try {
lock.lock();
Thread.sleep(1000);
System.out.println("read over!");
//模拟读取操作
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void write(Lock lock, int v) {
try {
lock.lock();
Thread.sleep(1000);
value = v;
System.out.println("write over!");
//模拟写操作
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
//Runnable readR = ()-> read(lock);
Runnable readR = ()-> read(readLock);
//Runnable writeR = ()->write(lock, new Random().nextInt());
Runnable writeR = ()->write(writeLock, new Random().nextInt());
for(int i=0; i<18; i++) new Thread(readR).start();
for(int i=0; i<2; i++) new Thread(writeR).start();
}
}
|
5semapore
Semaphore 是 synchronized 的加强版,作用是控制线程的并发数量。就这一点而言,单纯的synchronized 关键字是实现不了的。
package com.mashibing.juc.c_020;
import java.util.concurrent.Semaphore;
public class T11_TestSemaphore {
public static void main(String[] args) {
//Semaphore s = new Semaphore(2);
Semaphore s = new Semaphore(2, true);
//允许一个线程同时执行
//Semaphore s = new Semaphore(1);
new Thread(()->{
try {
s.acquire();
System.out.println("T1 running...");
Thread.sleep(200);
System.out.println("T1 running...");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
s.release();
}
}).start();
new Thread(()->{
try {
s.acquire();
System.out.println("T2 running...");
Thread.sleep(200);
System.out.println("T2 running...");
s.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
|
方法 acquire( int permits ) 参数作用,及动态添加 permits 许可数量
acquire( int permits ) 中的参数是什么意思呢?可以这么理解, new Semaphore(6) 表示初始化了 6个通路, semaphore.acquire(2) 表示每次线程进入将会占用2个通路,semaphore.release(2) 运行时表示归还2个通路。没有通路,则线程就无法进入代码块。
而上面的代码中,semaphore.acquire() + semaphore.release() 在运行的时候,其实和 semaphore.acquire(1) + semaphore.release(1) 效果是一样的。
6Exchanger
package juc.c_020;
import java.util.concurrent.Exchanger;
public class T12_TestExchanger {
static Exchanger<String> exchanger = new Exchanger<>();
public static void main(String[] args) {
new Thread(()->{
String s = "T1";
try {
s = exchanger.exchange(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " " + s);
}, "t1").start();
new Thread(()->{
String s = "T2";
try {
s = exchanger.exchange(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " " + s);
}, "t2").start();
}
}
|