博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Python 程序:基于RabbitMQ实现主机管理
阅读量:6354 次
发布时间:2019-06-22

本文共 5317 字,大约阅读时间需要 17 分钟。

Python 程序:基于RabbitMQ实现主机管理


1、需求

2、代码

3、测试样图


一、需求

1、可以异步的执行多个命令

2、对多台机器

举例

>>:run "df -h" --hosts 192.168.3.55 10.4.3.4 

task id: 45334
>>: check_task 45334 
>>:

二、代码

1 import random 2 import pika 3  4 class rpc_cilent(object): 5     def connect(self): 6         self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 7     def connect_1(self): 8         self.credentials = pika.PlainCredentials('zz', '123456') 9         self.connection = pika.BlockingConnection(pika.ConnectionParameters(10         '192.168.43.165',5672,'/',self.credentials))11     def on_response(self, ch, method, props, body):12         if self.corr_id == props.correlation_id:13             self.response = body14     def call(self, command, host,tmp_dict):15         if host == "127.0.0.1":16             self.connect()17         elif host == "192.168.43.165":18             self.connect_1()19         self.channel = self.connection.channel()20         result = self.channel.queue_declare(exclusive=True)21         self.callback_queue = result.method.queue22         self.channel.basic_consume(self.on_response, no_ack=True,queue=self.callback_queue)23         self.response = None24         self.corr_id = str(random.randint(10000, 99999))25         ack = self.channel.basic_publish(exchange='',26                                          routing_key= "127.0.0.1",27                                          properties=pika.BasicProperties(28                                              reply_to=self.callback_queue,29                                              correlation_id=self.corr_id),30                                          body=str(command))31         while self.response is None:32             self.connection.process_data_events()33         task_id = self.corr_id34         res = self.response.decode()35         tmp_dict[task_id] = res36         print('\33[42;0mtask_id: %s host: %s cmd: %s\33[0m' % (self.corr_id, host, command))37 class rpc_start(object):38     def __init__(self):39         self.tmp_dict = {}40         self.start()41     def check_all(self,*args):42         for index, key in enumerate(self.tmp_dict.keys()):43             print(index, '\33[42;0mtask_id: %s\33[0m' % key)44     def check_task(self,user_cmd):45         try:46             command_list = user_cmd.split()47             print(self.tmp_dict[command_list[1]])48             del self.tmp_dict[command_list[1]]49         except IndexError:50             help()51     def run(self,user_cmd):52          try:53             hosts_obj = ( user_cmd.split('hosts'))54             hosts = hosts_obj[1].strip().split()55             command = user_cmd.split("\"")[1]56             for host in hosts:57                 try:58                     if host == "127.0.0.1":59                         rpc_cilent.call(command, host,self.tmp_dict)60                     elif host == "192.168.43.165":61                         rpc_cilent.call(command, host,self.tmp_dict)62                     else:63                         print("\33[41;0mno host:%s\33[0m"%host)64                 except TypeError and AssertionError:65                     break66          except IndexError:67             print("\33[31;0mcommand not found\33[0m")68             self.help()69     def help(self):70         print('\33[34;0mUsage: run "df -h" hosts 127.0.0.1 192.168.43.165 \33[0m')71         print('\33[34;0m       check_task 97128\33[0m')72         print('\33[34;0m       check_all\33[0m')73     def start(self):74         self.help()75         while True:76             user_cmd = input("->>").strip()77             if not user_cmd:continue78             self.cmd = user_cmd.split()[0]79             if hasattr(self, self.cmd):80                 getattr(self, self.cmd)(user_cmd)81             else:82                 print("\33[31;0mcommand not found!\33[0m")83                 self.help()84 85 rpc_cilent = rpc_cilent()86 start = rpc_start()
rpc_client
1 import pika 2 import subprocess 3  4 class rpc_server(object): 5     def __init__(self,host,queue): 6         self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=host)) 7         self.channel = self.connection.channel() 8         self.channel.queue_declare(queue=queue) 9         self.channel.basic_qos(prefetch_count=1)10         self.channel.basic_consume(self.on_request, queue=queue)11         print(" [x] Awaiting RPC requests")12         self.channel.start_consuming()13     def command(self,cmd):14         res = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)15         msg = res.stdout.read().decode('gbk')16         if len(msg) == 0:17             msg = res.stderr.read().decode('gbk')18         print(msg)19         return msg20     def on_request(self,ch, method, props, body):21         cmd = body.decode()22         print(cmd)23         respone = self.command(cmd)24         ch.basic_publish(exchange='',25                          routing_key=props.reply_to,26                          properties=pika.BasicProperties(correlation_id=props.correlation_id),27                          body=respone)28         ch.basic_ack(delivery_tag=method.delivery_tag)29 30 server = rpc_server("localhost","127.0.0.1")
rpc_server

三、测试样图

启动客户端

windows端服务器启动

linux端服务器启动

client端测试:

 

转载于:https://www.cnblogs.com/hy0822/p/9284507.html

你可能感兴趣的文章
spring boot 包jar运行
查看>>
18年秋季学习总结
查看>>
Effective前端1:能使用html/css解决的问题就不要使用JS
查看>>
网络攻防 实验一
查看>>
由莫名其妙的错误开始---浅谈jquery的dom节点创建
查看>>
磨刀-CodeWarrior11生成的Makefile解析
查看>>
String StringBuffer StringBuilder对比
查看>>
bootstrap随笔点击增加
查看>>
oracle 中proc和oci操作对缓存不同处理
查看>>
[LeetCode] Spiral Matrix 解题报告
查看>>
60906磁悬浮动力系统应用研究与模型搭建
查看>>
指纹获取 Fingerprint2
查看>>
面试题目3:智能指针
查看>>
flask ORM: Flask-SQLAlchemy【单表】增删改查
查看>>
vim 常用指令
查看>>
nodejs 获取自己的ip
查看>>
Nest.js 处理错误
查看>>
你好,C++(16)用表达式表达我们的设计意图——4.1 用操作符对数据进行运算...
查看>>
18.3 redis 的安装
查看>>
jdbc 简单连接
查看>>