自定义RPC框架-JAVA实现
接口HelloService
package remote.procedure.call.server;
public interface HelloService {
public String sayHi(String name);//hi zs
}
===========================================================================================
接口实现来HelloService
package remote.procedure.call.server;
public class HelloServiceImpl implements HelloService {
@Override
public String sayHi(String name) {
return "hi,"+name+"!";
}
}
===========================================================================================
服务中心接口
package remote.procedure.call.server;
//服务中心
public interface Server {
public void start();
public void stop();
//注册服务
public void register(Class service,Class serviceImpl);
//...
}
===========================================================================================
服务中心实现类
package remote.procedure.call.server;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ServerCenter implements Server {
//map:服务端的所有可供客户端访问的接口,都注册到该map中
//key:接口的名字"HelloService" value:真正的HelloService实现
private static HashMap<String,Class> serviceRegiser = new HashMap<>();
private static int port ;//端口号 =9999
//连接池:连接池中存在多个连接对象,每个连接对象都可以处理一个客户请求
private static ExecutorService executor= Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private static boolean isRunning = false;
public ServerCenter(int port) {
this.port=port;
}
//开启服务端服务
@Override
public void start() {//while(true){start();}
//start ->线程对象
ServerSocket server=null;
try {
server = new ServerSocket();
server.bind(new InetSocketAddress(port));
} catch (IOException e1) {
e1.printStackTrace();
}
isRunning = true;//服务已经启动
while(true) {//进行多线程并发执行
//具体的服务内容,接收客户端请求,处理请求,并返回结果
//如果想让多个客户端请求并发执行 -> 多线程
System.out.println("start server ...");
//客户端每次请求一次连接(发出一次请求),则服务端从连接池中获取一个线程对象去处理
Socket socket=null;
try {
socket= server.accept();//等待客户端连接
} catch (IOException e) {
e.printStackTrace();
}
executor.execute(new ServiceTask(socket));//启动线程去处理客户请求
}
}
@Override
public void stop() {//关闭服务
isRunning = false;
executor.shutdown();
}
@Override
public void register(Class service,Class serviceImpl) {
serviceRegiser.put(service.getName(), serviceImpl);
}
//socket客户端 --socket服务端(start()、ServeTask)
private static class ServiceTask implements Runnable{
private Socket socket;
public ServiceTask() {
}
public ServiceTask(Socket socket) {
this.socket=socket;
}
@Override
public void run() {//线程所做的事情
ObjectInputStream input=null;
ObjectOutputStream output = null;
try {
//接收到客户端连接及请求,处理该请求...
input = new ObjectInputStream(socket.getInputStream());
//因为ObjectInputStream对发送数据的顺序严格要求,因此需要参照发送的顺序逐个接收
String serviceName = input.readUTF();
String methodName = input.readUTF();
Class[] parameterTypes = (Class[]) input.readObject();//方法的参数类型
Object[] arguments = (Object[]) input.readObject();//方法的参数名
//根据客户要求,到map中找到与之对应的具体接口
Class serviceClass = serviceRegiser.get(serviceName);//HelloService
Method method = serviceClass.getMethod(methodName, parameterTypes);
//执行该方法
Object result = method.invoke(serviceClass.newInstance(),arguments);//接口的对象,参数列表
//将发放执行完毕的返回值传递给客户端
output = new ObjectOutputStream(socket.getOutputStream());
output.writeObject(result);
} catch (Exception e) {
e.printStackTrace();
}finally {
try {
if(output!=null)output.close();
if(input!=null)input.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
===========================================================================================
客户端实现类
package remote.procedure.call.client;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;
public class Client {
//获取代表服务端接口的动态代理对象(HelloService)
//serviceName:请求的端口名
//addr:待请求服务端的ip:端口
@SuppressWarnings("unchecked")
public static <T> T getRemoteProxyObj(Class serviceInterface,InetSocketAddress addr) {//动态代理对象
/*
* newProxyInstance(a,b,c)
* a:类加载器:需要代理哪个类(例如HelloService接口),就需要将HelloService的类加载器传入第一个参数
* b:需要代理的对象,具备哪些功能 --接口
*/
return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(),new Class<?>[] {serviceInterface}, new InvocationHandler() {
//proxy:代理的对象 method:哪个方法 args:参数列表
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//客户端向服务端发送请求:请求某一个具体的接口
Socket socket = new Socket();
ObjectOutputStream output=null;
ObjectInputStream input=null;
try {
//socketaddress: Ip:端口
socket.connect(addr);
output =new ObjectOutputStream(socket.getOutputStream());//发送
//接口名、方法名:writeUTF
output.writeUTF(serviceInterface.getName());
output.writeUTF(method.getName());
//方法参数的类型、方法参数 Object
output.writeObject(method.getParameterTypes());
output.writeObject(args);
//等待服务端处理...
//接收服务端处理后的返回值
input=new ObjectInputStream(socket.getInputStream());
return input.readObject();//客户端-服务端 --> 返回值
} catch (Exception e) {
e.printStackTrace();
return null;
}finally {
try {
if(output!=null)output.close();
if(input!=null)input.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
}
}
===========================================================================================
服务端测试
package remote.procedure.call.test;
import remote.procedure.call.server.HelloService;
import remote.procedure.call.server.HelloServiceImpl;
import remote.procedure.call.server.Server;
import remote.procedure.call.server.ServerCenter;
public class RPCServerTest {
public static void main(String[] args) {
//开启一个线程
new Thread(new Runnable() {
@Override
public void run() {
//服务中心
Server server = new ServerCenter(9999);
//将HelloService接口以及实现类注册到服务中心
server.register(HelloService.class, HelloServiceImpl.class);
server.start();
}}).start();//start()
}
}
===========================================================================================
客户端测试
package remote.procedure.call.test;
import java.net.InetSocketAddress;
import remote.procedure.call.client.Client;
import remote.procedure.call.server.HelloService;
public class RPCClientTest {
public static void main(String[] args) throws ClassNotFoundException {
HelloService sevice = Client.getRemoteProxyObj(Class.forName("remote.procedure.call.server.HelloService"), new InetSocketAddress("127.0.0.1",9999));
System.out.println(sevice.sayHi("zs"));
}
}