Java多线程问题--wait/notifyAll多生产者和多消费者
本文内容大部分引自《Java多线程编程核心技术》,感谢作者!!!
代码地址:https://github.com/xianzhixianzhixian/thread.git
相关知识讲解
1、wait()方法是Object类的方法,该方法用来将当前线程置入“预执行队列”中,并且在wait()所在的代码处停止执行,知道接到通知或被中断位置。在wait()方法调用前,线程必须获得该对象的对象级别锁(注意,是对象级别锁),即只能在同步方法或同步块中调用wait()方法。在执行wait()方法后,当前线程释放锁。在从wait()返回前,线程与其它线程竞争重新获得锁。使用方法:lock.wait()
2、方法notify()也要在同步方法或同步代码块中调用,在调用前线程也必须获得该对象的对象锁。该方法用来通知那些可能在等待该对象的对象锁的其它线程,如果有多个线程等待则由线程规划器随机挑选一个呈wait状态的线程,对其发出通知notify,并使它等待获取该对象的对象锁。在执行notify()方法后,当前线程不会马上释放该对象锁,呈wait状态的线程也不能马上获取该对象锁,要等到执notify()方法的线程将程序执行完,也就是退出synchronized代码块后,当前线程才会释放锁,呈wait状态所在的线程才可以获取该对象锁。使用方法:lock.notify()
3、notify()和notifyAll()的区别:当有多个线程同时运行时,如果调用notify()方法,则会由线程调度器在所有处于wait状态的线程中随机唤醒一个线程所以notify()方法很有可能唤醒的是同类线程,造成多生产者/多消费者线程的假死现象;调用notifyAll()方法时,会唤醒所有处于wait状态的线程。这些被唤醒的线程竞争对象锁,获得锁资源的线程则会继续运行下去。
多生产者/多消费者操作值代码实现
ValueObject.java
/**
* @author: xianzhixianzhixian
* @date: 2018-12-28 21:25
*/
public class VauleObject {
public static String value = "";
}
C.java
/**
* 多生产者对应多消费者操作值
* @author: xianzhixianzhixian
* @date: 2018-12-28 21:28
*/
public class C {
private Object lock;
public C(Object lock) {
this.lock = lock;
}
public void getValue(){
try {
synchronized (lock) {
//if (VauleObject.value.equals("")){ //这里一定不能用if
while (VauleObject.value.equals("")){
lock.wait();
}
System.out.println(Thread.currentThread().getName()+" get value="+ VauleObject.value);
VauleObject.value = "";
lock.notifyAll();
}
}catch (Exception e){
e.printStackTrace();
}
}
}
P.java
/**
* 多生产者对应多消费者操作值
* @author: xianzhixianzhixian
* @date: 2018-12-28 21:23
*/
public class P {
private Object lock;
public P(Object lock) {
this.lock = lock;
}
public void setValue(){
try {
synchronized (lock){
//if (!VauleObject.value.equals("")){ //这里一定不能用if
while (!VauleObject.value.equals("")){
lock.wait();
}
String value = System.currentTimeMillis()+"_"+System.nanoTime();
System.out.println(Thread.currentThread().getName()+" set的值是"+value);
VauleObject.value = value;
lock.notifyAll();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
ThreadC.java
/**
* @author: xianzhixianzhixian
* @date: 2018-12-28 21:33
*/
public class ThreadC extends Thread {
private C c;
public ThreadC(C c) {
this.c = c;
}
@Override
public void run() {
super.run();
while (true){
c.getValue();
}
}
}
ThreadP.java
/**
* @author: xianzhixianzhixian
* @date: 2018-12-28 21:33
*/
public class ThreadP extends Thread {
private P p;
public ThreadP(P p) {
this.p = p;
}
@Override
public void run() {
super.run();
while (true){
p.setValue();
}
}
}
Run.java
/**
* @author: xianzhixianzhixian
* @date: 2018-12-28 21:35
*/
public class Run {
public static void main(String[] args) {
Object object = new Object();
P p = new P(object);
C c = new C(object);
ThreadP[] threadP = new ThreadP[2];
ThreadC[] threadC = new ThreadC[2];
for (int i = 0; i < 2; i++) {
threadP[i] = new ThreadP(p);
threadP[i].setName("生产者"+i);
threadC[i] = new ThreadC(c);
threadC[i].setName("消费者"+i);
threadP[i].start();
threadC[i].start();
}
}
}
运行结果:可以看到生产者线程写值和消费者线程读值有序进行
多生产者/多消费者操作栈代码实现
MyStack.java
/**
* 多生产者多消费者操作栈
* @author: xianzhixianzhixian
* @date: 2018-12-27 22:39
*/
public class MyStack {
private List<String> list = new ArrayList<>();
synchronized public void push(){
try {
//if (list.size() == 1){ //这里一定要用while不能用if
while (list.size() == 1){
this.wait();
}
list.add("anyString="+Math.random());
//这里由于是一个生产者,所以可以用notify()函数,因为只会唤醒消费者线程
//如果是多消费者,则应使用notifyAll()函数,因为notify()函数可能会唤醒同类生产者线程造成假死
this.notifyAll();
System.out.println(" push="+list.size());
} catch (Exception e) {
e.printStackTrace();
}
}
synchronized public String pop(){
String returnValue = "";
try {
//if (list.size() == 0){ //这里一定要用while不能用if
while (list.size() == 0){
System.out.println("pop操作中的:"+Thread.currentThread().getName()+" 线程呈wait状态");
this.wait();
}
System.out.println("pop操作中的:"+Thread.currentThread().getName()+" "+System.nanoTime()+" 正在pop");
returnValue += list.get(0);
list.remove(0);
//这里是多生产者,应使用notifyAll()函数,因为notify()函数可能会唤醒同类消费者线程造成假死
this.notifyAll();
System.out.println("pop="+list.size());
System.out.println("pop操作中的:"+Thread.currentThread().getName()+" "+System.nanoTime()+" 完成pop");
} catch (Exception e) {
e.printStackTrace();
}
return returnValue;
}
}
C.java
/**
* wait/notifyAll多生产者对应多消费者操作栈
* @author: xianzhixianzhixian
* @date: 2018-12-27 22:47
*/
public class C {
private MyStack myStack;
public C(MyStack myStack) {
this.myStack = myStack;
}
public void popService(){
System.out.println("pop="+myStack.pop());
}
}
P.java
/**
* wait/notifyAll多生产者对应多消费者操作栈
* @author: xianzhixianzhixian
* @date: 2018-12-27 22:46
*/
public class P {
private MyStack myStack;
public P(MyStack myStack) {
this.myStack = myStack;
}
public void pushService(){
myStack.push();
}
}
CThread.java
/**
* @author: xianzhixianzhixian
* @date: 2018-12-27 22:50
*/
public class CThread extends Thread {
private C c;
public CThread(C c) {
this.c = c;
}
@Override
public void run() {
super.run();
while (true){
c.popService();
}
}
}
PThread.java
/**
* @author: xianzhixianzhixian
* @date: 2018-12-27 22:45
*/
public class PThread extends Thread {
private P p;
public PThread(P p) {
this.p = p;
}
@Override
public void run() {
super.run();
while (true){
p.pushService();
}
}
}
/**
* @author: xianzhixianzhixian
* @date: 2018-12-27 22:51
*/
public class Run {
public static void main(String[] args) {
MyStack myStack = new MyStack();
P p = new P(myStack);
C c = new C(myStack);
PThread[] pThread = new PThread[5];
CThread[] cThread = new CThread[5];
for (int i = 0; i < 5; i++) {
pThread[i] = new PThread(p);
pThread[i].setName("线程p"+i);
cThread[i] = new CThread(c);
cThread[i].setName("线程c"+i);
pThread[i].start();
cThread[i].start();
}
}
}
运行截图:可以看到栈的长度始终小于等于1,并且生产者线程插入值和消费者读取值有序进行