python交互子程序通信
问题描述:
我在学习如何编写交互子程序通信。python交互子程序通信
我需要阅读从标准输出和标准输入不断地写,下面是我的代码,它像是“作品”,但我不知道如果我这样做写(这是非常砍死代码)
假设我有一个名为app.py如下
import logging
import random
def app():
number1 = random.randint(1,100)
number2 = random.randint(200,500)
logging.info("number1: %s, number2: %s", number1, number2)
ans = input("enter sum of {} and {}: ".format(number1, number2))
logging.info("received answer: %s", ans)
try:
if int(ans) != number1+number2:
raise ValueError
logging.info("{} is the correct answer".format(ans))
except (ValueError,TypeError):
logging.info("{} is incorrect answer".format(ans))
def main():
logging.basicConfig(level=logging.DEBUG, filename='log.log')
for x in range(10):
app()
if __name__ == '__main__':
main()
互动与上面的脚本(app.py)脚本我有一些非常丑陋的代码
import queue
import time
import threading
import subprocess
import os
import pty
import re
class ReadStdout(object):
def __init__(self):
self.queue = queue.Queue()
self._buffer_ = []
def timer(self, timeout=0.1):
buffer_size = 0
while True:
if len(self._buffer_) > buffer_size:
buffer_size = len(self._buffer_)
time.sleep(timeout)
if len(self._buffer_) == buffer_size and buffer_size!=0:
self.queue.put(''.join(self._buffer_))
self._buffer_ = []
buffer_size = 0
def read(self, fd):
while True:
self._buffer_.append(fd.read(1))
def run(self):
timer = threading.Thread(target=self.timer)
timer.start()
master, slave = pty.openpty()
p = subprocess.Popen(['python', 'app.py'], stdout=slave, stderr=slave, stdin=subprocess.PIPE, close_fds=True)
stdout = os.fdopen(master)
read_thread = threading.Thread(target=self.read, args=(stdout,))
read_thread.start()
while True:
if self.queue.empty():
time.sleep(0.1)
continue
msg = self.queue.get()
digits = (re.findall('(\d+)', msg))
ans = (int(digits[0])+int(digits[1]))
print("got message: {} result: {}".format(msg, ans))
p.stdin.write(b"%d\n" %ans)
p.stdin.flush()
if __name__ == '__main__':
x = ReadStdout()
x.run()
我不觉得我这样做的正确方法。什么是正确的方式互动与另一个脚本(我需要标准输出,以标准输入不只是盲目写)
感谢
答
该代码将与您app.py工作,所以你可以从它那里得到互动的基础逻辑。另外,我建议你看看pexpect模块。在任何情况下 - 你知道什么期望从运行程序,以及它的输入线是如何结束的。或者你可以在读取行的时候实现一些超时,所以如果事情不符合预期,可能会引发异常。
import subprocess
from functools import partial
child = subprocess.Popen(['app.py'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=True)
# iterate while child is not terminated
while child.poll() is None:
line = ''
# read stdout character by character until a colon appears
for c in iter(partial(child.stdout.read, 1), ''):
if c == ':':
break
line += c
if "enter sum" in line:
numbers = filter(str.isdigit, line.split())
numbers = list(map(int, numbers))
child.stdin.write("{0}\n".format(sum(numbers)))
+0
因此,如果我不知道该期待什么,唯一的方法是有一个超时。没有更好的内置方式来使用它。 谢谢你的帮助。 –
[在一个脚本使用Python的子过程和POPEN要运行的需要用户交互(通过生\ _Input)另一个Python脚本](的可能的复制https://*.com/questions/12980148/using-pythons -subprocess-and-popen-in-one-script-to-run-another-python-script -w) –
这是不同的。我知道如何将stdin发送到进程。 但我需要先读取stdout,然后根据结果写入标准输入。上面的线程不用担心stdout,它只是在stdin中盲目提供。 –
我也可以读取标准输出,但我不确定是否正确读取它。因为readline()将不起作用,因为input()示例不会发送换行符。我可以阅读(大小),但我不知道这是我需要阅读的“大小”。 假设我真的不知道“input()”字符串大小的大小。 –