多线程与非阻塞套接字
我想在Java中使用nio实现一个TCP服务器。 它只是使用选择器的select方法来获得就绪键。然后处理这些密钥,如果它们是可接受的,可读的等等。服务器工作得很好,直到即时通讯使用单线程。但是当我尝试使用更多的线程来处理密钥时,服务器的响应会变慢并最终停止响应,例如在4-5个请求之后。 这是所有什么即时做:(伪)多线程与非阻塞套接字
Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
while (keyIterator.hasNext()) {
SelectionKey readyKey = keyIterator.next();
if (readyKey.isAcceptable()) {
//A new connection attempt, registering socket channel with selector
} else {
Worker.add(readyKey);
}
工人是执行从信道输入/输出的线程类。 这是我的Worker类的代码:
private static List<SelectionKey> keyPool = Collections.synchronizedList(new LinkedList());
public static void add(SelectionKey key) {
synchronized (keyPool) {
keyPool.add(key);
keyPool.notifyAll();
}
}
public void run() {
while (true) {
SelectionKey myKey = null;
synchronized (keyPool) {
try {
while (keyPool.isEmpty()) {
keyPool.wait();
}
} catch (InterruptedException ex) {
}
myKey = keyPool.remove(0);
keyPool.notifyAll();
}
if (myKey != null && myKey.isValid()) {
if (myKey.isReadable()) {
//Performing reading
} else if (myKey.isWritable()) {
//performing writing
myKey.cancel();
}
}
}
我的基本思路是增加的关键keyPool从不同的线程可以一键搞定,一次一个。 我的BaseServer类本身作为一个线程运行。它在事件循环开始之前创建10个工作线程。我也尝试增加BaseServer线程的优先级,以便它有更多机会接受可接受的密钥。尽管如此,在大约8次请求后它仍然停止响应。请帮助,如果我错了。提前致谢。 :)
首先,由于java.util.concurrent中存在良好的标准Java(1.5+)包装类,如BlockingQueue,因此不应再真正使用wait()和notify()调用。
其次,建议在选择线程本身执行IO操作,而不是在工作线程中执行IO操作。工作线程应该将读/写入队列并写入选择器线程。
本页面说明它非常好,甚至还提供了一个简单的TCP/IP服务器的工作代码示例:http://rox-xmlrpc.sourceforge.net/niotut/
对不起,我还没有时间看你的具体的例子。
第三,您并未从选定密钥集中删除任何内容。每次在循环周围都必须这样做,例如调用next()后调用keyIterator.remove()。
您需要阅读NIO教程。
尝试使用xsocket库。它为我节省了很多时间阅读论坛。
教程:http://xsocket.sourceforge.net/core/tutorial/V2/TutorialCore.htm
服务器代码:
import org.xsocket.connection.*;
/**
*
* @author wsserver
*/
public class XServer {
protected static IServer server;
public static void main(String[] args) {
try {
server = new Server(9905, new XServerHandler());
server.start();
} catch (Exception ex) {
System.out.println(ex.getMessage());
}
}
protected static void shutdownServer(){
try{
server.close();
}catch(Exception ex){
System.out.println(ex.getMessage());
}
}
}
服务器处理:
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.util.*;
import org.xsocket.*;
import org.xsocket.connection.*;
public class XServerHandler implements IConnectHandler, IDisconnectHandler, IDataHandler {
private Set<ConnectedClients> sessions = Collections.synchronizedSet(new HashSet<ConnectedClients>());
Charset charset = Charset.forName("ISO-8859-1");
CharsetEncoder encoder = charset.newEncoder();
CharsetDecoder decoder = charset.newDecoder();
ByteBuffer buffer = ByteBuffer.allocate(1024);
@Override
public boolean onConnect(INonBlockingConnection inbc) throws IOException, BufferUnderflowException, MaxReadSizeExceededException {
try {
synchronized (sessions) {
sessions.add(new ConnectedClients(inbc, inbc.getRemoteAddress()));
}
System.out.println("onConnect"+" IP:"+inbc.getRemoteAddress().getHostAddress()+" Port:"+inbc.getRemotePort());
} catch (Exception ex) {
System.out.println("onConnect: " + ex.getMessage());
}
return true;
}
@Override
public boolean onDisconnect(INonBlockingConnection inbc) throws IOException {
try {
synchronized (sessions) {
sessions.remove(inbc);
}
System.out.println("onDisconnect");
} catch (Exception ex) {
System.out.println("onDisconnect: " + ex.getMessage());
}
return true;
}
@Override
public boolean onData(INonBlockingConnection inbc) throws IOException, BufferUnderflowException, ClosedChannelException, MaxReadSizeExceededException {
inbc.read(buffer);
buffer.flip();
String request = decoder.decode(buffer).toString();
System.out.println("request:"+request);
buffer.clear();
return true;
}
}
连接的客户端:
import java.net.InetAddress;
import org.xsocket.connection.INonBlockingConnection;
/**
*
* @author wsserver
*/
public class ConnectedClients {
private INonBlockingConnection inbc;
private InetAddress address;
//CONSTRUCTOR
public ConnectedClients(INonBlockingConnection inbc, InetAddress address) {
this.inbc = inbc;
this.address = address;
}
//GETERS AND SETTERS
public INonBlockingConnection getInbc() {
return inbc;
}
public void setInbc(INonBlockingConnection inbc) {
this.inbc = inbc;
}
public InetAddress getAddress() {
return address;
}
public void setAddress(InetAddress address) {
this.address = address;
}
}
客户端代码:
import java.net.InetAddress;
import org.xsocket.connection.INonBlockingConnection;
import org.xsocket.connection.NonBlockingConnection;
/**
*
* @author wsserver
*/
public class XClient {
protected static INonBlockingConnection inbc;
public static void main(String[] args) {
try {
inbc = new NonBlockingConnection(InetAddress.getByName("localhost"), 9905, new XClientHandler());
while(true){
}
} catch (Exception ex) {
System.out.println(ex.getMessage());
}
}
}
客户端处理程序:
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import org.xsocket.MaxReadSizeExceededException;
import org.xsocket.connection.IConnectExceptionHandler;
import org.xsocket.connection.IConnectHandler;
import org.xsocket.connection.IDataHandler;
import org.xsocket.connection.IDisconnectHandler;
import org.xsocket.connection.INonBlockingConnection;
/**
*
* @author wsserver
*/
public class XClientHandler implements IConnectHandler, IDataHandler,IDisconnectHandler, IConnectExceptionHandler {
Charset charset = Charset.forName("ISO-8859-1");
CharsetEncoder encoder = charset.newEncoder();
CharsetDecoder decoder = charset.newDecoder();
ByteBuffer buffer = ByteBuffer.allocate(1024);
@Override
public boolean onConnect(INonBlockingConnection nbc) throws IOException {
System.out.println("Connected to server");
nbc.write("hello server\r\n");
return true;
}
@Override
public boolean onConnectException(INonBlockingConnection nbc, IOException ioe) throws IOException {
System.out.println("On connect exception:"+ioe.getMessage());
return true;
}
@Override
public boolean onDisconnect(INonBlockingConnection nbc) throws IOException {
System.out.println("Dissconected from server");
return true;
}
@Override
public boolean onData(INonBlockingConnection inbc) throws IOException, BufferUnderflowException, ClosedChannelException, MaxReadSizeExceededException {
inbc.read(buffer);
buffer.flip();
String request = decoder.decode(buffer).toString();
System.out.println(request);
buffer.clear();
return true;
}
}
我花了很多时间阅读关于这个论坛,我希望我能帮助ü我的代码。
在这里寻找一些有关生产者/消费者问题和一些可能有助于解决您的问题的良好数据结构的想法 - http://*.com/questions/1212386/concurrent-and-blocking-queue-in-java。 – Perception