*架构师学习——第二阶段:实战Java高并发程序设计
1、什么是并行?
并行处理(ParallelProcessing)是计算机系统中能同时执行两个或更多个处理机的一种计算方法。处理机可同时工作于同一程序的不同方面。并行处理的主要目的是节省大型和复杂问题的解决时间。
2、为什么需要并行?
并行计算只有在 图像处理 和 服务端编程 两个领域可以使用,并且它在这2个领域确实有着大量广泛的使用。但是在其它任何地方,并行计算毫无建树!
摩尔定律的失效。10年过去了,我们还停留在4GHZ。
并行计算还出于业务模型的需要。并不是为了提高系统性能,而是确实在业务上需要多个执行单元。比如HTTP服务器,为每一个Socket连接新建一个处理线程,让不同线程承担不同的业务工作,简化任务调度。
3、有几个重要的概念
同步(synchronous)和异步(asynchronous):方法调用时间的区别
并发(Concurrency)和并行(Parallelism)
临界区 :临界区用来表示一种公共资源或者说是共享数据,可以被多个线程使用。但是每一次,只能有一个线程
使用它,一旦临界区资源被占用,其他线程要想使用这个资源,就必须等待。
阻塞(Blocking)和非阻塞(Non-Blocking):阻塞和非阻塞通常用来形容多线程间的相互影响。比如一个线程占用了临界区资源,那么其它所有需要这个资源的线程就必须在这个临界区中进行等待,等待会导致线程挂起。这种情况就是阻塞。此时,如
果占用资源的线程一直不愿意释放资源,那么其它所有阻塞在这个临界区上的线程都不能工作。非阻塞允许多个线程同时进入临界区。
死锁(Deadlock)、饥饿(Starvation)和活锁(Livelock):死锁是由于多个进程相互请求导致无法满足资源需求;饥饿指线程长时间无法得到需要的资源,无法继续执行;活锁指在一定时间之后能够满足资源需要从而线程能够继续向下执行。
并发级别 :阻塞、无障碍、无锁、无等待(后三个为非阻塞)
4、两个定律
Amdahl定律:定义了串行系统并行化后的加速比的计算公式和理论上限;加速比定义:加速比=优化前系统耗时/优化后系统耗时。公式:Tn=T1(F+1/n*(1-F)),其中Tn为优化后耗时,T1为单核时耗时,F为串行比例,n为处理器个数。
Gustafson定律:说明处理器个数,串行比例和加速比之间的关系,只要有足够的并行化,那么加速比和CPU个数成正比。公式:S=n-F(n-1),其中S为加速比,n为处理器个数,F为串行比例。
5、线程相关
线程是进程内的执行单元。
Thread t1 = new Thread();// 新建线程
t1.start();// 启动线程
// Thread.stop();// 不建议使用,它会释放所有的monitor。
t1.interrupt(); // 中断线程
// t1.isInterrupted(); // 判断线程是否被中断
t1.interrupted(); // 判断线程是否被中断,并清除中断状态
// 挂起suspend和继续执行resume线程,如果加锁发生在resume()之前则产生死锁
// 等待线程结束join
// 让出当前占有资源,但还会继续竞争yield
// 守护进程:在后台默默地完成一些系统性的服务,比如垃圾回收线程、JIT线程就可以理解为守护线程,当一个Java应用内,只有守护线程时,Java虚拟机就会自然退出,必须在启动线程前设置为守护进程,否则报错
Thread t = new DaemonT();
t.setDaemon(true);
t.start();
// 线程优先级,高优先级线程更容易获得资源,但并不是绝对的
Thread high=new HightPriority();
LowPriority low=new LowPriority();
high.setPriority(Thread.MAX_PRIORITY);
low.setPriority(Thread.MIN_PRIORITY);
low.start();
high.start();
/* 基础的线程同步,使用synchronized。
指定加锁对象:对给定对象加锁,进入同步代码前要获得给定对象的锁。
public void run() {
for(int j=0;j<10000000;j++){
synchronized(instance){
i++;
}
}
}
直接作用于实例方法:相当于对当前实例加锁,进入同步代码前要获得当前实例的锁。
public synchronized void increase(){
i++;
}
直接作用于静态方法:相当于对当前类加锁,进入同步代码前要获得当前类的锁。
public static synchronized void increase(){
i++;
}
此外还有Object.wait(),Object.notify()等操作实现线程同步
public static class T2 extends Thread{
public void run()
{
synchronized (object) {
System.out.println(System.currentTimeMillis()
+":T2 start! notify one thread");
object.notify();
System.out.println(System.currentTimeMillis()+":T2 end!");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
}
}
}
public static class T1 extends Thread{
public void run()
{
synchronized (object) {
System.out.println(System.currentTimeMillis()+":T1 start! ");
try {
System.out.println(System.currentTimeMillis()
+":T1 wait for object ");
object.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(System.currentTimeMillis()+":T1 end!");
}
}
}*/
6、内存模型和线程安全
原子性是指一个操作是不可中断的。即使是在多个线程一起执行的时候,一个操作一旦开始,就
不会被其它线程干扰。
有序性是指一条指令的执行分为很多步骤,按照一定的顺序向下执行。
可见性是指当一个线程修改了某一个共享变量的值,其他线程是否能够立即知道这个修改。
public class VisibilityTest extends Thread {
private boolean stop;
public void run() {
int i = 0;
while(!stop) {
i++;
}
System.out.println("finish loop,i=" + i);
}
public void stopIt() {
stop = true;
}
public boolean getStop(){
return stop;
}
public static void main(String[] args) throws Exception {
VisibilityTest v = new VisibilityTest();
v.start();
Thread.sleep(1000);
v.stopIt();
Thread.sleep(2000);
System.out.println("finish main");
System.out.println(v.getStop());
}
}
Happen-Before原则
线程安全 指某个函数、函数库在多线程环境中被调用时,能够正确地处理各个线程的局部变量,使程序功能正确完成。
public class AccountingSync implements Runnable{
static AccountingSync instance=new AccountingSync();
static int i=0;
@Override
public void run() {
for(int j=0;j<10000000;j++){
synchronized(instance){
i++;
}
}
}
}
7、无锁
CAS:Compare and Swap
CAS算法的过程是这样:它包含3个参数CAS(V,E,N)。V表示要更新的变量,E表示预期值,N表示新值。仅当V值等于E值时,才会将V的值设为N,如果V值和E值不同,则说明已经有其他线程做了更新,则当前线程什么都不做。最后,CAS返回当前V的真实值。CAS操作是抱着乐观的态度进行的,它总是认为自己可以成功完成操作。当多个线程同时使用CAS操作一个变量时,只有一个会胜出,并成功更新,其余均会失败。失败的线程不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作。基于这样的原理,CAS操作即时没有锁,也可以发现其他线程对当前线程的干扰,并进行恰当的处理。
无锁类的使用
AtomicInteger:Number
主要接口:
public final int get() //取得当前值
public final void set(int newValue) //设置当前值
public final int getAndSet(int newValue) //设置新值,并返回旧值值
public final boolean compareAndSet(int expect, int u)//如果当前值为expect,则设置为u
public final int getAndIncrement() //当前值加1,返回旧值
public final int getAndDecrement() //当前值减1,返回旧值
public final int getAndAdd(int delta) //当前值增加delta,返回旧值
public final int incrementAndGet() //当前值加1,返回新值
public final int decrementAndGet() //当前值减1,返回新值
public final int addAndGet(int delta) //当前值增加delta,返回新值
Unsafe:非安全的操作,比如:根据偏移量设置值;park();底层的CAS操作;非公开API,在不同版本的JDK中, 可能有较大差异
主要接口:
//获得给定对象偏移量上的int值
public native int getInt(Object o, long offset);
//设置给定对象偏移量上的int值
public native void putInt(Object o, long offset, int x);
//获得字段在对象中的偏移量
public native long objectFieldOffset(Field f);
//设置给定对象的int值,使用volatile语义
public native void putIntVolatile(Object o, long offset, int x);
//获得给定对象对象的int值,使用volatile语义
public native int getIntVolatile(Object o, long offset);
//和putIntVolatile()一样,但是它要求被操作字段就是volatile类型的
public native void putOrderedInt(Object o, long offset, int x);
AtomicReference:对引用进行修改,是一个模板类,抽象化了数据类型
主要接口:
get()
set(V)
compareAndSet()
getAndSet(V)
AtomicStampedReference:针对ABA问题
主要接口:
//比较设置 参数依次为:期望值 写入新值 期望时间戳 新时间戳
public boolean compareAndSet(V expectedReference,V newReference,int expectedStamp,int newStamp)
//获得当前对象引用
public V getReference()
//获得当前时间戳
public int getStamp()
//设置当前对象引用和时间戳
public void set(V newReference, int newStamp)
AtomicIntegerArray:支持无锁的数组
主要接口:
//获得数组第i个下标的元素
public final int get(int i)
//获得数组的长度
public final int length()
//将数组第i个下标设置为newValue,并返回旧的值
public final int getAndSet(int i, int newValue)
//进行CAS操作,如果第i个下标的元素等于expect,则设置为update,设置成功返回true
public final boolean compareAndSet(int i, int expect, int update)
//将第i个下标的元素加1
public final int getAndIncrement(int i)
//将第i个下标的元素减1
public final int getAndDecrement(int i)
//将第i个下标的元素增加delta(delta可以是负数)
public final int getAndAdd(int i, int delta)
AtomicIntegerFieldUpdater:让普通变量也享受原子操作。ps:1.Updater只能修改它可见范围内的变量。因为Updater使用反射得到这个变量。如果变量不可见,就会出错。比如如果score申明为private,就是不可行的。2.为了确保变量被正确的读取,它必须是volatile类型的。如果我们原有代码中未申明这个类型,那么简单的申明一下就行,这不会引起什么问题。3.由于CAS操作会通过对象实例中的偏移量直接进行赋值,因此,它不支持static字段(Unsafe.objectFieldOffset()不支持静态变量)。
主要接口:
AtomicIntegerFieldUpdater.newUpdater()
incrementAndGet()
8、各种同步工具的使用
ReentrantLock:可重入锁
可重入(单线程可以重复进入,但要重复退出)、可中断(需要设置lockInterruptibly())、可限时(超时不能获得锁,就返回false,不会永久等待构成死锁)、公平锁(先来先得)
Condition:类似于 Object.wait()和Object.notify();与ReentrantLock结合使用
主要接口:
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
API详解:
1.await()方法会使当前线程等待,同时释放当前锁,当其他线程中使用signal()时或者signalAll()方法时,线
程会重新获得锁并继续执行。或者当线程被中断时,也能跳出等待。这和Object.wait()方法很相似。
2.awaitUninterruptibly()方法与await()方法基本相同,但是它并不会再等待过程中响应中断。
3.singal()方法用于唤醒一个在等待中的线程。相对的singalAll()方法会唤醒所有在等待中的线程。这和Obej
ct.notify()方法很类似。
Semaphore:共享锁;运行多个线程同时临界区
主要接口:
public void acquire()
public void acquireUninterruptibly()
public boolean tryAcquire()
public boolean tryAcquire(long timeout, TimeUnit unit)
public void release()
ReadWriteLock:JDK5中提供的读写分离锁
访问情况:
读-读不互斥:读读之间不阻塞。
读-写互斥:读阻塞写,写也会阻塞读。
写-写互斥:写写阻塞。
主要接口:
private static ReentrantReadWriteLock readWriteLock=new ReentrantReadWriteLock();
private static Lock readLock = readWriteLock.readLock();
private static Lock writeLock = readWriteLock.writeLock();
CountDownLatch:倒数计时器。一种典型的场景就是火箭发射。在火箭发射前,为了保证万无一失,往往还要进行各项设备、仪器的检查。只有等所有检查完毕后,引擎才能点火。这种场景就非常适合使用CountDownLatch。它可以使得点火线程
,等待所有检查线程全部完工后,再执行。
主要接口:
static final CountDownLatch end = new CountDownLatch(10);
end.countDown();
end.await();
CyclicBarrier:循环栅栏。Cyclic意为循环,也就是说这个计数器可以反复使用。比如,假设我们将计数器设置为10。那么凑齐第一批10个线程后,计数器就会归零,然后接着凑齐下一批10个线程。
主要接口:
public CyclicBarrier(int parties, Runnable barrierAction)barrierAction就是当计数器一次计数完成后,系统会执行的动作
await()
LockSupport:提供线程阻塞原语。与suspend()相比不容易引起线程冻结。能够响应中断,但不抛出异常。中断响应的结果是,park()函数的返回,可以从Thread.interrupted()得到中断标志。
主要接口:
LockSupport.park();
LockSupport.unpark(t1);
9、并发容器
集合包装
// Hash map
Collections.synchronizedMap
public static Map m=Collections.synchronizedMap(new HashMap());
// List
synchronizedList
// Set
synchronizedSet
ConcurrentHashMap:高性能HashMap
BlockingQueue:阻塞队列
10、线程池基础
JDK的内置线程池
线程池种类
newFixedThreadPool:混合线程池
newSingleThreadExecutor:单例线程池
newCachedThreadPool:缓存线程池
newScheduledThreadPool:类似于事务
11、并发设计模式
在软件工程中,设计模式(design pattern)是对软件设计中普遍存在(反复出现)的各种问题,所提出的解决方案。这个术语是由埃里希·伽玛(Erich Gamma)等人在1990年代从建筑设计领域引入到计算机科学的。
单例模式:单例对象的类必须保证只有一个实例存在。许多时候整个系统只需要拥有一个的全局对象,这样有利于我们协调系统整体的行为 比如:全局信息配置。
public class Singleton {// 在类初始化时获取单例
private Singleton(){
System.out.println("Singleton is create");
}
private static Singleton instance = new Singleton();
public static Singleton getInstance() {
return instance;
}
}
public class LazySingleton {// 以懒惰模式创建单例,当调用getInstance()方法时创建单例
private LazySingleton() {
System.out.println("LazySingleton is create");
}
private static LazySingleton instance = null;
public static synchronized LazySingleton getInstance() {
if (instance == null)
instance = new LazySingleton();
return instance;
}
}
public class StaticSingleton {
private StaticSingleton(){
System.out.println("StaticSingleton is create");
}
private static class SingletonHolder {
private static StaticSingleton instance = new StaticSingleton();
}
public static StaticSingleton getInstance() {
return SingletonHolder.instance;
}
}
不变模式:一个类的内部状态创建后,在整个生命期间都不会发生变化时,就是不变类。不变模式不需要同步。
public final class Product {
//确保无子类
private final String no;
//私有属性,不会被其他对象获取
private final String name;
//final保证属性不会被2次赋值
private final double price;
public Product(String no, String name, double price) { //在创建对象时,必须指定数据
super();
//因为创建之后,无法进行修改
this.no = no;
this.name = name;
this.price = price;
}
public String getNo() {
return no;
}
public String getName() {
return name;
}
public double getPrice() {
return price;
}
}
一些常用的比如String、Boolean、Byte、Character、Double、Float、Integer、Long、Short都是如此。
Future模式:核心思想是异步调用。在调用方法时只产生一个包装盒,具体耗时的实现在空闲时完成。
public interface Data {
public String getResult ();
}
public class FutureData implements Data {
protected RealData realdata = null; //FutureData是RealData的包装
protected boolean isReady = false;
public synchronized void setRealData(RealData realdata) {
if (isReady) {
return;
}
this.realdata = realdata;
isReady = true;
notifyAll(); //RealData已经被注入,通知getResult()
}
public synchronized String getResult() { //会等待RealData构造完成
while (!isReady) {
try {
wait(); //一直等待,知道RealData被注入
} catch (InterruptedException e) {
}
}
return realdata.result; //由RealData实现
}
}
public class RealData implements Data {
protected final String result;
public RealData(String para) {
//RealData的构造可能很慢,需要用户等待很久,这里使用sleep模拟
StringBuffer sb=new StringBuffer();
for (int i = 0; i < 10; i++) {
sb.append(para);
try {
//这里使用sleep,代替一个很慢的操作过程
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
result =sb.toString();
}
public String getResult() {
return result;
}
}
public class Client {
public Data request(final String queryStr) {
final FutureData future = new FutureData();
new Thread() {
public void run() {// RealData的构建很慢,
//所以在单独的线程中进行
RealData realdata = new RealData(queryStr);
future.setRealData(realdata);
}
}.start();
return future; // FutureData会被立即返回
}
public static void main(String[] args) {
Client client = new Client();
//这里会立即返回,因为得到的是FutureData而不是RealData
Data data = client.request("name");
System.out.println("请求完毕");
try {
//这里可以用一个sleep代替了对其他业务逻辑的处理
//在处理这些业务逻辑的过程中,RealData被创建,从而充分利用了等待时间
Thread.sleep(2000);
} catch (InterruptedException e) {
}
//使用真实的数据
System.out.println("数据 = " + data.getResult());
}
}
生产者消费者模式:生产者-消费者模式是一个经典的多线程设计模式。它为多线程间的协作提供了良好的解决方案。在生产者-消费者模式中,通常由两类线程,即若干个生产者线程和若干个消费者线程。生产者线程负责提交用户请求,消费者线程则负责具体处理生产者提交的任务。生产者和消费者之间则通过共享内存缓冲区进行通信。
12、悄悄插一嘴巴NIO和AIO
NIO是New I/O的简称,与旧式的基于流的I/O方法相对,从名字看,它表示新的一套Java I/O标准。它是在Java 1.4中被纳入到JDK中的,并具有以下特性:
– NIO是基于块(Block)的,它以块为基本单位处理数据
– 为所有的原始类型提供(Buffer)缓存支持,ByteBuffer最为常用,Buffer中有3个重要的参数:位置(position)、容量(capactiy)和上限(limit)
– 增加通道(Channel)对象,作为新的原始 I/O 抽象
– 支持锁和内存映射文件的文件访问接口
– 提供了基于Selector的异步网络I/O
// 使用NIO复制文件
public static void nioCopyFile(String resource, String destination) throws IOException {
FileInputStream fis = new FileInputStream(resource);
FileOutputStream fos = new FileOutputStream(destination);
FileChannel readChannel = fis.getChannel(); //读文件通道
FileChannel writeChannel = fos.getChannel(); //写文件通道
ByteBuffer buffer = ByteBuffer.allocate(1024); //读入数据缓存
while (true) {
buffer.clear();
int len = readChannel.read(buffer); //读入数据
if (len == -1) {
break;
//读取完毕
}
buffer.flip();// 读写过程转换
writeChannel.write(buffer);
//写入文件
}
readChannel.close();
writeChannel.close();
}
// 将文件映射到内存
RandomAccessFile raf = new RandomAccessFile("C:\\mapfile.txt", "rw");
FileChannel fc = raf.getChannel();
//将文件映射到内存中
MappedByteBuffer mbb = fc.map(FileChannel.MapMode.READ_WRITE, 0, raf.length());
while(mbb.hasRemaining()){
System.out.print((char)mbb.get());
}
mbb.put(0,(byte)98); //修改文件
raf.close();
网络编程NIO
// 简单案例EchoServer
public static void main(String args[]) {
ServerSocket echoServer = null;
Socket clientSocket = null;
try {
echoServer = new ServerSocket(8000);
} catch (IOException e) {
System. out.println(e);
}
while (true) {
try {
clientSocket = echoServer.accept();
System. out.println(clientSocket.getRemoteSocketAddress() + " connect!");
tp.execute(new HandleMsg(clientSocket));
} catch (IOException e) {
System. out.println(e);
}
}
}
static class HandleMsg implements Runnable{
// 省略部分信息
public void run(){
try {
is = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
os = new PrintWriter(clientSocket.getOutputStream(), true);
// 从InputStream当中读取客户端所发送的数据
String inputLine = null;
long b=System. currentTimeMillis();
while ((inputLine = is.readLine()) != null) {
os.println(inputLine);
}
long e=System. currentTimeMillis();
System. out.println("spend:"+(e-b)+"ms");
} catch (IOException e) {
e.printStackTrace();
}finally{
// 关闭资源
}
}
}
// EchoServer客户端
public static void main(String[] args) throws IOException {
Socket client = null;
PrintWriter writer = null;
BufferedReader reader = null;
try {
client = new Socket();
client.connect(new InetSocketAddress("localhost", 8000));
writer = new PrintWriter(client.getOutputStream(), true);
writer.println("Hello!");
writer.flush();
reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
System.out.println("from server: " + reader.readLine());
} catch {
} finally {
//省略资源关闭
}
}
问题:
– 为每一个客户端使用一个线程,如果客户端出现延时等异常,线程可能会被占用很长时间。因为数据的
准备和读取都在这个线程中。
– 此时,如果客户端数量众多,可能会消耗大量的系统资源
解决:
– 非阻塞的NIO
– 数据准备好了在工作
总结:
– NIO会将数据准备好后,再交由应用进行处理,数据的读取过程依然在应用线程中完成
– 节省数据准备时间(因为Selector可以复用)
网络编程AIO
- 读完了再通知我
- 不会加快IO,只是在读完后进行通知
- 使用回调函数,进行业务处理
// AsynchronousSocketChannel使用举例
server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
final ByteBuffer buffer = ByteBuffer.allocate(1024);
public void completed(AsynchronousSocketChannel result, Object attachment) {
System.out.println(Thread.currentThread().getName());
Future<Integer> writeResult=null;
try {
buffer.clear();
result.read(buffer).get(100, TimeUnit.SECONDS);
buffer.flip();
writeResult=result.write(buffer);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
try {
server.accept(null, this);
writeResult.get();
result.close();
} catch (Exception e) {
System.out.println(e.toString());
}
}
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("failed: " + exc);
}
});