从流中产生的正确方法是什么?

从流中产生的正确方法是什么?

问题描述:

我有用于包含读和写的asyncio连接流的Connection对象:从流中产生的正确方法是什么?

class Connection(object): 

    def __init__(self, stream_in, stream_out): 
     object.__init__(self) 

     self.__in = stream_in 
     self.__out = stream_out 

    def read(self, n_bytes : int = -1): 
     return self.__in.read(n_bytes) 

    def write(self, bytes_ : bytes): 
     self.__out.write(bytes_) 
     yield from self.__out.drain() 

在服务器端,connected创建Connection对象每一个客户端连接时间,然后读出4个字节。

@asyncio.coroutine 
def new_conection(stream_in, stream_out): 
    conn = Connection(stream_in, stream_out) 
    data = yield from conn.read(4) 
    print(data) 

而在客户端,写出4个字节。

@asyncio.coroutine 
def client(loop): 
    ... 
    conn = Connection(stream_in, stream_out) 
    yield from conn.write(b'test') 

这个作品几乎符合市场预期,但我必须yield fromreadwrite通话。我试过yield from从内Connection荷兰国际集团:

def read(self, n_bytes : int = -1): 
    data = yield from self.__in.read(n_bytes) 
    return data 

但是比起获取数据,我得到这样

<generator object StreamReader.read at 0x1109983b8> 

如果我打电话readwrite来自多个地方的输出,我宁愿不每次重复yield from;宁可将它们放在Connection之内。我的最终目标是我new_conection功能砍倒在此:

@asyncio.coroutine 
def new_conection(stream_in, stream_out): 
    conn = Connection(stream_in, stream_out) 
    print(conn.read(4)) 
+0

为什么你必须屈服?如果你不从conn.read(4)产生,它在我看来就像它只是返回一个字节对象。这是你在这里寻找什么? – RageCage

+0

@RageCage:如果没有''yield'',conn.read(4)'仍然返回一个生成器:'' –

+0

对不起,我应该澄清;如果你不从conn.read()(单行版本)的第一次迭代中产生什么结果? – RageCage

因为StreamReader.read is a coroutine,您的用于调用它的唯一选项是a)在TaskFuture它包裹并运行通过一个事件循环,B)await从与async def定义协程荷兰国际集团,或c)使用yield from与它从一个协程定义为用@asyncio.coroutine装饰的函数。

由于Connection.read从事件循环中调用(通过协程new_connection),你不能重复使用事件循环运行StreamReader.read一个TaskFutureevent loops can't be started while they're already running。你要么必须stop the event loop(灾难性的,可能不可能做到正确)或create a new event loop(凌乱和挫败使用协程的目的)。这两种都不可取,因此Connection.read需要是协程或async函数。

其他两个选项(在协程async defawaityield from一个@asyncio.coroutine -decorated函数)大部分是等价的。唯一的区别是async def and await were added in Python 3.5,因此对于3.4,使用yield from@asyncio.coroutine是唯一选项(协同和asyncio在3.4之前不存在,所以其他版本是不相关的)。就个人而言,我更喜欢使用async defawait,因为与async def一起定义协程比使用装饰器更清晰和更清晰。

简言之:具有Connection.readnew_connection是协同程序(使用装饰或async关键字),并使用await(或yield from)调用其他协同程序(await conn.read(4)new_connection,和await self.__in.read(n_bytes)Connection.read)时。

+1

啊,非常好的答案Mego!这清楚地由知道他们在谈论什么的人写出。我从阅读中学到了很多东西。 +1 –

我发现StreamReader source code的块上线620实际上是功能的使用的一个很好的例子。

在我以前的答案,我忽略了一个事实,即​​不仅是协程(这是我应该早就知道考虑到这是从asyncio模块XD),但它产生于行的结果。所以它实际上是一个发电机,你需要从中产生效益。

借款从源代码这个循环中,你读的功能应该是这个样子:

def read(self, n_bytes : int = -1): 
    data = bytearray() #or whatever object you are looking for 
    while 1: 
     block = yield from self.__in.read(n_bytes) 
     if not block: 
      break 
     data += block 
    return data 

因为​​是一台发电机,你必须继续从它产生,直到它得到一个空的结果信号阅读结束。现在你的读函数应该返回数据而不是生成器。您不必从conn.read()这个版本中产生。

+0

完全按照您提供的函数使用函数,我仍然收到一个生成器对象('Connection.read')。 –

+0

你还在从conn.read调用中产生吗?尝试打印数据并在读取函数中键入(数据)以查看返回前的内容。 – RageCage

+0

不,我删除了它,而是尝试了'data = conn.read(4)'。它是一个发电机。 –