Chapter 6: Xiao Zhu's Hadoop Notes, Source Code Analysis - IPC Analysis, Section 3: Analysis of the Server Class
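The Server starts a Listener thread. When a request to establish a connection arrives, the Listener accepts the connection and then watches it for read events. When data arrives, the work of parsing what the client sent is delegated to the Connection, which wraps each request in a Call object and puts it into a queue to wait for a Handler. The Server also starts a configured number of Handler threads, which process the clients' requests to invoke methods and send the results back to the clients.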
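(1) The NIO reactor pattern
(a) One thread handles all incoming connections (one Selector);
(b) A group of threads reads data from established connections (multiple Selectors; the number of threads is usually about the number of CPU cores);
(c) A thread pool (its size can be set according to business needs);
(d) One thread performs all write operations for the connections (one Selector).
In Hadoop IPC these four roles are played by the Listener, the Listener.Reader threads, the Handler threads and the Responder, respectively.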
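To make the roles concrete, here is a minimal reactor sketch in plain Java NIO. It is not Hadoop code: one acceptor thread with its own Selector hands each accepted SocketChannel, round-robin, to one of several reader threads, each owning a Selector. The names (MiniReactor, Reader, pending) are hypothetical, the received strings go into a BlockingQueue that stands in for the worker pool, and the writer role (d) is left out. New channels are queued to the reader and registered by the reader thread itself after a wakeup(); Hadoop instead uses the startAdd()/finishAdd() handshake that appears in doAccept() later in this section.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical mini reactor: one accept selector, N reader selectors chosen round-robin.
class MiniReactor implements Runnable {
    // A reader owns its selector; new channels are queued and registered by the reader
    // thread itself, so the acceptor never calls register() while the reader is in select().
    static class Reader implements Runnable {
        final Selector selector = Selector.open();
        final Queue<SocketChannel> pending = new ConcurrentLinkedQueue<>();
        final BlockingQueue<String> out; // stands in for the queue feeding the worker pool

        Reader(BlockingQueue<String> out) throws IOException { this.out = out; }

        public void run() {
            ByteBuffer buf = ByteBuffer.allocate(1024);
            try {
                while (true) {
                    selector.select();
                    for (SocketChannel fresh; (fresh = pending.poll()) != null; ) {
                        fresh.register(selector, SelectionKey.OP_READ);
                    }
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove();
                        if (!key.isValid() || !key.isReadable()) continue;
                        SocketChannel ch = (SocketChannel) key.channel();
                        buf.clear();
                        if (ch.read(buf) < 0) { key.cancel(); ch.close(); continue; } // peer closed
                        buf.flip();
                        out.add(StandardCharsets.UTF_8.decode(buf).toString());
                    }
                }
            } catch (IOException | ClosedSelectorException e) {
                // selector or channel closed: stop this reader
            }
        }
    }

    final ServerSocketChannel acceptChannel = ServerSocketChannel.open();
    final Selector acceptSelector = Selector.open();
    final Reader[] readers;
    private int currentReader = 0; // only touched by the acceptor thread

    MiniReactor(int readerCount, BlockingQueue<String> out) throws IOException {
        acceptChannel.configureBlocking(false);
        acceptChannel.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0: let the OS pick one
        acceptChannel.register(acceptSelector, SelectionKey.OP_ACCEPT);
        readers = new Reader[readerCount];
        for (int i = 0; i < readerCount; i++) readers[i] = new Reader(out);
    }

    int port() { return acceptChannel.socket().getLocalPort(); }

    void start() {
        for (Reader r : readers) startDaemon(r);
        startDaemon(this);
    }

    private static void startDaemon(Runnable r) {
        Thread t = new Thread(r);
        t.setDaemon(true);
        t.start();
    }

    // Acceptor loop: accept every pending connection and hand it to the next reader.
    public void run() {
        try {
            while (true) {
                acceptSelector.select();
                acceptSelector.selectedKeys().clear(); // only the accept key is registered
                for (SocketChannel ch; (ch = acceptChannel.accept()) != null; ) {
                    ch.configureBlocking(false);
                    Reader r = readers[currentReader];
                    currentReader = (currentReader + 1) % readers.length; // round-robin
                    r.pending.add(ch);
                    r.selector.wakeup(); // make the reader's select() return so it registers ch
                }
            }
        } catch (IOException | ClosedSelectorException e) {
            // stop accepting
        }
    }
}
```

Usage: create it with a queue, call start(), connect a client to port() and read what arrives in the queue.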
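(2) Main flow of the RPC Server
As the service provider, the RPC Server consists of two parts: receiving Calls and processing Calls.
Receiving Calls means accepting invocation requests from RPC Clients, decoding each one into a Call object and putting it into the Call queue. This is done by the Listener thread (together with its Reader threads). The steps are:
(a) The Listener thread monitors the data sent by RPC Clients.
(b) When data is available, the Connection's readAndProcess method is called.
(c) The Connection processes the data as it arrives; once a complete Call packet has been received, it builds a Call object and pushes it onto the Call queue, where the Handler threads pick it up.
Processing Calls means handling each invocation request in the Call queue; this is done by the Handler threads:
(a) A Handler thread waits on the Call queue and, whenever it is non-empty, takes a Call from it in FIFO order.
(b) The Call is passed to RPC.Server for processing.
(c) Using the JDK's reflection Method, the target method is invoked; the target method implements the actual business logic.
(d) The response is returned. Server.Handler first tries to send the response to the RPC Client in a non-blocking way; if some data cannot be sent right away, Server.Responder finishes the job.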
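The receive/process split above is essentially a producer/consumer queue. The sketch below is a simplified model, not Hadoop code, under these assumptions: String parameters, a java.util.function.Function standing in for the abstract call(), and a CountDownLatch standing in for the Responder. It shows the bounded call queue (handlers times a per-handler size, as in the Server constructor), a blocking put() on the receiving side and FIFO take() on the Handler side. MiniRpcPipeline and its members are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

// Hypothetical model of "receive Call -> call queue -> Handler", not Hadoop code.
class MiniRpcPipeline {
    // Simplified stand-in for Server.Call: an id, a parameter and a slot for the response.
    static final class Call {
        final int id;
        final String param;
        volatile String response;
        final CountDownLatch done = new CountDownLatch(1); // stands in for the Responder
        Call(int id, String param) { this.id = id; this.param = param; }
    }

    private final BlockingQueue<Call> callQueue;
    private final Function<String, String> service; // plays the role of the abstract call()

    MiniRpcPipeline(int handlerCount, int queueSizePerHandler, Function<String, String> service) {
        this.callQueue = new LinkedBlockingQueue<>(handlerCount * queueSizePerHandler);
        this.service = service;
        for (int i = 0; i < handlerCount; i++) {
            Thread handler = new Thread(this::handlerLoop, "handler-" + i);
            handler.setDaemon(true);
            handler.start();
        }
    }

    // Receiving side: a fully read request becomes a Call; put() blocks while the queue is full.
    Call submit(int id, String param) throws InterruptedException {
        Call call = new Call(id, param);
        callQueue.put(call);
        return call;
    }

    // Processing side: take() calls in FIFO order, run the service, publish the result.
    private void handlerLoop() {
        while (true) {
            Call call;
            try {
                call = callQueue.take();
            } catch (InterruptedException e) {
                return;
            }
            try {
                call.response = service.apply(call.param);
            } catch (RuntimeException e) {
                call.response = "ERROR: " + e; // a failing call must not kill the handler
            }
            call.done.countDown(); // in Hadoop: setupResponse(...) + responder.doRespond(call)
        }
    }
}
```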
(3) Structure of the Server class
/**
* An abstract IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on a
* port and is defined by a parameter class and a value class.
*
* @see Client
*/
public abstract class Server {
private final boolean authorize;
private boolean isSecurityEnabled;
/**
* The first four bytes of Hadoop RPC connections
*/
public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
// 1 : Introduce ping and server does not throw away RPCs
// 3 : Introduce the protocol into the RPC connection header
// 4 : Introduced SASL security layer
public static final byte CURRENT_VERSION = 4;
/**
* How many calls/handler are allowed in the queue.
*/
private static final int IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT = 100;
private static final String IPC_SERVER_HANDLER_QUEUE_SIZE_KEY = "ipc.server.handler.queue.size";
/**
* Initial and max size of response buffer
*/
static int INITIAL_RESP_BUF_SIZE = 10240;
static final String IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY = "ipc.server.max.response.size";
static final int IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT = 1024 * 1024;
public static final Log LOG = LogFactory.getLog(Server.class);
private static final Log AUDITLOG = LogFactory.getLog("SecurityLogger." + Server.class.getName());
private static final String AUTH_FAILED_FOR = "Auth failed for ";
private static final String AUTH_SUCCESSFULL_FOR = "Auth successfull for ";
private static final ThreadLocal<Server> SERVER = new ThreadLocal<Server>();
private static final Map<String, Class<?>> PROTOCOL_CACHE = new ConcurrentHashMap<String, Class<?>>();
private String bindAddress; // address the server binds to
private int port; // port we listen on
private int handlerCount; // number of handler threads
private int readThreads; // number of read threads
private Class<? extends Writable> paramClass; // class of call parameters; must implement Writable
private int maxIdleTime; // the maximum idle time after which a client may be disconnected
private int thresholdIdleConnections; // the number of idle connections after which we will
// start cleaning up idle connections
int maxConnectionsToNuke; // the max number of connections to nuke during a cleanup
protected RpcInstrumentation rpcMetrics; // RPC metrics
private Configuration conf;
private SecretManager<TokenIdentifier> secretManager;
private int maxQueueSize; // capacity of the call queue
private final int maxRespSize;
private int socketSendBufferSize; // socket send buffer size
private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
volatile private boolean running = true; // true while server runs
private BlockingQueue<Call> callQueue; // queued calls
// maintain a list of client connections
private List<Connection> connectionList = Collections.synchronizedList(new LinkedList<Connection>());
private Listener listener = null; // listens on the server socket and creates work for the Handler threads
private Responder responder = null; // sends RPC responses back to clients
private int numConnections = 0;
private Handler[] handlers = null; // the Handler threads
/**
* This is set to Call object before Handler invokes an RPC and reset after
* the call returns.
*/
private static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
........
}
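The two ThreadLocal fields, SERVER and CurCall, let code running inside a Handler thread find the server and the call it is serving. Below is a minimal sketch of the CurCall pattern with a hypothetical Call that carries only an id and a remote host name. Hadoop 1.x exposes this through static helpers on Server (for example a getRemoteIp() method); check the exact helper names against your version rather than relying on this sketch.

```java
import java.util.function.Supplier;

// Hypothetical sketch of the CurCall pattern (not Hadoop's code).
class CurrentCallDemo {
    static final class Call {
        final int id;
        final String remoteHost;
        Call(int id, String remoteHost) { this.id = id; this.remoteHost = remoteHost; }
    }

    // One slot per thread: each Handler thread sees only the call it is currently serving.
    private static final ThreadLocal<Call> CUR_CALL = new ThreadLocal<>();

    // What business code can ask; null when the thread is not serving an RPC.
    static String remoteHost() {
        Call call = CUR_CALL.get();
        return call == null ? null : call.remoteHost;
    }

    // What a Handler does around the actual invocation.
    static <T> T handle(Call call, Supplier<T> service) {
        CUR_CALL.set(call);
        try {
            return service.get();
        } finally {
            CUR_CALL.set(null); // the Handler in this chapter also resets it after each call
        }
    }
}
```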
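The Server class here is abstract. Its only abstract method is
public abstract Writable call(Class<?> protocol, Writable param, long receiveTime) throws IOException;
which is implemented by RPC.Server. This three-argument form is the one the Handler invokes; Server also keeps an older, deprecated call(Writable param, long receiveTime) variant.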
(a) Call
Stores a request sent by a client; the request is put into a BlockingQueue (the callQueue).
/**
* A call queued for handling.
*
* The call object that the server keeps in its queues.
* */
private static class Call {
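private int id; // id of the client's call
private Writable param; // the parameter passed by the client
private Connection connection; // connection to the client
private long timestamp; // time the call was received; reset when its response is handed to the Responder
private ByteBuffer response; // byte buffer holding the response to send to the client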
.......
}
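(b) Listener
The listener class, which listens for client requests. Listener has an inner class, Listener.Reader: when the Listener accepts a client connection, it hands the connection to a Reader, which reads the client's requests. The Listener is responsible for listening on the socket and establishing Connections; the Readers watch the client sockets for readable data and tell the Connection to process it. Once a complete request packet has arrived, it is wrapped in a Call object (holding the Connection, the parameters read from the network stream and the method invocation information) and put into the queue.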
/**
* Listens on the socket. Creates jobs for the handler threads
*
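* Listens on the server socket and creates work for the Handler threads.
*
* While the Listener thread runs, the server keeps waiting for client connections. The fields and constructor of Server.Listener are shown below; its run() method appears later in this section.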
* */
private class Listener extends Thread {
private ServerSocketChannel acceptChannel = null; // the accept channel
private Selector selector = null; // the selector that we use for the
// server
private Reader[] readers = null;
private int currentReader = 0;
private InetSocketAddress address; // the address we bind at
private Random rand = new Random();
private long lastCleanupRunTime = 0; // the last time when a cleanup
// connec-
// -tion (for idle connections) ran
private long cleanupInterval = 10000; // the minimum interval between
// two cleanup runs
private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128);
private ExecutorService readPool;
public Listener() throws IOException {
address = new InetSocketAddress(bindAddress, port); // socket address built from bindAddress and port
// Create a new server socket and set to non blocking mode
acceptChannel = ServerSocketChannel.open(); // open a server socket channel (ServerSocketChannel)
acceptChannel.configureBlocking(false); // make the server socket channel non-blocking
// Bind the server socket to the local host and port
bind(acceptChannel.socket(), address, backlogLength);
port = acceptChannel.socket().getLocalPort(); // Could be an ephemeral port
// create a selector;
selector = Selector.open(); // create a selector (lets one thread multiplex many channels)
readers = new Reader[readThreads];
readPool = Executors.newFixedThreadPool(readThreads);
// start several Reader threads so the server keeps responding promptly under many requests
for (int i = 0; i < readThreads; i++) {
Selector readSelector = Selector.open();
Reader reader = new Reader(readSelector);
readers[i] = reader;
readPool.execute(reader);
}
// Register accepts on the server socket with the selector.
acceptChannel.register(selector, SelectionKey.OP_ACCEPT); // register acceptChannel with the selector for OP_ACCEPT
this.setName("IPC Server listener on " + port); // set the listener thread's name
this.setDaemon(true); // run as a daemon thread
}
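The constructor's socket setup relies only on standard JDK calls. The sketch below assumes that Hadoop's bind(...) helper essentially binds the ServerSocket with the given backlog, as its arguments suggest. Note that the channel must be non-blocking before register(), and that asking for port 0 yields an ephemeral port, which is read back with getLocalPort() just like the `port = acceptChannel.socket().getLocalPort()` line above. ListenerSetup is a hypothetical name.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical sketch of the Listener constructor's socket setup using only JDK calls.
class ListenerSetup implements AutoCloseable {
    final ServerSocketChannel acceptChannel;
    final Selector selector;
    final int port;

    ListenerSetup(String bindAddress, int requestedPort, int backlog) throws IOException {
        acceptChannel = ServerSocketChannel.open();
        acceptChannel.configureBlocking(false); // required before register()
        acceptChannel.socket().bind(new InetSocketAddress(bindAddress, requestedPort), backlog);
        port = acceptChannel.socket().getLocalPort(); // the real port if 0 was requested
        selector = Selector.open();
        acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    @Override
    public void close() throws IOException {
        selector.close();
        acceptChannel.close();
    }
}
```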
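(c) Responder
The class that responds to RPC requests: once a request has been processed, the Responder sends the result back to the requesting client. It keeps checking whether there are pending responses and, if so, returns the results of those calls to the clients.
// Sends responses of RPC back to clients.
// As run() shows, response data is handled while the server is running, in two cases:
//
// 1. If some calls have gone unsent beyond the time limit, they are treated as expired; the server
//    stops trying to send them and closes their connection instead;
//
// 2. If a call registered on a selected channel is valid and the channel is writable, the call's
//    response data is written directly to the channel for the client to read.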
private class Responder extends Thread {
private Selector writeSelector;
private int pending; // connections waiting to register
final static int PURGE_INTERVAL = 900000; // 15mins
Responder() throws IOException {
this.setName("IPC Server Responder");
this.setDaemon(true);
writeSelector = Selector.open(); // create a selector
pending = 0;
}
@Override
public void run() {
LOG.info(getName() + ": starting");
SERVER.set(Server.this);
long lastPurgeTime = 0; // last check for old calls.
while (running) {
try {
waitPending(); // If a channel is being registered, wait.
writeSelector.select(PURGE_INTERVAL); // block for at most PURGE_INTERVAL
Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
while (iter.hasNext()) { // iterate over the keys selected by writeSelector
SelectionKey key = iter.next();
iter.remove();
try {
if (key.isValid() && key.isWritable()) {
doAsyncWrite(key); // asynchronously write the call's response data to the channel
}
} catch (IOException e) {
LOG.info(getName() + ": doAsyncWrite threw exception " + e);
}
}
long now = System.currentTimeMillis();
if (now < lastPurgeTime + PURGE_INTERVAL) {
continue;
}
lastPurgeTime = now;
//
// If there were some calls that have not been sent out for
// a
// long time, discard them.
//
LOG.debug("Checking for old call responses.");
ArrayList<Call> calls;
// get the list of channels from list of keys.
// Calls whose responses have waited longer than PURGE_INTERVAL (call.timestamp + PURGE_INTERVAL)
// are treated as expired and purged
synchronized (writeSelector.keys()) {
calls = new ArrayList<Call>(writeSelector.keys().size());
iter = writeSelector.keys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
Call call = (Call) key.attachment();
if (call != null && key.channel() == call.connection.channel) {
calls.add(call);
}
}
}
for (Call call : calls) {
try {
doPurge(call, now);
} catch (IOException e) {
LOG.warn("Error in purging old calls " + e);
}
}
} catch (OutOfMemoryError e) {
//
// we can run out of memory if we have too many threads
// log the event and sleep for a minute and give
// some thread(s) a chance to finish
//
LOG.warn("Out of Memory in server select", e);
try {
Thread.sleep(60000);
} catch (Exception ie) {
}
} catch (Exception e) {
LOG.warn("Exception in Responder " + StringUtils.stringifyException(e));
}
}
LOG.info("Stopping " + this.getName());
}
// When a channel becomes writable, the pending response data is written asynchronously:
private void doAsyncWrite(SelectionKey key) throws IOException {
Call call = (Call) key.attachment();
if (call == null) {
return;
}
if (key.channel() != call.connection.channel) {
throw new IOException("doAsyncWrite: bad channel");
}
synchronized (call.connection.responseQueue) {
if (processResponse(call.connection.responseQueue, false)) { // nothing left to send: stop watching OP_WRITE
try {
key.interestOps(0);
} catch (CancelledKeyException e) {
/*
* The Listener/reader might have closed the socket. We
* don't explicitly cancel the key, so not sure if this
* will ever fire. This warning could be removed.
*/
LOG.warn("Exception while changing ops : " + e);
}
}
}
}
//
// Remove calls that have been pending in the responseQueue
// for a long time.
//
/**
* If a response has stayed in the queue longer than PURGE_INTERVAL, close its connection
*/
private void doPurge(Call call, long now) throws IOException {
LinkedList<Call> responseQueue = call.connection.responseQueue;
synchronized (responseQueue) {
Iterator<Call> iter = responseQueue.listIterator(0);
while (iter.hasNext()) {
call = iter.next();
if (now > call.timestamp + PURGE_INTERVAL) {
closeConnection(call.connection);
break;
}
}
}
}
// Processes one response. Returns true if there are no more pending
// data for this channel.
//
/**
* Processes one response on a channel; returns true if the channel has no more pending data
*/
private boolean processResponse(LinkedList<Call> responseQueue, boolean inHandler) throws IOException {
boolean error = true;
boolean done = false; // there is more data for this channel.
int numElements = 0;
Call call = null;
try {
synchronized (responseQueue) {
//
// If there are no items for this channel, then we are done
//
numElements = responseQueue.size();
if (numElements == 0) {
error = false;
return true; // no more data for this channel.
}
//
// Extract the first call
call = responseQueue.removeFirst();
SocketChannel channel = call.connection.channel; // the channel this call belongs to
if (LOG.isDebugEnabled()) {
LOG.debug(getName() + ": responding to #" + call.id + " from " + call.connection);
}
//
// Send as much data as we can in the non-blocking fashion
//
int numBytes = channelWrite(channel, call.response); // write the response (held in call.response) to the channel
if (numBytes < 0) { // negative count: error is still true, so the finally block closes the connection
return true;
}
if (!call.response.hasRemaining()) { // the whole response has been written to the channel
call.connection.decRpcCount(); // one fewer outstanding RPC on this connection
if (numElements == 1) { // this was the last queued call
done = true; // no more data for this channel
} else {
done = false; // more calls are queued, so more data must still be sent on this channel
}
if (LOG.isDebugEnabled()) {
LOG.debug(getName() + ": responding to #" + call.id + " from " + call.connection + " Wrote " + numBytes
+ " bytes.");
}
} else {
//
// If we were unable to write the entire response out,
// then
// insert in Selector queue.
// The response was only partly written: put the call back at the head
// of the queue so that it is finished later through the Selector
call.connection.responseQueue.addFirst(call);
if (inHandler) { // called from a Handler thread: hand the rest over to the Responder
call.timestamp = System.currentTimeMillis(); // start the purge clock for this response
incPending(); // a channel registration is in progress
try {
// Wakeup the thread blocked on select, only
// then can the call
// to channel.register() complete.
writeSelector.wakeup(); // wake up the Responder thread blocked in select()
channel.register(writeSelector, SelectionKey.OP_WRITE, call); // register the channel for OP_WRITE, attaching the call
} catch (ClosedChannelException e) {
// Its ok. channel might be closed else where.
done = true;
} finally {
decPending(); // registration finished (or the channel was already closed); let the Responder select again
}
}
if (LOG.isDebugEnabled()) {
LOG.debug(getName() + ": responding to #" + call.id + " from " + call.connection + " Wrote partial " + numBytes
+ " bytes.");
}
}
error = false; // everything went off well
}
} finally {
if (error && call != null) {
LOG.warn(getName() + ", call " + call + ": output error");
done = true; // error. no more data for this channel.
closeConnection(call.connection);
}
}
return done;
}
//
// Enqueue a response from the application.
//
void doRespond(Call call) throws IOException {
synchronized (call.connection.responseQueue) {
call.connection.responseQueue.addLast(call); // queue the finished call's response
if (call.connection.responseQueue.size() == 1) {
processResponse(call.connection.responseQueue, true); // the only queued response: try to send it right away from the Handler thread
}
}
}
private synchronized void incPending() { // call waiting to be enqueued.
pending++;
}
private synchronized void decPending() { // call done enqueueing.
pending--;
notify();
}
private synchronized void waitPending() throws InterruptedException {
while (pending > 0) {
wait();
}
}
}
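The incPending()/decPending()/waitPending() trio exists because, on the older JDKs that Hadoop 1.x ran on, SelectableChannel.register() blocks while another thread is inside select() on the same selector. The Handler therefore raises the counter, wakes the Responder, registers and lowers the counter; the Responder waits for the counter to reach zero before it selects again. Below is a standalone sketch of the same gate. The class name PendingGate is hypothetical, and it uses notifyAll() where Hadoop uses notify().

```java
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical PendingGate: the same counter protocol as incPending()/decPending()/waitPending().
class PendingGate {
    private int pending = 0; // registrations currently in progress

    synchronized void incPending() { pending++; }

    synchronized void decPending() {
        pending--;
        notifyAll(); // Hadoop calls notify(); only the Responder thread waits here
    }

    // Called by the selecting thread before each select().
    synchronized void waitPending() throws InterruptedException {
        while (pending > 0) wait();
    }

    synchronized int pending() { return pending; }

    // The registering side, as in processResponse(..., inHandler = true).
    SelectionKey register(Selector sel, SelectableChannel ch, Object attachment)
            throws ClosedChannelException {
        incPending();
        try {
            sel.wakeup(); // get the selecting thread out of select()
            return ch.register(sel, SelectionKey.OP_WRITE, attachment);
        } finally {
            decPending();
        }
    }
}
```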
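(d) Connection
The connection class, where the actual logic for reading client requests lives. A Connection represents one connection with a client: it reads the client's calls and puts them into a blocking queue, from which the Handlers take and process them.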
/**
* Reads calls from a connection and queues them for handling.
* Represents one server-side connection: it reads the calls sent by the client and adds each resulting Server.Call to the queue of calls to be handled.
*
*
* */
public class Connection {
private boolean rpcHeaderRead = false; // if initial rpc header is read ("hrpc", version, auth method)
private boolean headerRead = false; // if the connection header that
// follows version is read.
private SocketChannel channel;
private ByteBuffer data;
private ByteBuffer dataLengthBuffer;
private LinkedList<Call> responseQueue;
private volatile int rpcCount = 0; // number of outstanding rpcs
private long lastContact;
private int dataLength; // length of the current RPC payload
private Socket socket;
// Cache the remote host & port info so that even if the socket is
// disconnected, we can say where it used to connect to.
private String hostAddress;
private int remotePort;
private InetAddress addr;
ConnectionHeader header = new ConnectionHeader(); // connection header
Class<?> protocol; // protocol class
boolean useSasl;
SaslServer saslServer;
private AuthMethod authMethod;
private boolean saslContextEstablished;
private boolean skipInitialSaslHandshake;
private ByteBuffer rpcHeaderBuffer;
private ByteBuffer unwrappedData;
private ByteBuffer unwrappedDataLengthBuffer;
UserGroupInformation user = null;
public UserGroupInformation attemptingUser = null; // user name before
// auth
// Fake 'call' for failed authorization response
private final int AUTHROIZATION_FAILED_CALLID = -1;
private final Call authFailedCall = new Call(AUTHROIZATION_FAILED_CALLID, null, this);
private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream();
// Fake 'call' for SASL context setup
private static final int SASL_CALLID = -33;
private final Call saslCall = new Call(SASL_CALLID, null, this);
private final ByteArrayOutputStream saslResponse = new ByteArrayOutputStream();
private boolean useWrap = false;
public Connection(SelectionKey key, SocketChannel channel, long lastContact) {
this.channel = channel; // socket channel
this.lastContact = lastContact; // time of last contact
this.data = null;
this.dataLengthBuffer = ByteBuffer.allocate(4);
this.unwrappedData = null;
this.unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
this.socket = channel.socket(); // the Socket associated with the channel
this.addr = socket.getInetAddress(); // remote address of the socket
if (addr == null) {
this.hostAddress = "*Unknown*";
} else {
this.hostAddress = addr.getHostAddress();
}
this.remotePort = socket.getPort(); // remote port
this.responseQueue = new LinkedList<Call>(); // responses waiting to be sent on this connection
if (socketSendBufferSize != 0) {
try {
socket.setSendBufferSize(socketSendBufferSize); // set the socket send buffer size
} catch (IOException e) {
LOG.warn("Connection: unable to set socket send buffer size to " + socketSendBufferSize);
}
}
}
.......
}
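(e) Handler
The request handler class. It loops, blocking on callQueue to take Call objects and process them: it takes a call from the call queue, invokes the real object via reflection to get the result, and then puts the call into the connection's response queue.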
/** Handles queued calls . */
private class Handler extends Thread {
public Handler(int instanceNumber) {
this.setDaemon(true); // run as a daemon thread
this.setName("IPC Server handler " + instanceNumber + " on " + port);
}
@Override
public void run() {
LOG.info(getName() + ": starting");
SERVER.set(Server.this); // make this Server visible to code running in this thread (SERVER thread-local)
ByteArrayOutputStream buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE); // buffer for the serialized response
while (running) {
try {
final Call call = callQueue.take(); // pop the queue; maybe blocked here
if (LOG.isDebugEnabled())
LOG.debug(getName() + ": has #" + call.id + " from " + call.connection);
String errorClass = null;
String error = null;
Writable value = null;
CurCall.set(call); // publish the current call in a thread-local while the RPC runs
try {
// Make the call as the user via Subject.doAs, thus associating
// the call with the Subject.
// call() is abstract in ipc.Server; the implementation is in RPC.Server
if (call.connection.user == null) {
value = call(call.connection.protocol, call.param, call.timestamp);
} else {
// run the IPC call as the user (Subject) of the Connection this call came in on
value = call.connection.user.doAs(new PrivilegedExceptionAction<Writable>() {
@Override
public Writable run() throws Exception {
// make the call
return call(call.connection.protocol, call.param, call.timestamp);
}
});
}
} catch (Throwable e) {
LOG.info(getName() + ", call " + call + ": error: " + e, e);
errorClass = e.getClass().getName();
error = StringUtils.stringifyException(e);
}
CurCall.set(null);
synchronized (call.connection.responseQueue) {
// setupResponse() needs to be sync'ed together with
// responder.doResponse() since setupResponse may use
// SASL to encrypt response data and SASL enforces
// its own message ordering.
setupResponse(buf, call, (error == null) ? Status.SUCCESS : Status.ERROR, value, errorClass, error);
// Discard the large buf and reset it back to
// smaller size to freeup heap
if (buf.size() > maxRespSize) {
LOG.warn("Large response size " + buf.size() + " for call " + call.toString());
buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
}
responder.doRespond(call);
}
} catch (InterruptedException e) {
if (running) { // unexpected -- log it
LOG.info(getName() + " caught: " + StringUtils.stringifyException(e));
}
} catch (Exception e) {
LOG.info(getName() + " caught: " + StringUtils.stringifyException(e));
}
}
LOG.info(getName() + ": exiting");
}
}
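Two details of the Handler loop are easy to miss: it catches Throwable around call(), so a failing business method becomes an ERROR response instead of killing the thread, and it throws away a response buffer that has grown beyond maxRespSize. The sketch below isolates both under simplified assumptions: a Callable stands in for call(), and the stack trace is rendered with printStackTrace, which is roughly what StringUtils.stringifyException produces. HandlerOutcome and its members are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.concurrent.Callable;

// Hypothetical sketch: turn the outcome of call() into status + payload, and shrink big buffers.
class HandlerOutcome {
    enum Status { SUCCESS, ERROR }

    final Status status;
    final Object value;
    final String errorClass;
    final String error;

    private HandlerOutcome(Status status, Object value, String errorClass, String error) {
        this.status = status;
        this.value = value;
        this.errorClass = errorClass;
        this.error = error;
    }

    static HandlerOutcome invoke(Callable<?> call) {
        try {
            return new HandlerOutcome(Status.SUCCESS, call.call(), null, null);
        } catch (Throwable t) { // like the Handler: any failure becomes an ERROR response
            StringWriter sw = new StringWriter();
            t.printStackTrace(new PrintWriter(sw));
            return new HandlerOutcome(Status.ERROR, null, t.getClass().getName(), sw.toString());
        }
    }

    // Mirrors "if (buf.size() > maxRespSize)": replace a bloated buffer with a fresh small one.
    static ByteArrayOutputStream maybeShrink(ByteArrayOutputStream buf, int maxRespSize, int initialSize) {
        return buf.size() > maxRespSize ? new ByteArrayOutputStream(initialSize) : buf;
    }
}
```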
(4) Walking through one complete request in the Server
Demo:
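import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;

/**
*
* Description: RPCserver test<br>
*
* Copyright: Copyright (c) 2013 <br>
* Company: www.renren.com
*
* @author zhuhui 2013-5-17
* @version 1.0
*/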
public class RPCserver {
/**
* @param args
*/
public static void main(String[] args) {
Server server;
try {
server = RPC.getServer(new HelloProtocalImp(), "127.0.0.1", 9813, 6, true, new Configuration());
server.start();
try {
server.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
Step 1: Startup
server = RPC.getServer(new HelloProtocalImp(), "127.0.0.1", 9813, 6, true, new Configuration());
server.start();
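(1) Initialization
getServer() turns out to be a factory method: the object it creates is an RPC.Server, whose construction runs the ipc.Server constructor below, which initializes the listener and the responder.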
protected Server(String bindAddress, int port,
Class<? extends Writable> paramClass, int handlerCount,
Configuration conf, String serverName, SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
this.bindAddress = bindAddress;
this.conf = conf;
this.port = port;
this.paramClass = paramClass;
this.handlerCount = handlerCount;
this.socketSendBufferSize = 0;
this.maxQueueSize = handlerCount * conf.getInt(
IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
this.maxRespSize = conf.getInt(IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY,
IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT);
this.readThreads = conf.getInt(
IPC_SERVER_RPC_READ_THREADS_KEY,
IPC_SERVER_RPC_READ_THREADS_DEFAULT);
this.callQueue = new LinkedBlockingQueue<Call>(maxQueueSize);
this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000);
this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
this.authorize =
conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
// Start the listener here and let it bind to the port
listener = new Listener();
this.port = listener.getAddress().getPort();
this.rpcMetrics = RpcInstrumentation.create(serverName, this.port);
this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false);
// Create the responder here
responder = new Responder();
if (isSecurityEnabled) {
SaslRpcServer.init(conf);
}
}
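The constructor derives several limits from the configuration. For the demo above (6 handlers, defaults elsewhere), the call queue capacity is 6 × 100 = 600 and maxIdleTime is 2 × 1000 = 2000 ms. Below is a sketch of that arithmetic with a plain Map in place of Hadoop's Configuration; the keys and defaults are copied from the code above, and ServerSizing is a hypothetical name.

```java
import java.util.Map;

// Hypothetical helper reproducing the constructor's arithmetic; a Map stands in for Configuration.
class ServerSizing {
    static int getInt(Map<String, Integer> conf, String key, int defaultValue) {
        return conf.getOrDefault(key, defaultValue);
    }

    // maxQueueSize = handlerCount * ipc.server.handler.queue.size (default 100)
    static int maxQueueSize(Map<String, Integer> conf, int handlerCount) {
        return handlerCount * getInt(conf, "ipc.server.handler.queue.size", 100);
    }

    // maxIdleTime = 2 * ipc.client.connection.maxidletime (default 1000 ms)
    static int maxIdleTime(Map<String, Integer> conf) {
        return 2 * getInt(conf, "ipc.client.connection.maxidletime", 1000);
    }
}
```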
(2) Starting the listener, responder and handlers
public synchronized void start() {
responder.start();
listener.start();
handlers = new Handler[handlerCount];
for (int i = 0; i < handlerCount; i++) {
handlers[i] = new Handler(i);
handlers[i].start();
}
}
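After start(), the responder, listener and handler threads all block, waiting for client requests: the listener blocks in selector.select(), the responder in writeSelector.select(PURGE_INTERVAL) with a 15-minute timeout, and each handler in callQueue.take(). The Reader threads started by the listener block as well, in their own readSelector.select().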
Step 2: The Listener receives data
(1) run() starts and blocks in selector.select(), waiting for the SelectionKey.OP_ACCEPT events registered during initialization
// register for accept events (end of the Listener constructor)
acceptChannel.register(selector, SelectionKey.OP_ACCEPT); // register acceptChannel with the selector for OP_ACCEPT
this.setName("IPC Server listener on " + port); // set the listener thread's name
this.setDaemon(true); // run as a daemon thread
// ... then, in run():
while (running) {
SelectionKey key = null;
try {
selector.select(); // block until some registered channel is ready for I/O
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
key = iter.next();
iter.remove();
try {
if (key.isValid()) {
if (key.isAcceptable()) // the channel is ready to accept a new socket connection
doAccept(key); // accept the connections pending on this channel
}
} catch (IOException e) {
}
key = null;
}
} catch (OutOfMemoryError e) {
// we can run out of memory if we have too many threads
// log the event and sleep for a minute and give
// some thread(s) a chance to finish
LOG.warn("Out of Memory in server select", e);
closeCurrentConnection(key, e);
cleanupConnections(true);
try {
Thread.sleep(60000);
} catch (Exception ie) {
}
} catch (Exception e) {
closeCurrentConnection(key, e);
}
cleanupConnections(false);
}
(2) Wake up a Reader and accept the client connections arriving on the server socket channel associated with the key
/**
* Accepts the client connections arriving on the server socket channel associated with the key
*/
void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
Connection c = null;
ServerSocketChannel server = (ServerSocketChannel) key.channel(); // the server socket channel
SocketChannel channel;
while ((channel = server.accept()) != null) {
channel.configureBlocking(false);
channel.socket().setTcpNoDelay(tcpNoDelay);
Reader reader = getReader();
try {
reader.startAdd(); // set adding = true and wake up readSelector
SelectionKey readKey = reader.registerChannel(channel); // register the channel with the reader's selector for OP_READ; returns the key
c = new Connection(readKey, channel, System.currentTimeMillis()); // create the connection
readKey.attach(c); // attach the Connection object to readKey
synchronized (connectionList) {
connectionList.add(numConnections, c); // add to the server's list of connections
numConnections++;
}
if (LOG.isDebugEnabled())
LOG.debug("Server connection from " + c.toString() + "; # active connections: " + numConnections
+ "; # queued calls: " + callQueue.size());
} finally {
// set adding = false and notify() the Reader: each Reader started in the Listener
// constructor waits with wait() while adding is true (its source is omitted here for brevity).
reader.finishAdd();
}
}
}
// registers the channel with readSelector for OP_READ and returns the key
public synchronized SelectionKey registerChannel(SocketChannel channel) throws IOException {
return channel.register(readSelector, SelectionKey.OP_READ);
}
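The startAdd()/registerChannel()/finishAdd() sequence avoids blocking in register() while the Reader sits in readSelector.select(). Since the Reader source is not reproduced here, the following is a sketch reconstructed from the description above, not Hadoop's code: the reader holds its own monitor while selecting, parks in wait() while adding is true (which releases the monitor), and registerChannel()/finishAdd() are synchronized on the same object. A java.nio.channels.Pipe replaces the socket so the example needs no network; HandshakeReader and bytesRead are hypothetical names.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical reader illustrating the "adding" handshake; method names follow the listing.
class HandshakeReader implements Runnable {
    private final Selector readSelector = Selector.open();
    private volatile boolean adding = false;
    final BlockingQueue<Integer> bytesRead = new LinkedBlockingQueue<>(); // for observation only

    HandshakeReader() throws IOException { }

    // Acceptor side, step 1: ask the reader to step out of select().
    void startAdd() {
        adding = true;
        readSelector.wakeup();
    }

    // Acceptor side, step 2: only gets the monitor once the reader is parked in wait().
    synchronized SelectionKey registerChannel(SelectableChannel ch) throws ClosedChannelException {
        return ch.register(readSelector, SelectionKey.OP_READ);
    }

    // Acceptor side, step 3: let the reader continue.
    synchronized void finishAdd() {
        adding = false;
        notify();
    }

    public void run() {
        ByteBuffer buf = ByteBuffer.allocate(256);
        synchronized (this) { // the reader holds its own monitor except while in wait()
            try {
                while (true) {
                    readSelector.select();
                    while (adding) wait(1000); // releases the monitor so registerChannel() can run
                    Iterator<SelectionKey> it = readSelector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove();
                        if (key.isValid() && key.isReadable()) {
                            buf.clear();
                            int n = ((ReadableByteChannel) key.channel()).read(buf);
                            if (n < 0) key.cancel(); else bytesRead.add(n);
                        }
                    }
                }
            } catch (IOException | InterruptedException | ClosedSelectorException e) {
                // selector closed or thread interrupted: stop
            }
        }
    }
}
```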
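(3) Reading data: readAndProcess >> processOneRpc >> processData
// readAndProcess() reads RPC data from the socket channel of a Server.Connection and queues the resulting calls in callQueue for the Handler threads.
// It is the core method for receiving call data, i.e. for reading from the SocketChannel; processData(), which it eventually reaches, is listed after it.
// Since a Server.Connection talks to a client, it also defines a close operation, which closes the SocketChannel associated with the client socket.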
public int readAndProcess() throws IOException, InterruptedException {
while (true) {
/*
* Read at most one RPC. If the header is not read completely
* yet then iterate until we read first RPC or until there is no
* data left.
*/
int count = -1;
// read bytes from the channel into dataLengthBuffer (the 4-byte length, or the "hrpc" magic on a new connection)
if (dataLengthBuffer.remaining() > 0) {
count = channelRead(channel, dataLengthBuffer); // returns -1 at end of stream
if (count < 0 || dataLengthBuffer.remaining() > 0) // EOF or still incomplete: return the count (it may be 0 or -1)
return count;
}
// the rpc header (version and auth method) has not been read yet
if (!rpcHeaderRead) {
// Every connection is expected to send the header.
if (rpcHeaderBuffer == null) {
rpcHeaderBuffer = ByteBuffer.allocate(2);
}
count = channelRead(channel, rpcHeaderBuffer);
if (count < 0 || rpcHeaderBuffer.remaining() > 0) {
return count;
}
int version = rpcHeaderBuffer.get(0);
byte[] method = new byte[] { rpcHeaderBuffer.get(1) };
authMethod = AuthMethod.read(new DataInputStream(new ByteArrayInputStream(method)));
dataLengthBuffer.flip(); // flip dataLengthBuffer so it can be compared with HEADER
// if the header or version does not match, return -1 (HEADER =
// ByteBuffer.wrap("hrpc".getBytes()), CURRENT_VERSION = 4)
if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) {
// Warning is ok since this is not supposed to happen.
LOG.warn("Incorrect header or version mismatch from " + hostAddress + ":" + remotePort + " got version " + version
+ " expected version " + CURRENT_VERSION);
return -1;
}
dataLengthBuffer.clear();
if (authMethod == null) {
throw new IOException("Unable to read authentication method");
}
if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
AccessControlException ae = new AccessControlException("Authentication is required");
setupResponse(authFailedResponse, authFailedCall, Status.FATAL, null, ae.getClass().getName(), ae.getMessage());
responder.doRespond(authFailedCall);
throw ae;
}
if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
doSaslReply(SaslStatus.SUCCESS, new IntWritable(SaslRpcServer.SWITCH_TO_SIMPLE_AUTH), null, null);
authMethod = AuthMethod.SIMPLE;
// client has already sent the initial Sasl message and
// we
// should ignore it. Both client and server should fall
// back
// to simple auth from now on.
skipInitialSaslHandshake = true;
}
if (authMethod != AuthMethod.SIMPLE) {
useSasl = true;
}
// rpc header read successfully (dataLengthBuffer was already cleared for reuse above): drop rpcHeaderBuffer and set rpcHeaderRead to true
rpcHeaderBuffer = null;
rpcHeaderRead = true;
continue;
}
if (data == null) {
dataLengthBuffer.flip();
// read the payload length so the data buffer can be allocated
dataLength = dataLengthBuffer.getInt();
// a ping from the Client carries no data: clear dataLengthBuffer and return
if (dataLength == Client.PING_CALL_ID) {
if (!useWrap) { // covers the !useSasl too
dataLengthBuffer.clear();
return 0; // ping message
}
}
if (dataLength < 0) {
LOG.warn("Unexpected data length " + dataLength + "!! from " + getHostAddress());
}
data = ByteBuffer.allocate(dataLength); // allocate the data buffer for the call's payload
}
// read bytes from the channel into the data buffer
count = channelRead(channel, data);
// the data buffer has been filled completely
if (data.remaining() == 0) {
dataLengthBuffer.clear();
data.flip(); // flip the data buffer so it can be read
if (skipInitialSaslHandshake) {
data = null;
skipInitialSaslHandshake = false;
continue;
}
boolean isHeaderRead = headerRead;
if (useSasl) {
saslReadAndProcess(data.array());
} else {
processOneRpc(data.array());
}
data = null;
if (!isHeaderRead) {
continue;
}
}
return count;
}
}
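readAndProcess() is an incremental parser: a non-blocking read may deliver any number of bytes, so each buffer is filled across calls. The sketch below models only the framing, under stated simplifications: a 6-byte preamble ("hrpc", version byte, auth-method byte), then repeated 4-byte length + payload, where a length of -1 means ping. The real code first reads "hrpc" into dataLengthBuffer and the two remaining bytes into rpcHeaderBuffer, handles SASL, and treats the first payload as the ConnectionHeader. The value -1 for Client.PING_CALL_ID is an assumption to check against your Hadoop version; RpcFrameDecoder is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical incremental decoder modelled on readAndProcess(): feed it bytes in any chunking.
class RpcFrameDecoder {
    static final byte[] HEADER = "hrpc".getBytes(StandardCharsets.US_ASCII);
    static final int CURRENT_VERSION = 4;
    static final int PING_CALL_ID = -1; // assumed value of Client.PING_CALL_ID

    private final ByteBuffer rpcHeader = ByteBuffer.allocate(6); // "hrpc" + version + auth byte
    private boolean rpcHeaderRead = false;
    private final ByteBuffer lengthBuf = ByteBuffer.allocate(4);
    private ByteBuffer data; // null until a payload length is known
    final List<byte[]> payloads = new ArrayList<>();
    int pings = 0;
    byte authMethodCode; // not interpreted in this sketch

    void feed(ByteBuffer in) {
        while (in.hasRemaining()) {
            if (!rpcHeaderRead) {
                copy(in, rpcHeader);
                if (!rpcHeader.hasRemaining()) checkRpcHeader();
                continue;
            }
            if (data == null) {
                copy(in, lengthBuf);
                if (lengthBuf.hasRemaining()) return; // length still incomplete
                lengthBuf.flip();
                int len = lengthBuf.getInt();
                lengthBuf.clear();
                if (len == PING_CALL_ID) { pings++; continue; } // ping: no payload follows
                if (len < 0) throw new IllegalStateException("Unexpected data length " + len);
                data = ByteBuffer.allocate(len);
            }
            copy(in, data);
            if (!data.hasRemaining()) { // a complete payload has arrived
                payloads.add(data.array());
                data = null;
            }
        }
    }

    private void checkRpcHeader() {
        rpcHeader.flip();
        byte[] magic = new byte[4];
        rpcHeader.get(magic);
        int version = rpcHeader.get();
        authMethodCode = rpcHeader.get();
        if (!Arrays.equals(magic, HEADER) || version != CURRENT_VERSION) {
            throw new IllegalStateException("Incorrect header or version mismatch: got version " + version);
        }
        rpcHeaderRead = true;
    }

    // Moves as many bytes as fit from src into dst, advancing both buffers.
    private static void copy(ByteBuffer src, ByteBuffer dst) {
        int n = Math.min(src.remaining(), dst.remaining());
        ByteBuffer chunk = src.duplicate();
        chunk.limit(chunk.position() + n);
        dst.put(chunk);
        src.position(src.position() + n);
    }
}
```

Feeding one byte at a time, as in the check below, exercises every partial-read path.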
private void processData(byte[] buf) throws IOException, InterruptedException {
DataInputStream dis =
new DataInputStream(new ByteArrayInputStream(buf));
int id = dis.readInt(); // try to read an id
if (LOG.isDebugEnabled())
LOG.debug(" got #" + id);
Writable param = ReflectionUtils.newInstance(paramClass, conf);//read param
param.readFields(dis); // the Invocation sent by the client: method name and parameters
Call call = new Call(id, param, this); // wrap it in a Call
callQueue.put(call); // put the call into callQueue
incRpcCount(); // increment the count of outstanding RPCs
}
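processData() expects each payload to be a 4-byte call id followed by the serialized parameter. Below is a round-trip sketch with a minimal Writable-like interface; MiniWritable and EchoParam are hypothetical stand-ins for Hadoop's Writable and Invocation, and encode() plays the client's part.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical minimal Writable-like contract (same two methods as Hadoop's Writable).
interface MiniWritable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// Hypothetical parameter type standing in for Invocation.
class EchoParam implements MiniWritable {
    String methodName = "";
    public void write(DataOutput out) throws IOException { out.writeUTF(methodName); }
    public void readFields(DataInput in) throws IOException { methodName = in.readUTF(); }
}

class ProcessDataSketch {
    static final class Call {
        final int id;
        final MiniWritable param;
        Call(int id, MiniWritable param) { this.id = id; this.param = param; }
    }

    // Client side (for testing): write the id, then the serialized parameter.
    static byte[] encode(int id, MiniWritable param) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(id);
        param.write(out);
        out.flush();
        return bytes.toByteArray();
    }

    // Server side: mirrors readInt() + newInstance(paramClass) + readFields().
    static Call decode(byte[] buf, Class<? extends MiniWritable> paramClass) throws Exception {
        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf));
        int id = dis.readInt();
        MiniWritable param = paramClass.getDeclaredConstructor().newInstance();
        param.readFields(dis);
        return new Call(id, param);
    }
}
```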
Step 3: The Handler executes the business logic
(1) Take the Call to process from callQueue
final Call call = callQueue.take(); // pop the queue; maybe blocked here
(2) Invoke the method via reflection to run the business logic, then send the result with responder.doRespond(call)
public Writable call(Class<?> protocol, Writable param, long receivedTime)
throws IOException {
// ...
Invocation call = (Invocation)param;
Method method = protocol.getMethod(call.getMethodName(), call.getParameterClasses()); // look up the Method by name and parameter types via reflection
method.setAccessible(true); // suppress Java language access checks on the reflected method (also saves their cost)
Object value = method.invoke(instance, call.getParameters()); // invoke it (instance is the target object; the second argument holds the call's arguments)
// ...
}
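The heart of RPC.Server.call() is ordinary Java reflection. A self-contained sketch follows; HelloProtocol and HelloImpl are hypothetical stand-ins for the demo's protocol, and unwrapping InvocationTargetException is this sketch's own choice so that callers see the exception the business method actually threw.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Hypothetical protocol interface and implementation.
interface HelloProtocol {
    String sayHello(String name);
}

class HelloImpl implements HelloProtocol {
    public String sayHello(String name) { return "hello, " + name; }
}

class ReflectiveDispatch {
    // Look the method up on the protocol interface, then invoke it on the implementation.
    static Object call(Class<?> protocol, Object instance, String methodName,
                       Class<?>[] paramTypes, Object[] params) throws Exception {
        Method method = protocol.getMethod(methodName, paramTypes);
        method.setAccessible(true);
        try {
            return method.invoke(instance, params);
        } catch (InvocationTargetException e) {
            Throwable cause = e.getTargetException(); // the business method's own exception
            throw cause instanceof Exception ? (Exception) cause : e;
        }
    }
}
```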
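Step 4: The Responder returns the data
This thread class sends RPC responses to the clients. As its run() method shows, response data is handled while the server runs, in two cases: first, if some calls have not been sent within the time limit, they are treated as expired and the server stops trying, closing their connections instead; second, if a call registered on a selected channel is valid and the channel is writable, the call's response data is written directly to the channel for the client to read.
a: When a Handler hands its result to the Responder, it first runs the Responder's send logic in its own thread. To keep each buffer written out contiguously, the Handler appends the buffer to connection.responseQueue and then checks the queue size: if it is greater than 1, an earlier buffer has not been fully sent yet, so the Handler does not send anything and leaves the work to the Responder. When the Handler does send, a fully written buffer is removed from connection.responseQueue; a partially written one stays at connection.responseQueue[0], and the Connection is registered for OP_WRITE on the Responder's Selector.
b: Responses submitted by Handlers may pile up in responseQueue; the Responder sends them. Its logic: if a buffer was not fully sent, the Connection is registered for OP_WRITE on the Responder's Selector; the Responder loops over the writable Connections, and once a Connection's responseQueue has been drained it clears that Connection's write interest (interestOps(0)). Connections that stay unwritable for a long time are closed.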
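The ordering rule in (a) and (b) can be checked without sockets. The simulation below uses a fake channel that accepts at most writeLimit bytes per write, and registeredForWrite stands in for OP_WRITE interest on writeSelector. All names are hypothetical, and locking is reduced to one monitor on the response queue.

```java
import java.nio.ByteBuffer;
import java.util.LinkedList;

// Hypothetical simulation of the doRespond()/processResponse() policy; no real sockets.
class ResponsePolicySim {
    final LinkedList<ByteBuffer> responseQueue = new LinkedList<>();
    final StringBuilder wire = new StringBuilder(); // what the "client" has received
    final int writeLimit;                           // max bytes the fake channel takes per write
    boolean registeredForWrite = false;             // stands in for OP_WRITE on writeSelector

    ResponsePolicySim(int writeLimit) { this.writeLimit = writeLimit; }

    private int channelWrite(ByteBuffer buf) {
        int n = Math.min(writeLimit, buf.remaining());
        for (int i = 0; i < n; i++) wire.append((char) buf.get());
        return n;
    }

    // Handler thread: enqueue, and only write inline if nothing older is still pending.
    void doRespond(ByteBuffer response) {
        synchronized (responseQueue) {
            responseQueue.addLast(response);
            if (responseQueue.size() == 1) processResponse(true);
        }
    }

    // Returns true when the queue is drained ("no more data for this channel").
    boolean processResponse(boolean inHandler) {
        synchronized (responseQueue) {
            if (responseQueue.isEmpty()) return true;
            ByteBuffer buf = responseQueue.removeFirst();
            channelWrite(buf);
            if (buf.hasRemaining()) {
                responseQueue.addFirst(buf);              // unfinished buffer stays at the head
                if (inHandler) registeredForWrite = true; // hand the rest to the Responder
                return false;
            }
            return responseQueue.isEmpty();
        }
    }

    // Responder thread: called whenever the channel is writable.
    void onWritable() {
        if (processResponse(false)) registeredForWrite = false; // like key.interestOps(0)
    }
}
```

In the check below the second response arrives while the first is only half written, so it must wait in the queue instead of being interleaved on the wire.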
(1) Creation
Responder() throws IOException {
this.setName("IPC Server Responder");
this.setDaemon(true);
writeSelector = Selector.open(); // create a selector
pending = 0;
}
(2) Sending data: processResponse
(The code of processResponse() is identical to the listing in (c) Responder above, so it is not repeated here.)
(3) Data that could not be sent completely is handed over to Responder.run()
run() calls doAsyncWrite() to write the data, and purges expired responses.
(The code of run() is identical to the listing in (c) Responder above, so it is not repeated here.)
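The purge rule fits in a few lines. In this sketch with hypothetical types, each connection's response queue holds calls stamped with the time the Handler handed them to the Responder (call.timestamp in the listing), and any connection with a call older than PURGE_INTERVAL is closed. Recall that the Responder runs this check at most once per PURGE_INTERVAL.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Hypothetical sketch of the Responder's purge rule (names other than PURGE_INTERVAL are ours).
class PurgeSketch {
    static final long PURGE_INTERVAL = 900_000L; // 15 minutes, as in the listing

    static final class PendingCall {
        final String connection;
        final long timestamp;
        PendingCall(String connection, long timestamp) { this.connection = connection; this.timestamp = timestamp; }
    }

    // Returns the connections that doPurge() would close at time `now`.
    static List<String> connectionsToClose(List<LinkedList<PendingCall>> responseQueues, long now) {
        List<String> closed = new ArrayList<>();
        for (LinkedList<PendingCall> queue : responseQueues) {
            for (PendingCall call : queue) {
                if (now > call.timestamp + PURGE_INTERVAL) {
                    closed.add(call.connection); // closeConnection(call.connection)
                    break;                       // one stale call is enough to drop the connection
                }
            }
        }
        return closed;
    }
}
```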
[Figure: sequence diagram]
[Figure: class diagram]