Python多线程实现tcp应答客户端和服务端
背景
近两日一遍改毕设论文,一遍学习python。从多任务开始,记录学习过程。
此处实现一个tcp的应答程序,一个读线程一个写线程,python负责服务端,java负责客户端。任一端输入小写over,传输结束(另一端需要按下回车即可退出)。
服务端
服务端套接字的创建和监听
python服务端套接字的创建和监听与C相似,流程都是创建->绑定-.>监听。具体代码如下
tcpServerSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcpServerSocket.bind(("", 12345))
tcpServerSocket.listen(128)
socket()方法第二个入参就表示tcp
bind()方法传入一个元组,前者是ip,这里为空,就是监听本机;后者是监听的端口号
listen()方法的入参表示一次性可以连接多少个客户端
读写线程
读写线程差不多,为了方便控制,打算用一个类继承threading.Thread来实现线程
以写线程为例,首先要绑定客户端套接字和读线程
def setDestination(self, clientSocket, recvDataThread):
self.clientSocket = clientSocket
self.recvDataThread = recvDataThread
然后在run()方法里进行数据发送。为了控制线程的终止和运行,引入running字段进行控制。输入的内容是over时,running为false,同时调用terminate()方法控制读线程的退出
def run(self):
self.running = True
while (self.running):
dataToSend = input("发给客户端:")
if dataToSend != "\n" and dataToSend != "":
self.clientSocket.send((dataToSend).encode("utf-8"))
if dataToSend == "over":
time.sleep(1)
self.running = False
self.clientSocket.close()
self.recvDataThread.terminate()
延时1s是为了让服务端最后一条信息能够发出去,而terminate()方法就是把running标志位置为false,读写线程都有此方法
def terminate(self):
self.running = False
整个写线程类的代码如下
class SendDataThread(threading.Thread):
def setDestination(self, clientSocket, recvDataThread):
self.clientSocket = clientSocket
self.recvDataThread = recvDataThread
def terminate(self):
self.running = False
def run(self):
self.running = True
while (self.running):
dataToSend = input("发给客户端:")
if dataToSend != "\n" and dataToSend != "":
self.clientSocket.send((dataToSend).encode("utf-8"))
if dataToSend == "over":
time.sleep(1)
self.running = False
self.clientSocket.close()
self.recvDataThread.terminate()
读线程和写线程类似,只不过在接收数据时,有可能服务端发送了over后套接字已经关闭,从而recv方法会报出异常,此时只要try-except即可
class RecvDataThread(threading.Thread):
def setSource(self, clientSocket, sendDataThread):
self.clientSocket = clientSocket
self.senDataThread = sendDataThread
def terminate(self):
self.running = False
def run(self):
self.running = True
while (self.running):
try:
dataReceived = str(self.clientSocket.recv(1024), "utf-8")
if dataReceived != "":
print("\n客户端来信:%s" % str(dataReceived))
if dataReceived == "over":
self.running = False
self.clientSocket.close()
self.senDataThread.terminate()
print("通信结束,摁任意键关闭")
except:
pass
接收客户端套接字,启动读写线程
这个其实很简单,accept到了客户端信息,就可以启动读写线程了
print("等待客户端连接")
tcpClientSocket, clientIp = tcpServerSocket.accept()
print("新的客户端已连接:%s" % str(clientIp))
sendDataThread = SendDataThread()
recvDataThread = RecvDataThread()
sendDataThread.setDestination(tcpClientSocket, recvDataThread)
recvDataThread.setSource(tcpClientSocket, sendDataThread)
sendDataThread.start()
recvDataThread.start()
sendDataThread.join()
recvDataThread.join()
两个子线程join是为了不让主线程提前结束
完整的服务端代码如下
import socket
import threading
import time
class SendDataThread(threading.Thread):
def setDestination(self, clientSocket, recvDataThread):
self.clientSocket = clientSocket
self.recvDataThread = recvDataThread
def terminate(self):
self.running = False
def run(self):
self.running = True
while (self.running):
dataToSend = input("发给客户端:")
if dataToSend != "\n" and dataToSend != "":
self.clientSocket.send((dataToSend).encode("utf-8"))
if dataToSend == "over":
time.sleep(1)
self.running = False
self.clientSocket.close()
self.recvDataThread.terminate()
class RecvDataThread(threading.Thread):
def setSource(self, clientSocket, sendDataThread):
self.clientSocket = clientSocket
self.senDataThread = sendDataThread
def terminate(self):
self.running = False
def run(self):
self.running = True
while (self.running):
try:
dataReceived = str(self.clientSocket.recv(1024), "utf-8")
if dataReceived != "":
print("\n客户端来信:%s" % str(dataReceived))
if dataReceived == "over":
self.running = False
self.clientSocket.close()
self.senDataThread.terminate()
print("通信结束,摁任意键关闭")
except:
pass
def main():
tcpServerSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcpServerSocket.bind(("", 12345))
tcpServerSocket.listen(128)
tcpClientSocket = None
serverOnUse = False
print("等待客户端连接")
tcpClientSocket, clientIp = tcpServerSocket.accept()
print("新的客户端已连接:%s" % str(clientIp))
sendDataThread = SendDataThread()
recvDataThread = RecvDataThread()
sendDataThread.setDestination(tcpClientSocket, recvDataThread)
recvDataThread.setSource(tcpClientSocket, sendDataThread)
sendDataThread.start()
recvDataThread.start()
sendDataThread.join()
recvDataThread.join()
tcpServerSocket.close()
main()
客户端
客户端用java实现,除了一开始是直接连接服务端套接字外,其余和服务端类似,主要也是读写线程负责接收和发送数据,并且两个线程可以相互控制对方的结束。因此直接贴代码即可,不做过多解释
package practice;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.*;
public class dm10 {
public static void main(String[] args) {
final Scanner input = new Scanner(System.in);
try {
Socket server = new Socket("localhost", 12345);
System.out.println("已连接服务端:" + server.getInetAddress().getHostAddress());
SendThread sendThread = new SendThread(server);
RecvThread recvThread = new RecvThread(server);
sendThread.setRecvThread(recvThread);
recvThread.setSendThread(sendThread);
final Thread tSendData = new Thread(sendThread);
final Thread tRecvData = new Thread(recvThread);
tSendData.start();
tRecvData.start();
tRecvData.join();
tSendData.join();
System.out.println("over..");
} catch (Exception e) {
e.printStackTrace();
}
}
}
class SendThread implements Runnable{
private final Scanner input = new Scanner(System.in);
private Socket serverSocket;
private BufferedOutputStream outputStream;
private RecvThread recvThread;
private boolean running = false;
public SendThread(Socket serverSocket) throws IOException {
this.serverSocket = serverSocket;
outputStream = new BufferedOutputStream(serverSocket.getOutputStream());
}
public void setRecvThread(RecvThread recvThread) {
this.recvThread = recvThread;
}
public void terminate() {
running = false;
}
@Override
public void run() {
try {
running = true;
while(running) {
String info = input.nextLine();
if (running) {
outputStream.write(info.getBytes("utf-8"));
outputStream.flush();
}
running = !info.contains("over") && !serverSocket.isClosed();
}
recvThread.terminate();
outputStream.close();
} catch (IOException e) {
try {
running = false;
recvThread.terminate();
outputStream.close();
} catch (IOException e1) {
}
}
}
}
class RecvThread implements Runnable{
private Socket serverSocket;
private BufferedInputStream inputStream;
private SendThread sendThread;
private boolean running = false;
public RecvThread(Socket serverSocket) throws IOException {
this.serverSocket = serverSocket;
inputStream = new BufferedInputStream(serverSocket.getInputStream());
}
public void setSendThread(SendThread sendThread) {
this.sendThread = sendThread;
}
public void terminate() {
running = false;
}
@Override
public void run() {
try {
running = true;
while(running) {
byte[] bytes = new byte[1024];
int len;
StringBuffer stringBuffer = new StringBuffer();
while (inputStream.available() > 0 && (len = inputStream.read(bytes)) != -1) {
stringBuffer.append(new String(bytes, 0, len, "utf-8"));
}
String fromServer = stringBuffer.toString();
if (!fromServer.isEmpty()) {
System.out.println("服务端来信:" + fromServer);
}
running = !fromServer.contains("over") && !serverSocket.isClosed();
if (fromServer.contains("over")) {
System.out.println("通信结束,按任意键关闭");
}
}
inputStream.close();
sendThread.terminate();
} catch (Exception e) {
try {
running = false;
inputStream.close();
sendThread.terminate();
} catch (IOException e1) {
e1.printStackTrace();
}
e.printStackTrace();
}
}
}
测试
主要是测试能否安全退出。
先测试服务端通知客户端结束
再测试客户端通知服务端结束
双方都既可以连续接收消息,也能连续发送消息,还能随时关闭,功能实现
结语
tcp或udp通信,实际流程是固定的,因此即便不同语言语法有差异,依旧可以照猫画虎地写出来。