Java实现简易RPC框架(一)
早些天看Hadoop源码的时候,了解到Hadoop分布式环境中各个组件间的通信采用的是RPC。由于暂时没有时间深入分析Hadoop中的RPC实现方式,这里先参考网上的资料和例子,实现一个简易的RPC框架。
一、什么是RPC
RPC(Remote Procedure Call Protocol)——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。JDK内部提供RMI实现RPC(仅限于JAVA对象的远程调用)。
二、利用RMI实现RPC
想要了解RMI详细知识请查看本人另一篇blog《如何发布RMI服务》。
1、首先定义RMI服务器接口
package org.bird.rmi;
import java.io.Serializable;
import java.rmi.AccessException;
import java.rmi.NotBoundException;
import java.rmi.Remote;
import java.rmi.RemoteException;
/**
 * Remote contract of the sample RMI server: objects can be registered and
 * unregistered by interface class, looked up by interface class, and the
 * server itself can be started and stopped.
 */
public interface RMIServer extends Remote, Serializable {

    /** Default host name. */
    String DEFAULT_HOST = "127.0.0.1";

    /** Default registry port. */
    int DEFAULT_PORT = 8888;

    /**
     * Registers a local object.
     *
     * @param interfaceDefiner interface class the object is registered under
     * @param impl implementation class to register
     * @throws RemoteException declared for remote invocation
     * @throws InstantiationException if the implementation cannot be instantiated
     * @throws IllegalAccessException if the implementation is not accessible
     */
    void registerLocalObject(Class<?> interfaceDefiner, Class<?> impl)
            throws RemoteException, InstantiationException, IllegalAccessException;

    /**
     * Registers a remote object.
     *
     * @param interfaceDefiner interface class the object is registered under
     * @param impl implementation class to register
     * @throws RemoteException declared for remote invocation
     * @throws InstantiationException if the implementation cannot be instantiated
     * @throws IllegalAccessException if the implementation is not accessible
     */
    void registerRemoteObject(Class<?> interfaceDefiner, Class<?> impl)
            throws RemoteException, InstantiationException, IllegalAccessException;

    /**
     * Unregisters the object registered for the given interface.
     *
     * @param interfaceDefiner interface class whose object is removed
     * @throws RemoteException declared for remote invocation
     * @throws NotBoundException declared by the contract
     * @throws ClassNotFoundException declared by the contract
     */
    void unregisterObject(Class<?> interfaceDefiner)
            throws RemoteException, NotBoundException, ClassNotFoundException;

    /**
     * Unregisters all objects.
     *
     * @throws AccessException declared by the contract
     * @throws RemoteException declared for remote invocation
     * @throws NotBoundException declared by the contract
     * @throws ClassNotFoundException declared by the contract
     */
    void unregisterAllObjects()
            throws AccessException, RemoteException, NotBoundException, ClassNotFoundException;

    /**
     * Gets the object registered for the given interface.
     *
     * @param interfaceDefiner interface class to look up
     * @param <T> type of the interface
     * @return the registered object
     * @throws RemoteException declared for remote invocation
     */
    <T> T getObject(Class<T> interfaceDefiner) throws RemoteException;

    /**
     * Starts the service.
     *
     * @throws RemoteException declared for remote invocation
     */
    void start() throws RemoteException;

    /**
     * Stops the server.
     *
     * @param remote whether the remote RMI service should be stopped as well
     * @throws AccessException declared by the contract
     * @throws RemoteException declared for remote invocation
     * @throws NotBoundException declared by the contract
     * @throws ClassNotFoundException declared by the contract
     */
    void stop(boolean remote)
            throws AccessException, RemoteException, NotBoundException, ClassNotFoundException;

    /**
     * Stops the remote service.
     *
     * @throws RemoteException declared for remote invocation
     */
    void remoteStop() throws RemoteException;

