如何保证使用python asyncio发送的tcp数据?

问题描述:

我有一个客户端连接到服务器并发送列表中的所有消息,并且发送每条消息,它将从列表中删除。如何保证使用python asyncio发送的tcp数据?

但是,当我强制关闭服务器时,客户端仍然继续发送和从列表中删除消息。我希望如果连接关闭,客户端停止发送消息,或者如果不能保证服务器已收到消息,则不要从列表中删除。

我检查当连接断开后发送超过四条消息时,显示错误“socket.send()引发异常”。但我不知道如何得到这个错误,我认为它是异步的。无论如何,如果我得到那个错误,如果列表连接关闭后发送少于五条消息,错误不会发生。

Ps:我写了服务器只是为了我的测试,但服务器我将无法访问。因此,需要尽一切努力保证数据发送的客户端。

非常感谢。

client.py
import asyncio 

class Client(asyncio.Protocol): 
    TIMEOUT = 1.0 
    event_list = [] 
    for i in range(10): 
     event_list.append('msg' + str(i)) 

    def __init__(self): 
     self.client_tcp_timeout = None 
     print(self.event_list) 

    def connection_made(self, transport): 
     print('Connected to Server.') 
     self.transport = transport 
     self.client_tcp_timeout = loop.call_later(self.TIMEOUT, self.send_from_call_later) 

    def data_received(self, data): 
     self.data = format(data.decode()) 
     print('data received: {}'.format(data.decode())) 

    def send_from_call_later(self): 
     self.msg = self.event_list[0].encode() 
     self.transport.write(self.msg) 
     print('data sent: {}'.format(self.msg)) 
     print('Removing data: {}'.format(self.event_list[0])) 
     del self.event_list[0] 
     print(self.event_list) 
     print('-----------------------------------------') 
     if len(self.event_list) > 0: 
      self.client_tcp_timeout = loop.call_later(self.TIMEOUT, self.send_from_call_later) 
     else: 
      print('All list was sent to the server.') 

    def connection_lost(self, exc): 
     print('Connection lost!!!!!!.') 

loop = asyncio.get_event_loop() 

coro = loop.create_connection(Client, 'localhost', 8000) 
client = loop.run_until_complete(coro) 

loop.run_forever() 

server.py
import asyncio 

class Server(asyncio.Protocol): 
    def connection_made(self, transport): 
     peername = transport.get_extra_info('peername') 
     print('connection from {}'.format(peername)) 
     self.transport = transport 

    def data_received(self, data): 
     print('data received: {}'.format(data.decode())) 
     #self.transport.write(data) 

loop = asyncio.get_event_loop() 
coro = loop.create_server(Server, 'localhost', 8000) 
server = loop.run_until_complete(coro) 

print('serving on {}'.format(server.sockets[0].getsockname())) 

try: 
    loop.run_forever() 
except KeyboardInterrupt: 
    print("exit") 
finally: 
    server.close() 
    loop.close() 

服务器输出(力(CTRL + C)接收MSG4之后关闭服务器):

$ python3 server.py 
serving on ('127.0.0.1', 8000) 
connection from ('127.0.0.1', 56119) 
data received: msg0 
data received: msg1 
data received: msg2 
data received: msg3 
data received: msg4 
^Cexit 

客户端输出

$ python3 client.py 
['msg0', 'msg1', 'msg2', 'msg3', 'msg4', 'msg5', 'msg6', 'msg7', 'msg8', 'msg9'] 
Connected to Server. 
data sent: b'msg0' 
Removing data: msg0 
['msg1', 'msg2', 'msg3', 'msg4', 'msg5', 'msg6', 'msg7', 'msg8', 'msg9'] 
----------------------------------------- 
data sent: b'msg1' 
Removing data: msg1 
['msg2', 'msg3', 'msg4', 'msg5', 'msg6', 'msg7', 'msg8', 'msg9'] 
----------------------------------------- 
data sent: b'msg2' 
Removing data: msg2 
['msg3', 'msg4', 'msg5', 'msg6', 'msg7', 'msg8', 'msg9'] 
----------------------------------------- 
data sent: b'msg3' 
Removing data: msg3 
['msg4', 'msg5', 'msg6', 'msg7', 'msg8', 'msg9'] 
----------------------------------------- 
data sent: b'msg4' 
Removing data: msg4 
['msg5', 'msg6', 'msg7', 'msg8', 'msg9'] 
----------------------------------------- 
Connection lost!!!!!!. 
data sent: b'msg5' 
Removing data: msg5 
['msg6', 'msg7', 'msg8', 'msg9'] 
----------------------------------------- 
data sent: b'msg6' 
Removing data: msg6 
['msg7', 'msg8', 'msg9'] 
----------------------------------------- 
data sent: b'msg7' 
Removing data: msg7 
['msg8', 'msg9'] 
----------------------------------------- 
data sent: b'msg8' 
Removing data: msg8 
['msg9'] 
----------------------------------------- 
socket.send() raised exception. 
data sent: b'msg9' 
Removing data: msg9 
[] 
----------------------------------------- 
All list was sent to the server. 

写入TCP套接字并不能保证数据的接收。它只将数据发送到操作系统内核,然后尽可能地将数据发送到另一端。但是,一旦数据发送到操作系统内核,写入调用就会返回成功。如果数据然后被对等OS内核接收到,它将在TCP级别上确认它们。但是,这只意味着数据由内核接收,而不是由应用程序处理。

如果您希望保证消息传递并处理可能的对等关闭,则必须在协议内部实施某种确认,并且只有在您的对等应用程序成功处理后才能得到来自对等应用程序的明确确认后才会删除数据数据。

我想,如果连接关闭,客户端停止发送消息,或者如果不能保证服务器已收到消息,则不要从列表中删除。

因此,编写这样的代码。直到服务器指示它已收到消息,才能删除该消息。你可以用你喜欢的任何方式来实现,但如果它是你想要的行为,你必须对它进行编码。