Python封裝zabbix-get接口的代碼分享
Zabbix 是一款強大的開源網(wǎng)管監(jiān)控工具,該工具的客戶端與服務(wù)端是分開的,我們可以直接使用自帶的zabbix_get命令來實現(xiàn)拉取客戶端上的各種數(shù)據(jù),在本地組裝參數(shù)并使用Popen開子線程執(zhí)行該命令,即可實現(xiàn)批量監(jiān)測。
封裝Engine類: 該類的主要封裝了Zabbix接口的調(diào)用,包括最基本的參數(shù)收集.
import subprocess,datetime,time,math
class Engine():
def __init__(self,address,port):
self.address = address
self.port = port
def GetValue(self,key):
try:
command = "get.exe -s {0} -p {1} -k {2}".format(self.address,self.port,key).split(" ")
start = datetime.datetime.now()
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
while process.poll() is None:
time.sleep(1)
now = datetime.datetime.now()
if (now - start).seconds > 2:
return 0
return str(process.stdout.readlines()[0].split()[0],"utf-8")
except Exception:
return 0
# ping檢測
def GetPing(self):
ref_dict = {"Address":0,"Ping":0}
ref_dict["Address"] = self.address
ref_dict["Ping"] = self.GetValue("agent.ping")
if ref_dict["Ping"] == "1":
return ref_dict
else:
ref_dict["Ping"] = "0"
return ref_dict
return ref_dict
# 獲取主機組基本信息
def GetSystem(self):
ref_dict = { "Address" : 0 ,"HostName" : 0,"Uname":0 }
ref_dict["Address"] = self.address
ref_dict["HostName"] = self.GetValue("system.hostname")
ref_dict["Uname"] = self.GetValue("system.uname")
return ref_dict
# 獲取CPU利用率
def GetCPU(self):
ref_dict = { "Address": 0 ,"Core": 0,"Active":0 , "Avg1": 0 ,"Avg5":0 , "Avg15":0 }
ref_dict["Address"] = self.address
ref_dict["Core"] = self.GetValue("system.cpu.num")
ref_dict["Active"] = math.ceil(float(self.GetValue("system.cpu.util")))
ref_dict["Avg1"] = self.GetValue("system.cpu.load[,avg1]")
ref_dict["Avg5"] = self.GetValue("system.cpu.load[,avg5]")
ref_dict["Avg15"] = self.GetValue("system.cpu.load[,avg15]")
return ref_dict
# 獲取內(nèi)存利用率
def GetMemory(self):
ref_dict = { "Address":"0","Total":"0","Free":0,"Percentage":"0" }
ref_dict["Address"] = self.address
fps = self.GetPing()
if fps['Ping'] != "0":
ref_dict["Total"] = self.GetValue("vm.memory.size[total]")
ref_dict["Free"] = self.GetValue("vm.memory.size[free]")
# 計算百分比: percentage = 100 - int(Free/int(Total/100))
ref_dict["Percentage"] = str( 100 - int( int(ref_dict.get("Free")) / (int(ref_dict.get("Total"))/100)) ) + "%"
return ref_dict
else:
return ref_dict
# 獲取磁盤數(shù)據(jù)
def GetDisk(self):
ref_list = []
fps = self.GetPing()
if fps['Ping'] != "0":
disk_ = eval( self.GetValue("vfs.fs.discovery"))
for x in range(len(disk_)):
dict_ = {"Address": 0, "Name": 0, "Type": 0, "Free": 0}
dict_["Address"] = self.address
dict_["Name"] = disk_[x].get("{#FSNAME}")
dict_["Type"] = disk_[x].get("{#FSTYPE}")
if dict_["Type"] != "UNKNOWN":
pfree = self.GetValue("vfs.fs.size[\"{0}\",pfree]".format(dict_["Name"]))
dict_["Free"] = str(math.ceil(float(pfree)))
else:
dict_["Free"] = -1
ref_list.append(dict_)
return ref_list
return ref_list
# 獲取進(jìn)程狀態(tài)
def GetProcessStatus(self,process_name):
fps = self.GetPing()
dict_ = {"Address": '0', "ProcessName": '0', "ProcessCount": '0', "Status": '0'}
if fps['Ping'] != "0":
proc_id = self.GetValue("proc.num[\"{}\"]".format(process_name))
dict_['Address'] = self.address
dict_['ProcessName'] = process_name
if proc_id != "0":
dict_['ProcessCount'] = proc_id
dict_['Status'] = "True"
else:
dict_['Status'] = "False"
return dict_
return dict_
# 獲取端口開放狀態(tài)
def GetNetworkPort(self,port):
dict_ = {"Address": '0', "Status": 'False'}
dict_['Address'] = self.address
fps = self.GetPing()
if fps['Ping'] != "0":
port_ = self.GetValue("net.tcp.listen[{}]".format(port))
if port_ == "1":
dict_['Status'] = "True"
else:
dict_['Status'] = "False"
return dict_
return dict_
# 檢測Web服務(wù)器狀態(tài) 通過本地地址:端口 => 檢測目標(biāo)地址:端口
def CheckWebServerStatus(self,check_addr,check_port):
dict_ = {"local_address": "0", "remote_address": "0", "remote_port": "0", "Status":"False"}
fps = self.GetPing()
dict_['local_address'] = self.address
dict_['remote_address'] = check_addr
dict_['remote_port'] = check_port
if fps['Ping'] != "0":
check_ = self.GetValue("net.tcp.port[\"{}\",\"{}\"]".format(check_addr,check_port))
if check_ == "1":
dict_['Status'] = "True"
else:
dict_['Status'] = "False"
return dict_
return dict_
當(dāng)我們需要使用時,只需要定義變量調(diào)用即可,其調(diào)用代碼如下。
from engine import Engine
if __name__ == "__main__":
ptr_windows = Engine("127.0.0.1","10050")
ret = ptr_windows.GetDisk()
if len(ret) != 0:
for item in ret:
addr = item.get("Address")
name = item.get("Name")
type = item.get("Type")
space = item.get("Free")
if type != "UNKNOWN" and space != -1:
print("地址: {} --> 盤符: {} --> 格式: {} --> 剩余空間: {}".format(addr,name,type,space))到此這篇關(guān)于Python封裝zabbix-get接口的代碼分享的文章就介紹到這了,更多相關(guān)Python封裝zabbix-get接口內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python深度學(xué)習(xí)之Keras模型轉(zhuǎn)換成ONNX模型流程詳解
這篇文章主要介紹了Python深度學(xué)習(xí)之Keras模型轉(zhuǎn)換成ONNX模型流程,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)吧2022-09-09
Python中if __name__ == ''__main__''作用解析
這篇文章主要介紹了Python中if __name__ == '__main__'作用解析,這斷代碼在Python中非常常見,它有作用?本文就解析了它的作用,需要的朋友可以參考下2015-06-06
python中通過selenium簡單操作及元素定位知識點總結(jié)
在本篇文章里小編給大家整理的是關(guān)于python中通過selenium簡單操作及元素定位的知識點,有需要的朋友們可以學(xué)習(xí)下。2019-09-09
Python加載文件內(nèi)容的兩種實現(xiàn)方式
這篇文章主要介紹了Python加載文件內(nèi)容的兩種實現(xiàn)方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-09-09
Python數(shù)據(jù)分析之Matplotlib的常用操作總結(jié)
Matplotlib是Python的繪圖庫,它可與NumPy一起使用,提供了一種有效的MatLab開源替代方案,下面這篇文章主要給大家介紹了關(guān)于Python數(shù)據(jù)分析之Matplotlib常用操作的相關(guān)資料,需要的朋友可以參考下2022-01-01