    /**
     * Reports the running state of the server.
     *
     * @return the running state
     * @throws RemoteException declared for remote invocation
     */
    boolean isRunning() throws RemoteException;
}
2、接着实现RMI服务器相关功能的实现类
package org.bird.rmi;
import java.rmi.AccessException;
import java.rmi.ConnectException;
import java.rmi.NotBoundException;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.rmi.server.UnicastRemoteObject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.bird.rmi.support.Listener;
public class RMIServerImpl extends UnicastRemoteObject implements RMIServer {
/**
*
*/
private static final long serialVersionUID = 1L;
private boolean isRuning;
private Listener listener;
/** 本地主机RMI注册表 */
private Registry localRegistry = null;
/** 远程主机RMI注册表 */
private Registry remoteRegistry = null;
/** 远程服务器 */
private RMIServer remoteServer = null ;
/** 缓存本地对象实例 */
private Map<String,Object> registeredLocalObjectMap = new HashMap<String,Object>();
/** 缓存远程对象实例 */
private Map<String,Remote> registeredRemoteObjectMap = new HashMap<String,Remote>();
public RMIServerImpl() throws RemoteException {
this(DEFAULT_HOST, DEFAULT_PORT);
}
public RMIServerImpl(int port) throws RemoteException {
this(DEFAULT_HOST, port);
}
public RMIServerImpl(String cip, int cport) throws RemoteException {
registerLocalServer(cip, cport);
}
public RMIServerImpl(String cip,int cport, String sip, int sport) throws RemoteException {
getRemoteServer(sip, sport);
registerLocalServer(cip, cport);
}
private void registerLocalServer(String cip, int cport) throws AccessException, RemoteException {
System.setProperty("java.rmi.server.hostname", cip);
if(localRegistry == null) {
try {
localRegistry = LocateRegistry.getRegistry(cip, cport);
localRegistry.list();
} catch (Exception e) {
try {
localRegistry = LocateRegistry.createRegistry(cport);//在本地主机创建并导出RMI注册表
} catch (RemoteException e1) {
throw new RuntimeException(e1);
}
}
}
localRegistry.rebind(getKey(cip, cport), this);
}
private void getRemoteServer(String sip, int sport) throws AccessException, RemoteException {
if(null == sip || "".equals(sip.trim())){
return;
}
remoteRegistry = LocateRegistry.getRegistry(sip, sport);
try {
remoteServer = (RMIServer) remoteRegistry.lookup(getKey(sip, sport));
} catch (ConnectException e) {
remoteRegistry = null;
throw new RemoteException("无法获取远程服务:" + getKey(sip, sport));
} catch (NotBoundException e) {
remoteRegistry = null;
throw new RemoteException("无法获取远程服务:" + getKey(sip, sport));
}
}
public void registerLocalObject(Class<?> interfaceDefiner,
Class<?> impl) throws InstantiationException, IllegalAccessException {
Object instance = impl.newInstance();
registeredLocalObjectMap.put(interfaceDefiner.getName(), instance);
System.out.println("注册本地对象:" + interfaceDefiner.getName());
}
public void registerRemoteObject(Class<?> interfaceDefiner, Class<?> impl)
throws RemoteException, InstantiationException, IllegalAccessException {
Object object = impl.newInstance();//反射实例化
if(!(object instanceof Remote)){
throw new ClassCastException("cannot be cast to java.rmi.Remote");
}
System.out.println("注册远程对象:" + interfaceDefiner.getName());
Remote instance = (Remote) object;
registeredRemoteObjectMap.put(interfaceDefiner.getName(), instance);
if (instance instanceof UnicastRemoteObject) {
localRegistry.rebind(interfaceDefiner.getName(), instance);//更新对此注册表中指定name的远程引用。
if(remoteServer != null) {
remoteServer.registerLocalObject(interfaceDefiner, impl);//对象实例直接推送到远程服务器端
}
}else{
Remote stub = UnicastRemoteObject.exportObject(instance, 0);
localRegistry.rebind(interfaceDefiner.getName(), stub);//更新对此注册表中指定name的远程引用。
if(remoteServer != null) {
remoteServer.registerLocalObject(interfaceDefiner, stub.getClass());//对象实例直接推送到远程服务器端
}
}
}
public void unregisterObject(Class<?> interfaceDefiner) throws AccessException, RemoteException, NotBoundException, ClassNotFoundException {
unregisterObject(interfaceDefiner.getName());
}
private void unregisterObject(String key) throws AccessException, RemoteException, NotBoundException, ClassNotFoundException {
if(registeredLocalObjectMap.containsKey(key)) {
registeredLocalObjectMap.remove(key);
System.out.println("注销本地对象:" + key);
}else if (registeredRemoteObjectMap.containsKey(key)) {
localRegistry.unbind(key);//移除RMI注册表中的绑定
Remote remote = registeredRemoteObjectMap.get(key);
if(remote != null) {
UnicastRemoteObject.unexportObject(remote, true);//从RMI中移除远程对象
remote = null;
}
registeredRemoteObjectMap.remove(key);
if(remoteServer != null) {
remoteServer.unregisterObject(Class.forName(key));//将推送到远程服务器上对象实例注销
}
System.out.println("注销远程对象:" + key);
}
}
public void unregisterAllObjects() throws AccessException, RemoteException, NotBoundException, ClassNotFoundException {
registeredLocalObjectMap.clear();//注销本地对象
List<String> keys = new ArrayList<String>();
for(String key : registeredRemoteObjectMap.keySet()) {
keys.add(key);
}
for(String key : keys) {
unregisterObject(key);
}
}
public <T> T getObject(Class<T> interfaceDefiner) throws RemoteException {
String key = interfaceDefiner.getName();
if(registeredLocalObjectMap.containsKey(key)) {
System.out.println("调用本地对象:" + interfaceDefiner.getName());
return (T) registeredLocalObjectMap.get(key);
}
if(registeredRemoteObjectMap.containsKey(key)) {
System.out.println("调用远程对象:" + interfaceDefiner.getName());
return (T) registeredRemoteObjectMap.get(key);
}
if(remoteServer != null) {
return remoteServer.getObject(interfaceDefiner);
}
return null;
}
public void start() throws RemoteException {
listener = new Listener(this);
this.isRuning = true;
listener.start();
}
public void stop(boolean remote) throws AccessException, RemoteException, NotBoundException, ClassNotFoundException {
unregisterAllObjects();
String[] names = localRegistry.list();
for(String name : names) {
localRegistry.unbind(name);
}
UnicastRemoteObject.unexportObject(localRegistry, true);
localRegistry = null;
if(remote && remoteServer != null){
remoteServer.remoteStop();
}
System.out.println("本地RMI服务器停止...");
System.exit(0);
}
public void remoteStop() throws RemoteException {
this.isRuning = false;
}
private String getKey(String name, int port) {
return name + "|" + port;
}
public boolean isRunning() {
return isRuning;
}
}
3、然后再编写一个监听器用于监听服务器
package org.bird.rmi.support;
import java.rmi.AccessException;
import java.rmi.NotBoundException;
import java.rmi.RemoteException;
import org.bird.rmi.RMIServer;
/**
 * RMI service listener: a thread that waits until the server reports that
 * it is no longer running and then stops it.
 *
 * @author liangjf
 */
public class Listener extends Thread {

    /** Pause between two polls of the server's run flag, in milliseconds. */
    private static final long POLL_INTERVAL_MILLIS = 100L;

    private final RMIServer server;

    /**
     * @param server the server whose running state is watched
     */
    public Listener(RMIServer server) {
        this.server = server;
    }

    /**
     * Polls {@link RMIServer#isRunning()} until it returns false, fails, or
     * this thread is interrupted, then calls {@code server.stop(false)}.
     */
    @Override
    public void run() {
        System.out.println("RMI服务器启动...");
        try {
            while (server.isRunning()) {
                // Sleep between polls; an empty loop body would spin a CPU core at 100%.
                Thread.sleep(POLL_INTERVAL_MILLIS);
            }
        } catch (RemoteException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Treat interruption as a request to shut down; keep the interrupt status set.
            Thread.currentThread().interrupt();
        }
        try {
            server.stop(false);
        } catch (AccessException e) {
            e.printStackTrace();
        } catch (RemoteException e) {
            e.printStackTrace();
        } catch (NotBoundException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}
4、到目前一个简单的RPC框架就完成了。接着我们编写两个测试用例
package org.bird.rmi.test.echo;
import java.rmi.Remote;
import java.rmi.RemoteException;
public interface Echo extends Remote {
public String sayHello(String name) throws RemoteException;
}
package org.bird.rmi.test.echo;
import java.rmi.Remote;
import java.rmi.RemoteException;
public interface Hello extends Remote {
public String sayHello(String name) throws RemoteException;
}
package org.bird.rmi.test.echo;
import java.rmi.RemoteException;
public class EchoImpl implements Echo {
public EchoImpl() throws RemoteException {
super();
}
public String sayHello(String name) {
return "Hello, " + name;
}
}
package org.bird.rmi.test.echo;
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
public class RemoteHello extends UnicastRemoteObject implements Hello {
public RemoteHello() throws RemoteException {
super();
// TODO Auto-generated constructor stub
}
/**
*
*/
private static final long serialVersionUID = 1L;
public String sayHello(String name) throws RemoteException {
return "Hello, " + name;
}
}
再编写一个服务器测试入口类
package org.bird.rmi.test;
import java.rmi.NotBoundException;
import java.rmi.RemoteException;
import org.bird.rmi.RMIServer;
import org.bird.rmi.RMIServerImpl;
import org.bird.rmi.test.echo.Echo;
import org.bird.rmi.test.echo.EchoImpl;
import org.bird.rmi.test.echo.Hello;
import org.bird.rmi.test.echo.RemoteHello;
/**
 * Entry point of the sample server.
 */
public class MainServer {

    /**
     * Creates a server on port 8888, registers {@code RemoteHello} and
     * {@code EchoImpl} as remote objects, installs a JVM shutdown hook that
     * prints a message, and starts the server.
     *
     * @param args command-line arguments (not used)
     * @throws RemoteException if an RMI operation fails
     * @throws IllegalAccessException if an implementation class is not accessible
     * @throws InstantiationException if an implementation class cannot be instantiated
     * @throws NotBoundException declared by the signature
     */
    public static void main(String[] args) throws RemoteException, InstantiationException, IllegalAccessException, NotBoundException {
        final RMIServer rmiServer = new RMIServerImpl(8888);
        rmiServer.registerRemoteObject(Hello.class, RemoteHello.class);
        rmiServer.registerRemoteObject(Echo.class, EchoImpl.class);

        final Thread shutdownHook = new Thread() {
            @Override
            public void run() {
                System.out.println("服务端JVM关闭...");
            }
        };
        Runtime.getRuntime().addShutdownHook(shutdownHook);

        rmiServer.start();
    }
}
最后写两个客户端的测试入口类
package org.bird.rmi.test;
import java.rmi.AccessException;
import java.rmi.NotBoundException;
import java.rmi.RemoteException;
import org.bird.rmi.RMIServer;
import org.bird.rmi.RMIServerImpl;
import org.bird.rmi.test.echo.Hello;
/**
 * Entry point of the first sample client.
 */
public class MainClient1 {

    /**
     * Creates a node on port 9998 linked to the server at the default host
     * and port, fetches the {@code Hello} object, prints its greeting for
     * {@code "ljf"}, installs a shutdown hook that prints a message, and
     * stops the local node without stopping the remote server.
     *
     * @param args command-line arguments (not used)
     * @throws RemoteException if an RMI operation fails
     * @throws NotBoundException declared by {@code stop}
     * @throws ClassNotFoundException declared by {@code stop}
     * @throws InstantiationException declared by the signature
     * @throws IllegalAccessException declared by the signature
     * @throws InterruptedException declared by the signature
     */
    public static void main(String[] args) throws RemoteException, NotBoundException, ClassNotFoundException, InstantiationException, IllegalAccessException, InterruptedException {
        final String host = RMIServer.DEFAULT_HOST;
        final RMIServer node = new RMIServerImpl(host, 9998, host, RMIServer.DEFAULT_PORT);

        final Hello helloService = node.getObject(Hello.class);
        final String reply = helloService.sayHello("ljf");
        System.out.println(reply);

        final Thread shutdownHook = new Thread() {
            @Override
            public void run() {
                System.out.println("客户端1关闭.....");
            }
        };
        Runtime.getRuntime().addShutdownHook(shutdownHook);

        node.stop(false);
    }
}
package org.bird.rmi.test;
import java.rmi.AccessException;
import java.rmi.NotBoundException;
import java.rmi.RemoteException;
import org.bird.rmi.RMIServer;
import org.bird.rmi.RMIServerImpl;
import org.bird.rmi.test.echo.Echo;
/**
 * Entry point of the second sample client.
 */
public class MainClient2 {

    /**
     * Creates a node on port 9999 linked to the server at the default host
     * and port, fetches the {@code Echo} object, prints its greeting,
     * installs a shutdown hook that prints a message, and stops the local
     * node while also asking the remote server to stop.
     *
     * @param args command-line arguments (not used)
     * @throws RemoteException if an RMI operation fails
     * @throws NotBoundException declared by {@code stop}
     * @throws ClassNotFoundException declared by {@code stop}
     * @throws InstantiationException declared by the signature
     * @throws IllegalAccessException declared by the signature
     * @throws InterruptedException declared by the signature
     */
    public static void main(String[] args) throws RemoteException, NotBoundException, ClassNotFoundException, InstantiationException, IllegalAccessException, InterruptedException {
        final String host = RMIServer.DEFAULT_HOST;
        final RMIServer node = new RMIServerImpl(host, 9999, host, RMIServer.DEFAULT_PORT);

        final Echo echoService = node.getObject(Echo.class);
        final String reply = echoService.sayHello("飞飞");
        System.out.println(reply);

        final Thread shutdownHook = new Thread() {
            @Override
            public void run() {
                System.out.println("客户端2关闭.....");
            }
        };
        Runtime.getRuntime().addShutdownHook(shutdownHook);

        node.stop(true);
    }
}
5、测试用例编写完成后将RMIServer、RMIServerImpl、Listener、Echo、Hello、EchoImpl、RemoteHello、MainServer这些类打包成可运行的server.jar;RMIServer、RMIServerImpl、Listener、Hello、MainClient1这些类打包成可以运行的client1.jar;RMIServer、RMIServerImpl、Listener、Echo、MainClient2这些类打包成可以运行的client2.jar
6、首先在命令行下运行server.jar
7、再另打开一窗口运行client1.jar查看一下运行情况
8、另外再打开一个窗口运行client2.jar查看客户端与服务端的变化情况
总结:上面实现的RMI框架中,服务端与客户端可以相互调用(每个节点既是服务端也是客户端),可以注册本地对象或者注册远程对象;对象的调用就跟本地调用一样,对上层使用者来说是完全透明的。