京东-优惠雷达
新人页面
精选商品
首月0月租体验,领12个月京东PLUS
自营热卖

利用 Python 的 psutil 库对系统进行全面监控

翩若惊鸿 1年前   阅读数 194 0

模仿系统监视器的功能,用 psutil 采集系统数据并保存为文本文件

import os
import platform
import socket
import time

import psutil

class Sample_data():
    """Collect point-in-time system metrics through psutil.

    Each public method returns plain dicts/lists so the caller can
    serialise the sample as text:

    * ``profile()``   - system overview (users, uptime, CPU, disk, memory...)
    * ``process()``   - one dict per running process
    * ``network()``   - remote connections, interface addresses, NIC throughput
    * ``disk_info()`` - per-partition usage and per-disk throughput

    ``network()`` and ``disk_info()`` each sleep for one second between two
    counter snapshots in order to compute a KB-per-second rate.
    """

    @staticmethod
    def _percent(used, total):
        """Return ``used / total`` as a percentage rounded to 2 decimals.

        Returns 0.0 when ``total`` is zero so that a machine on which no
        partition could be read does not raise ZeroDivisionError.
        """
        if not total:
            return 0.0
        return round(used / total * 100, 2)

    @staticmethod
    def _family_label(family):
        """Map a socket address family to 'Ipv4' / 'Ipv6'.

        Returns None for every other family (link-layer/MAC entries etc.),
        which callers use as the signal to skip the address.
        """
        if family == socket.AF_INET:
            return 'Ipv4'
        if family == socket.AF_INET6:
            return 'Ipv6'
        return None

    @staticmethod
    def _io_rates(before, after, fields):
        """Convert two counter snapshots into per-device KB deltas.

        ``before`` and ``after`` map a device name to a counter object.
        ``fields`` is a sequence of ``(output_key, counter_attribute)`` pairs.
        Devices that are missing from ``before`` (e.g. an interface that came
        up between the two samples) are skipped because no delta can be
        computed for them.

        Returns a list of ``{device: {output_key: kb_delta, ...}}`` dicts.
        """
        rates = []
        for name, now in after.items():
            prev = before.get(name)
            if prev is None:
                continue
            rates.append({
                name: {
                    key: round((getattr(now, attr) - getattr(prev, attr)) / 1024, 2)
                    for key, attr in fields
                }
            })
        return rates

    @staticmethod
    def _partition_usage(part):
        """Return ``psutil.disk_usage`` for a partition, or None if unreadable.

        The mount point (not the device node) is queried, as in the psutil
        documentation examples. OSError is swallowed so that one unreadable
        partition (for instance a drive with no media) does not abort the
        whole sample.
        """
        try:
            return psutil.disk_usage(part.mountpoint)
        except OSError:
            return None

    # System overview
    def profile(self):
        """Return a dict summarising the overall state of the machine.

        Keys: 'user', 'boottime', 'disk', 'memory', 'cpu', 'processnum',
        ' sysinfo' and 'tcpnum'.
        """
        # Logged-in users and their session start time
        userdict = [
            {
                'Name': entry.name,
                'Starttime': time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(entry.started)),
            }
            for entry in psutil.users()
        ]

        # Uptime in hours
        uptime_seconds = time.time() - psutil.boot_time()
        boottime = str(round(uptime_seconds / 60 / 60, 2)) + '/h'

        # CPU usage and core count
        # NOTE(review): the leading space in ' Cpu_num' looks like a typo; it is
        # kept because the key is part of the returned structure.
        cpu = {'Cpu_usage': psutil.cpu_percent(interval=0.1), ' Cpu_num': psutil.cpu_count()}

        # Disk usage and size, summed over every readable partition
        totalsize = 0
        totalused = 0
        for part in psutil.disk_partitions():
            usage = self._partition_usage(part)
            if usage is None:
                continue
            totalsize += int(usage.total)
            totalused += int(usage.used)
        disk = {
            'partname': 'totaldisk',
            'totalsize': str(int(totalsize / (1024 ** 3))) + 'Gb',
            'percent': str(self._percent(totalused, totalsize)),
        }

        # Number of TCP connections
        tcpnum = {'tcpnum': len(psutil.net_connections(kind='tcp'))}

        # Number of processes that can still be opened (a pid may vanish
        # between pids() and Process()).
        proc_num = 0
        for pid in psutil.pids():
            try:
                psutil.Process(pid)
            except psutil.Error:
                continue
            proc_num += 1

        # Memory usage
        mem = psutil.virtual_memory()
        memory = {
            'totalsize': str(round(mem.total / (1024 ** 3), 2)) + 'Gb',
            'percent': mem.percent,
            'free': str(round(mem.free / (1024 ** 3), 2)),
        }

        # Operating system
        plat = platform.uname()
        sysinfo = {'system': plat.system + plat.release, 'type': plat.machine, 'processor': plat.processor}

        # NOTE(review): ' sysinfo' also carries a leading space; kept as-is for
        # the same compatibility reason as ' Cpu_num'.
        return {
            'user': userdict,
            'boottime': boottime,
            'disk': disk,
            'memory': memory,
            'cpu': cpu,
            'processnum': proc_num,
            ' sysinfo': sysinfo,
            'tcpnum': tcpnum,
        }

    # Process list
    def process(self):
        """Return a list with one dict per process that could be inspected.

        Each dict has the keys 'pid', 'name', 'user', 'memory' (virtual
        memory size divided by 1024), 'threadsnum', 'path' and 'createtime'.
        Processes that raise a psutil error or OSError while being queried
        are left out.
        """
        pros = []
        for pid in psutil.pids():
            try:
                proc = psutil.Process(pid)
                info = {
                    'pid': pid,
                    'name': proc.name(),
                    'user': proc.username(),
                    'memory': proc.memory_info().vms / 1024,
                    'threadsnum': proc.num_threads(),
                    'path': proc.exe(),
                    'createtime': time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(proc.create_time())),
                }
            except (psutil.Error, OSError):
                # Process exited, is a zombie, or access was denied: skip it.
                continue
            pros.append(info)
        return pros

    # Network information
    def network(self):
        """Return connections, interface addresses and NIC throughput.

        The result is ``{'connect': [...], 'netaddrs': {...}, 'net_io': [...]}``.
        Only connections that have a remote address are listed. 'net_io'
        holds the KB sent/received per interface during a one-second sample.
        """
        # Network connections
        connect = []
        for conn in psutil.net_connections('all'):
            laddr, raddr, pid = conn.laddr, conn.raddr, conn.pid
            if not raddr:
                continue
            # Path-based (UNIX socket) addresses are plain strings without
            # ip/port attributes, so they cannot be reported in this format.
            if not (hasattr(laddr, 'ip') and hasattr(raddr, 'ip')):
                continue
            if pid is None:
                # Process(None) would resolve to *this* process and attach a
                # wrong name to the connection, so report the owner as unknown.
                pname = None
            else:
                try:
                    pname = psutil.Process(pid).name()
                except psutil.Error:
                    continue
            connect.append({
                'pid': pid,
                'name': pname,
                'laddr': laddr.ip,
                'laddrport': laddr.port,
                'raddr': raddr.ip,
                'raddrport': raddr.port,
            })

        # Network interface addresses (IPv4 / IPv6 only)
        Net_addrs = {}
        for nic, addresses in psutil.net_if_addrs().items():
            entries = []
            for addr in addresses:
                label = self._family_label(addr.family)
                if label is None:
                    continue
                entries.append({'type': label, 'address': addr.address, 'netmask': addr.netmask})
            Net_addrs[nic] = entries

        # Network IO (KB/s): difference between two snapshots taken 1 s apart
        baseline = psutil.net_io_counters(pernic=True)
        time.sleep(1)
        current = psutil.net_io_counters(pernic=True)
        Net_io = self._io_rates(
            baseline,
            current,
            (('sent_bytes', 'bytes_sent'), ('recv_bytes', 'bytes_recv')),
        )

        return {'connect': connect, 'netaddrs': Net_addrs, 'net_io': Net_io}

    # Disks
    def disk_info(self):
        """Return per-partition usage and per-disk throughput.

        The result is ``{'diskper': [...], 'disk_io': [...]}``. 'disk_io'
        holds the KB read/written per disk during a one-second sample.
        """
        # Usage of each partition
        diskper = []
        for part in psutil.disk_partitions():
            usage = self._partition_usage(part)
            if usage is None:
                continue
            diskper.append({
                'systemtype': part.fstype,
                'partname': part.device.replace('\\', ''),
                'totalsize': str(int(usage.total / (1024 ** 3))) + 'Gb',
                'available': str(round(usage.free / (1024 ** 3), 2)) + 'Gb',
                'percent': usage.percent,
            })

        # IO of each disk (KB/s): difference between two snapshots 1 s apart
        baseline = psutil.disk_io_counters(perdisk=True) or {}
        time.sleep(1)
        current = psutil.disk_io_counters(perdisk=True) or {}
        Diskio = self._io_rates(
            baseline,
            current,
            (('read_bytes', 'read_bytes'), ('write_bytes', 'write_bytes')),
        )

        # Report the computed rates, not the raw first snapshot.
        return {'diskper': diskper, 'disk_io': Diskio}

if __name__ == '__main__':
    # Sampling loop: append 20 samples to the log file, delete the file, and
    # start over - forever.
    # Raw string so the backslashes are taken literally instead of relying on
    # unrecognised escape sequences (\W, \S, \m), which newer Python versions
    # warn about. The resulting path is identical to the original literal.
    # NOTE(review): writing under System32 presumably needs elevated
    # privileges - confirm this location is intended.
    log_path = r'C:\Windows\System32\monitor.txt'
    f = Sample_data()
    while True:
        for _ in range(20):
            curtime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            content = f.profile()
            process = f.process()
            network = f.network()
            disk = f.disk_info()
            # The with-statement closes the file; no explicit close() needed.
            with open(log_path, 'a+', encoding='utf-8') as file:
                file.write('TTTTT\t' + str(curtime) + '\n' + str(content) + '\n' + str(process) + '\n' + str(network) + '\n' + str(disk) + '\n')
        os.remove(log_path)


注意:本文归作者所有,未经作者允许,不得转载

全部评论: 0

    我有话说: