Python 程序:基于RabbitMQ实现主机管理
1、需求
2、代码
3、测试样图
一、需求
1、可以异步的执行多个命令
2、对多台机器
举例
>>:run "df -h" --hosts 192.168.3.55 10.4.3.4
task id: 45334
>>: check_task 45334
>>:
二、代码
import random

import pika


class rpc_cilent(object):
    """RPC client: sends one shell command to a host's broker and waits for the reply.

    NOTE(review): ``queue_declare(exclusive=True)`` and
    ``basic_consume(callback, no_ack=True, queue=...)`` follow the pika 0.x
    signatures used by the original code; pika >= 1.0 renamed these
    arguments -- confirm against the installed version.
    """

    # Single source of truth for the manageable hosts: maps each host to the
    # name of the method that opens the connection to its broker.
    HOSTS = {
        "127.0.0.1": "connect",
        "192.168.43.165": "connect_1",
    }

    def connect(self):
        """Open a connection to the broker on localhost (default credentials)."""
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))

    def connect_1(self):
        """Open an authenticated connection to the broker on 192.168.43.165."""
        # NOTE(review): credentials are hard-coded; move them to configuration
        # before using this outside a demo.
        self.credentials = pika.PlainCredentials('zz', '123456')
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                '192.168.43.165', 5672, '/', self.credentials))

    def on_response(self, ch, method, props, body):
        """Store the reply body if it answers the request currently in flight."""
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, command, host, tmp_dict):
        """Run ``command`` on ``host`` and store its output in ``tmp_dict``.

        Blocks until the server replies. The decoded reply is stored under a
        freshly generated task id, which is also printed.

        :param command: shell command text sent as the message body.
        :param host: one of the keys of ``HOSTS``.
        :param tmp_dict: dict receiving ``{task_id: output}``.
        :raises ValueError: if ``host`` is not a configured host.
        """
        connector = self.HOSTS.get(host)
        if connector is None:
            # Previously an unknown host silently reused a stale connection
            # (or failed with AttributeError on the first call).
            raise ValueError("no host:%s" % host)
        getattr(self, connector)()
        try:
            self.channel = self.connection.channel()
            # Exclusive, server-named queue that receives the reply.
            result = self.channel.queue_declare(exclusive=True)
            self.callback_queue = result.method.queue
            self.channel.basic_consume(
                self.on_response, no_ack=True, queue=self.callback_queue)
            self.response = None
            self.corr_id = str(random.randint(10000, 99999))
            # A colliding id would silently overwrite a result the user has
            # not collected yet, so draw again until it is unused.
            while self.corr_id in tmp_dict:
                self.corr_id = str(random.randint(10000, 99999))
            self.channel.basic_publish(
                exchange='',
                # Name of the request queue; the server script in this
                # article declares its queue as "127.0.0.1".
                routing_key="127.0.0.1",
                properties=pika.BasicProperties(
                    reply_to=self.callback_queue,
                    correlation_id=self.corr_id),
                body=str(command))
            while self.response is None:
                self.connection.process_data_events()
        finally:
            # One connection is opened per call; close it so repeated
            # commands do not leak sockets. Guarded so that a connection the
            # broker already dropped does not mask the original error.
            if self.connection.is_open:
                self.connection.close()
        tmp_dict[self.corr_id] = self.response.decode()
        print('\33[42;0mtask_id: %s host: %s cmd: %s\33[0m'
              % (self.corr_id, host, command))


class rpc_start(object):
    """Interactive shell: dispatches ``run``, ``check_task``, ``check_all``, ``help``."""

    # Only these attribute names may be invoked from the prompt. Dispatching
    # on hasattr() alone let input such as "tmp_dict" or "start" crash or
    # recurse.
    COMMANDS = ("run", "check_task", "check_all", "help")

    def __init__(self):
        self.tmp_dict = {}  # task_id -> command output not yet collected
        self.start()

    def check_all(self, *args):
        """List the ids of all tasks whose results have not been collected."""
        for index, key in enumerate(self.tmp_dict):
            print(index, '\33[42;0mtask_id: %s\33[0m' % key)

    def check_task(self, user_cmd):
        """Print and discard the stored result of ``check_task <task_id>``."""
        command_list = user_cmd.split()
        if len(command_list) < 2:
            # Was a call to the builtin help(), which opens the pydoc prompt.
            self.help()
            return
        task_id = command_list[1]
        try:
            print(self.tmp_dict.pop(task_id))
        except KeyError:
            # An unknown id used to propagate KeyError and kill the shell.
            print("\33[31;0mtask not found: %s\33[0m" % task_id)

    def run(self, user_cmd):
        """Handle ``run "<command>" hosts <host> [<host> ...]``.

        The command is the text between the first pair of double quotes; the
        host list is looked up only after the last quote, so a command that
        itself contains the word "hosts" (e.g. ``cat /etc/hosts``) parses
        correctly.
        """
        quoted = user_cmd.split('"')
        if len(quoted) < 3:
            self._usage_error()
            return
        command = quoted[1]
        _, keyword, host_text = quoted[-1].partition('hosts')
        hosts = host_text.split()
        if not keyword or not hosts:
            self._usage_error()
            return
        for host in hosts:
            if host not in rpc_cilent.HOSTS:
                print("\33[41;0mno host:%s\33[0m" % host)
                continue
            try:
                rpc_cilent.call(command, host, self.tmp_dict)
            except (TypeError, AssertionError) as exc:
                # `except TypeError and AssertionError` only ever caught
                # AssertionError; catch both and say why we stop.
                print("\33[31;0m%s: %s\33[0m" % (host, exc))
                break
            except pika.exceptions.AMQPError as exc:
                # An unreachable broker must not take the whole shell down;
                # report it and move on to the next host.
                print("\33[31;0m%s: %r\33[0m" % (host, exc))

    def _usage_error(self):
        """Report a malformed ``run`` command and show the usage text."""
        print("\33[31;0mcommand not found\33[0m")
        self.help()

    def help(self, *args):
        """Print usage. Accepts and ignores the raw command line so that
        typing ``help`` at the prompt works."""
        print('\33[34;0mUsage: run "df -h" hosts 127.0.0.1 192.168.43.165 \33[0m')
        print('\33[34;0m check_task 97128\33[0m')
        print('\33[34;0m check_all\33[0m')

    def start(self):
        """Read commands from stdin until EOF or Ctrl-C and dispatch them."""
        self.help()
        while True:
            try:
                user_cmd = input("->>").strip()
            except (EOFError, KeyboardInterrupt):
                print()
                break
            if not user_cmd:
                continue
            self.cmd = user_cmd.split()[0]
            if self.cmd in self.COMMANDS:
                getattr(self, self.cmd)(user_cmd)
            else:
                print("\33[31;0mcommand not found!\33[0m")
                self.help()


# The instance deliberately rebinds the class name, as in the original script;
# rpc_start.run() looks up this module-level instance.
rpc_cilent = rpc_cilent()
start = rpc_start()
import subprocess

import pika


class rpc_server(object):
    """RPC server: runs each command received on a queue and replies with its output.

    NOTE(review): ``basic_consume(callback, queue=...)`` follows the pika 0.x
    signature used by the original code; pika >= 1.0 takes
    ``on_message_callback=`` -- confirm against the installed version.
    """

    # Encoding used to decode command output. 'gbk' is the original
    # hard-coded value; pass ``encoding='utf-8'`` to the constructor on hosts
    # whose commands emit UTF-8.
    encoding = 'gbk'

    def __init__(self, host, queue, encoding='gbk'):
        """Connect to the broker on ``host`` and serve requests from ``queue``.

        Blocks in ``start_consuming()``; the constructor does not return
        while the server is running.

        :param host: broker host name or address.
        :param queue: name of the request queue to declare and consume.
        :param encoding: codec used to decode the output of executed commands.
        """
        self.encoding = encoding
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=host))
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue=queue)
        # Hand this worker one unacknowledged request at a time.
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(self.on_request, queue=queue)
        print(" [x] Awaiting RPC requests")
        self.channel.start_consuming()

    def command(self, cmd):
        """Run ``cmd`` through the shell and return its output as text.

        Returns stdout when it is non-empty, otherwise stderr.
        """
        # SECURITY: the command text comes straight from the message queue
        # and is executed by a shell. That is the purpose of this tool, but
        # anyone who can publish to the queue can run arbitrary commands
        # here -- restrict access to the broker accordingly.
        proc = subprocess.Popen(
            cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # communicate() drains both pipes concurrently and reaps the child.
        # Reading stdout to EOF before touching stderr can deadlock when the
        # child fills the stderr pipe, and never waits on the process.
        out, err = proc.communicate()
        raw = out if out else err
        # errors='replace': output in a different encoding must not raise
        # inside the consumer callback and take the server down.
        msg = raw.decode(self.encoding, errors='replace')
        print(msg)
        return msg

    def on_request(self, ch, method, props, body):
        """Execute the requested command and publish its output to the reply queue."""
        cmd = body.decode()
        print(cmd)
        response = self.command(cmd)
        ch.basic_publish(
            exchange='',
            routing_key=props.reply_to,
            # Echo the correlation id so the client can match the reply.
            properties=pika.BasicProperties(
                correlation_id=props.correlation_id),
            body=response)
        ch.basic_ack(delivery_tag=method.delivery_tag)


server = rpc_server("localhost", "127.0.0.1")
三、测试样图
启动客户端
windows端服务器启动
linux端服务器启动
client端测试: