Python 程序:基于RabbitMQ实现主机管理
Python 程序:基于RabbitMQ实现主机管理
1、需求
2、代码
3、测试样图
一、需求
1、可以异步的执行多个命令
2、对多台机器
举例
>>:run "df -h" --hosts 192.168.3.55 10.4.3.4
task id: 45334
>>: check_task 45334
>>:
二、代码
1 import random 2 import pika 3 4 class rpc_cilent(object): 5 def connect(self): 6 self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 7 def connect_1(self): 8 self.credentials = pika.PlainCredentials('zz', '123456') 9 self.connection = pika.BlockingConnection(pika.ConnectionParameters( 10 '192.168.43.165',5672,'/',self.credentials)) 11 def on_response(self, ch, method, props, body): 12 if self.corr_id == props.correlation_id: 13 self.response = body 14 def call(self, command, host,tmp_dict): 15 if host == "127.0.0.1": 16 self.connect() 17 elif host == "192.168.43.165": 18 self.connect_1() 19 self.channel = self.connection.channel() 20 result = self.channel.queue_declare(exclusive=True) 21 self.callback_queue = result.method.queue 22 self.channel.basic_consume(self.on_response, no_ack=True,queue=self.callback_queue) 23 self.response = None 24 self.corr_id = str(random.randint(10000, 99999)) 25 ack = self.channel.basic_publish(exchange='', 26 routing_key= "127.0.0.1", 27 properties=pika.BasicProperties( 28 reply_to=self.callback_queue, 29 correlation_id=self.corr_id), 30 body=str(command)) 31 while self.response is None: 32 self.connection.process_data_events() 33 task_id = self.corr_id 34 res = self.response.decode() 35 tmp_dict[task_id] = res 36 print('\33[42;0mtask_id: %s host: %s cmd: %s\33[0m' % (self.corr_id, host, command)) 37 class rpc_start(object): 38 def __init__(self): 39 self.tmp_dict = {} 40 self.start() 41 def check_all(self,*args): 42 for index, key in enumerate(self.tmp_dict.keys()): 43 print(index, '\33[42;0mtask_id: %s\33[0m' % key) 44 def check_task(self,user_cmd): 45 try: 46 command_list = user_cmd.split() 47 print(self.tmp_dict[command_list[1]]) 48 del self.tmp_dict[command_list[1]] 49 except IndexError: 50 help() 51 def run(self,user_cmd): 52 try: 53 hosts_obj = ( user_cmd.split('hosts')) 54 hosts = hosts_obj[1].strip().split() 55 command = user_cmd.split("\"")[1] 56 for host in hosts: 57 try: 58 if host == "127.0.0.1": 59 rpc_cilent.call(command, host,self.tmp_dict) 60 elif host == "192.168.43.165": 61 rpc_cilent.call(command, host,self.tmp_dict) 62 else: 63 print("\33[41;0mno host:%s\33[0m"%host) 64 except TypeError and AssertionError: 65 break 66 except IndexError: 67 print("\33[31;0mcommand not found\33[0m") 68 self.help() 69 def help(self): 70 print('\33[34;0mUsage: run "df -h" hosts 127.0.0.1 192.168.43.165 \33[0m') 71 print('\33[34;0m check_task 97128\33[0m') 72 print('\33[34;0m check_all\33[0m') 73 def start(self): 74 self.help() 75 while True: 76 user_cmd = input("->>").strip() 77 if not user_cmd:continue 78 self.cmd = user_cmd.split()[0] 79 if hasattr(self, self.cmd): 80 getattr(self, self.cmd)(user_cmd) 81 else: 82 print("\33[31;0mcommand not found!\33[0m") 83 self.help() 84 85 rpc_cilent = rpc_cilent() 86 start = rpc_start()
1 import pika 2 import subprocess 3 4 class rpc_server(object): 5 def __init__(self,host,queue): 6 self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=host)) 7 self.channel = self.connection.channel() 8 self.channel.queue_declare(queue=queue) 9 self.channel.basic_qos(prefetch_count=1) 10 self.channel.basic_consume(self.on_request, queue=queue) 11 print(" [x] Awaiting RPC requests") 12 self.channel.start_consuming() 13 def command(self,cmd): 14 res = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) 15 msg = res.stdout.read().decode('gbk') 16 if len(msg) == 0: 17 msg = res.stderr.read().decode('gbk') 18 print(msg) 19 return msg 20 def on_request(self,ch, method, props, body): 21 cmd = body.decode() 22 print(cmd) 23 respone = self.command(cmd) 24 ch.basic_publish(exchange='', 25 routing_key=props.reply_to, 26 properties=pika.BasicProperties(correlation_id=props.correlation_id), 27 body=respone) 28 ch.basic_ack(delivery_tag=method.delivery_tag) 29 30 server = rpc_server("localhost","127.0.0.1")
三、测试样图
启动客户端
windows端服务器启动
linux端服务器启动
client端测试: