PyQt作品 – PingTester – 多点Ping测试工具

由于猫每次在一个临时测试点此测试一大片服务器的延迟和丢包, 一个个去跑太蛋疼, 于是用PyQt做了这么个小工具来测试各种线路的延迟丢包等信息.
这是我的第二个PyQt作品= =|||

截图:

(Archlinux / KDE4 环境下)


(Windows XP)

这个工具我已经初步实现了跨平台(在以上截图环境下运行正常), 在编写过程中, 我有如下的收获:

  • 不可以在子线程中直接操作UI, 以免引起资源冲突导致Segmentation Fault
  • 使用Queue类可以初步实现子线程与UI线程更新界面的通信. Signal方面, 我实例了一个QTimer, 每隔一定时间处理一次消息队列.
  • QTableView比QTableWidget效率高得多, 尤其是在Win32平台下. 因此应尽量采用 QTableView + QStandardItemModel 的搭配来做Table


注: 代码中已经内置了一份测试IP列表, 可以根据需要添加/删除. 第一次运行会生成 ips.conf 文件, 以后需要修改IP列表, 只需要编辑此文件.
再注: 这个程序写的确实很丑, 欢迎各种拍砖 = =

下面贴上全部的代码:

# -*- coding: utf-8 -*-

import sys
import time
import subprocess
from threading import Thread
import re
from PyQt4.QtGui import QMainWindow, QApplication, QStandardItemModel, QStandardItem, QWidget, QVBoxLayout, QTableView
from PyQt4.QtCore import pyqtSignature, Qt, QTimer, SIGNAL, QString, QMetaObject
from Queue import Queue

#from Ui_main import Ui_MainWindow

try:
    _fromUtf8 = QString.fromUtf8
except AttributeError:
    _fromUtf8 = lambda s: s

class Ui_MainWindow(object):
    def setupUi(self, MainWindow):
        MainWindow.setObjectName(_fromUtf8("MainWindow"))
        MainWindow.resize(500, 435)
        self.centralWidget = QWidget(MainWindow)
        self.centralWidget.setObjectName(_fromUtf8("centralWidget"))
        self.verticalLayout = QVBoxLayout(self.centralWidget)
        self.verticalLayout.setObjectName(_fromUtf8("verticalLayout"))
        self.tableView = QTableView(self.centralWidget)
        self.tableView.setObjectName(_fromUtf8("tableView"))
        self.verticalLayout.addWidget(self.tableView)
        MainWindow.setCentralWidget(self.centralWidget)

        self.retranslateUi(MainWindow)
        QMetaObject.connectSlotsByName(MainWindow)

    def retranslateUi(self, MainWindow):
        MainWindow.setWindowTitle(QApplication.translate("MainWindow", "Ping Tester", None, QApplication.UnicodeUTF8))

if sys.platform.startswith('linux'):
    getdata = re.compile(r"icmp_req=(\d+) ttl=(\d+) time=([\d\.]+)\sms")
    pingstr = ["ping", "-n", "-i 0.2"]
    filtered = "Packet filtered"
    delaytime = 200
else:
    getdata = re.compile(r"=([\d\.]+)ms TTL=(\d+)")
    pingstr = ["ping.exe", "-t"]
    timeout = "Request timed out."
    delaytime = 500

try:
    with open("ips.conf", "r") as f:
        t_node = f.read().decode('utf-8')
        if not t_node:
            raise IOError
except IOError:
    with open("ips.conf", "w") as f:
        t_node = u"""
        210.51.176.182-北京联通BGP
        202.97.224.68-黑龙江网通
        202.99.96.68-天津网通
        202.99.160.68-河北网通
        202.96.69.38-大连网通
        221.208.172.1-哈尔滨网通
        202.99.192.68-山西网通
        202.99.160.68-河北网通
        210.52.149.2-湖北网通
        218.30.64.121-湖南电信
        202.100.4.15-陕西电信 
        202.96.199.133-上海电信 
        219.150.150.150-河南电信 
        219.146.0.130-黑龙江电信
        202.98.96.68-四川电信
        59.66.4.50-北京教育网
        202.112.26.246-上海教育网
        202.114.0.254-武汉教育网
        211.161.159.9-武汉长城宽带
        143.90.12.170-日本东京odn
        202.160.123.5-新加坡Host1Plus
        60.199.17.138-台湾固网
        220.128.152.1-台湾HINET
        168.95.1.1-台湾中华电信
        202.65.207.1-香港DYX
        202.181.171.1-香港HKCIX
        203.169.186.1-香港NTT.HKNET
        59.148.193.38-香港HKCTI
        220.232.214.1-香港Supernet
        59.152.208.1-香港WTT
        210.56.48.1-香港HKSUN
        202.76.56.1-香港CPCNET
        218.213.250.130-香港SNL
        69.162.132.1-美国芝加哥Level3
        205.185.112.131-美国佛里蒙特HE
        204.152.221.1-美国洛杉矶nLayer
        178.63.62.208-德国Nuremburg
        94.23.202.83-法国OVH
        69.163.43.73-美国波特兰HE
        216.189.1.14-美国RockHill
        69.172.231.1-美国洛杉矶Peer1
        184.22.112.34-美国迈阿密HE
        199.48.146.37-美国圣琼斯
        216.245.208.1-美国达拉斯
        109.74.207.9-英国伦敦
        """
        f.write(t_node.encode('utf-8'))

node = []
for line in t_node.split('\n'):
    try:
        ip, desc = line.strip().split("-")
        node.append((ip, desc))
    except ValueError:
        pass
nodecount = len(node)

class MainWindow(QMainWindow, Ui_MainWindow):
    """
    Class documentation goes here.
    """
    def __init__(self, parent = None):
        """
        Constructor
        """
        QMainWindow.__init__(self, parent)
        self.setupUi(self)
        self.model = QStandardItemModel()
        self.model.setColumnCount(6)
        self.model.setRowCount(nodecount)
        self.model.setHorizontalHeaderLabels(["IP", "Description", "Loss%", "CurPing", "AvgPing", "TTL"])
        for i, (ip, desc) in enumerate(node):
            self.setitem(i, 0, ip)
            self.setitem(i, 1, desc)
            self.setitem(i, 2, "")
            self.setitem(i, 3, "")
            self.setitem(i, 4, "")
            self.setitem(i, 5, "")
        self.tableView.setModel(self.model)
        for i in range(len(node)):
            self.tableView.setRowHeight(i, 18)
        self.resizetable()
        self.timer = QTimer(self)
        self.connect(self.timer,
                     SIGNAL("timeout()"),
                     self.checkitems)
        self.timer.start(delaytime)
    
    def checkitems(self):
        while not q.empty():
            item = q.get()
            self.chgtxt(*item)
            q.task_done()
        self.resizetable()
    
    def resizetable(self):
        self.tableView.resizeColumnsToContents()
        
    def chgtxt(self, x, y, value):
        self.model.item(x, y).setText(value)
    
    def setitem(self, x, y, value):
        self.model.setItem(x, y, QStandardItem(value))
        
app = QApplication(sys.argv)
ui = MainWindow()
ui.show()
q = Queue()

def pinger(i, ip, desc):
    s = ""
    avgping = 0
    count = 0
    timeoutcount = 0
    ret = subprocess.Popen(pingstr + [ip],
                            stdout=subprocess.PIPE)
    while True:
        try:
            s += ret.stdout.read(1)
            
            tryfind = getdata.findall(s)
            if sys.platform.startswith('linux'):
                if len(tryfind) > 0:
                    req, ttl, crtping = tryfind[-1]
                    avgping += float(crtping)
                    count += 1
                    q.put((i, 3, crtping + "ms"))
                    q.put((i, 4, "%.2f" % (avgping * 1.0 / count) + "ms"))
                    q.put((i, 5, ttl))
                    q.put((i, 2, "%.2f" % ((int(req) - count) * 100.0 / int(req))))
                    s = ""
                elif filtered in s:
                    q.put((i, 2, "Failed"))
                    q.put((i, 3, "Failed"))
                    q.put((i, 4, "Failed"))
                    q.put((i, 5, "Failed"))
                    ret.kill()
                    s = ""
            else:
                if len(tryfind) > 0:
                    crtping, ttl = tryfind[-1]
                    avgping += float(crtping)
                    count += 1
                    q.put((i, 3, crtping + "ms"))
                    q.put((i, 4, "%.2f" % (avgping * 1.0 / count) + "ms"))
                    q.put((i, 5, ttl))
                    q.put((i, 2, "%.2f" % (timeoutcount * 100.0 / (count + timeoutcount))))
                elif timeout in s:
                    timeoutcount += 1
                    q.put((i, 2, "-"))
                    q.put((i, 3, "-"))
                    if count:
                        q.put((i, 5, "%.2f" % (timeoutcount * 100.0 / (count + timeoutcount))))
                    else:
                        q.put((i, 5, "-"))
                    s = ""
        except IOError:
            print s
            break
            
def startworkers():
    for i, (ip, desc) in enumerate(node):
        worker = Thread(target=pinger, args=(i, ip, desc))
        worker.setDaemon(True)
        worker.start()
        time.sleep(delaytime / 10000.0)

startthread = Thread(target=startworkers)
startthread.setDaemon(True)
startthread.start()

sys.exit(app.exec_())

21 thoughts on “PyQt作品 – PingTester – 多点Ping测试工具”

  1. 学习你了代码,借用你的代码在Stackflow求到了如何正常关闭subprocess。
    给作者你反馈下,谢谢你的分享

    # -*- coding: utf-8 -*-
     
    import sys
    import time
    import subprocess
    from threading import Thread
    import re
    from PyQt4.QtGui import QMainWindow, QApplication, QStandardItemModel, QStandardItem, QWidget, QVBoxLayout, QTableView
    from PyQt4.QtCore import pyqtSignature, Qt, QTimer, SIGNAL, QString, QMetaObject
    from Queue import Queue
    import atexit
    
    try:
        _fromUtf8 = QString.fromUtf8
    except AttributeError:
        _fromUtf8 = lambda s: s
     
    class Ui_MainWindow(object):
        def setupUi(self, MainWindow):
            MainWindow.setObjectName(_fromUtf8("MainWindow"))
            MainWindow.resize(500, 435)
            self.centralWidget = QWidget(MainWindow)
            self.centralWidget.setObjectName(_fromUtf8("centralWidget"))
            self.verticalLayout = QVBoxLayout(self.centralWidget)
            self.verticalLayout.setObjectName(_fromUtf8("verticalLayout"))
            self.tableView = QTableView(self.centralWidget)
            self.tableView.setObjectName(_fromUtf8("tableView"))
            self.verticalLayout.addWidget(self.tableView)
            MainWindow.setCentralWidget(self.centralWidget)
     
            self.retranslateUi(MainWindow)
            QMetaObject.connectSlotsByName(MainWindow)
     
        def retranslateUi(self, MainWindow):
            MainWindow.setWindowTitle(QApplication.translate("MainWindow", "Ping Tester", None, QApplication.UnicodeUTF8))
     
    if sys.platform.startswith('linux'):
        getdata = re.compile(r"icmp_req=(\d+) ttl=(\d+) time=([\d\.]+)\sms")
        pingstr = ["ping", "-n", "-i 0.2"]
        filtered = "Packet filtered"
        delaytime = 200
    else:
        getdata = re.compile(r"=([\d\.]+)ms TTL=(\d+)")
        pingstr = ["ping.exe", "-t"]
        timeout = "Request timed out."
        delaytime = 500
     
    try:
        with open("ips.conf", "r") as f:
            t_node = f.read().decode('utf-8')
            if not t_node:
                raise IOError
    except IOError:
        with open("ips.conf", "w") as f:
            t_node = u"""
            8.8.8.8-Google
            184.22.112.34-USAHE
            """
            f.write(t_node.encode('utf-8'))
     
    node = []
    for line in t_node.split('\n'):
        try:
            ip, desc = line.strip().split("-")
            node.append((ip, desc))
        except ValueError:
            pass
    nodecount = len(node)
     
    class MainWindow(QMainWindow, Ui_MainWindow):
        """
        Class documentation goes here.
        """
        def __init__(self, parent = None):
            """
            Constructor
            """
            QMainWindow.__init__(self, parent)
            self.setupUi(self)
            self.model = QStandardItemModel()
            self.model.setColumnCount(6)
            self.model.setRowCount(nodecount)
            self.model.setHorizontalHeaderLabels(["IP", "Description", "Loss%", "CurPing", "AvgPing", "TTL"])
            for i, (ip, desc) in enumerate(node):
                self.setitem(i, 0, ip)
                self.setitem(i, 1, desc)
                self.setitem(i, 2, "")
                self.setitem(i, 3, "")
                self.setitem(i, 4, "")
                self.setitem(i, 5, "")
            self.tableView.setModel(self.model)
            for i in range(len(node)):
                self.tableView.setRowHeight(i, 18)
            self.resizetable()
            self.timer = QTimer(self)
            self.connect(self.timer,
                         SIGNAL("timeout()"),
                         self.checkitems)
            self.timer.start(delaytime)
     
        def checkitems(self):
            while not q.empty():
                item = q.get()
                self.chgtxt(*item)
                q.task_done()
            self.resizetable()
     
        def resizetable(self):
            self.tableView.resizeColumnsToContents()
     
        def chgtxt(self, x, y, value):
            self.model.item(x, y).setText(value)
     
        def setitem(self, x, y, value):
            self.model.setItem(x, y, QStandardItem(value))
     
    app = QApplication(sys.argv)
    ui = MainWindow()
    ui.show()
    q = Queue()
    
    def kill_proc(proc):
        try:
            proc.terminate()
        except Exception:
            pass
    
    def pinger(i, ip, desc):
        s = ""
        avgping = 0
        count = 0
        timeoutcount = 0
        ret = subprocess.Popen(pingstr + [ip],
                                stdout=subprocess.PIPE)
        atexit.register(kill_proc, ret)  # new add
    
        while True:
            try:
                # new add
                out = ret.stdout.read(1)
                if not out:
                    break
                s += out
    
                tryfind = getdata.findall(s)
                if sys.platform.startswith('linux'):
                    if len(tryfind) > 0:
                        req, ttl, crtping = tryfind[-1]
                        avgping += float(crtping)
                        count += 1
                        q.put((i, 3, crtping + "ms"))
                        q.put((i, 4, "%.2f" % (avgping * 1.0 / count) + "ms"))
                        q.put((i, 5, ttl))
                        q.put((i, 2, "%.2f" % ((int(req) - count) * 100.0 / int(req))))
                        s = ""
                    elif filtered in s:
                        q.put((i, 2, "Failed"))
                        q.put((i, 3, "Failed"))
                        q.put((i, 4, "Failed"))
                        q.put((i, 5, "Failed"))
                        ret.kill()
                        s = ""
                else:
                    if len(tryfind) > 0:
                        crtping, ttl = tryfind[-1]
                        avgping += float(crtping)
                        count += 1
                        q.put((i, 3, crtping + "ms"))
                        q.put((i, 4, "%.2f" % (avgping * 1.0 / count) + "ms"))
                        q.put((i, 5, ttl))
                        q.put((i, 2, "%.2f" % (timeoutcount * 100.0 / (count + timeoutcount))))
                    elif timeout in s:
                        timeoutcount += 1
                        q.put((i, 2, "-"))
                        q.put((i, 3, "-"))
                        if count:
                            q.put((i, 5, "%.2f" % (timeoutcount * 100.0 / (count + timeoutcount))))
                        else:
                            q.put((i, 5, "-"))
                        s = ""
            except IOError:
                print s
                break
        ret.communicate()
    
    def startworkers():
        for i, (ip, desc) in enumerate(node):
            worker = Thread(target=pinger, args=(i, ip, desc))
            worker.setDaemon(True)
            worker.start()
            time.sleep(delaytime / 10000.0)
    
    startthread = Thread(target=startworkers)
    startthread.setDaemon(True)
    startthread.start()
     
    sys.exit(app.exec_())
    
      1. Exception in thread Thread-7:
        Traceback (most recent call last):
        File “/usr/lib/python2.6/threading.py”, line 532, in __bootstrap_inner
        self.run()
        File “/usr/lib/python2.6/threading.py”, line 484, in run
        self.__target(*self.__args, **self.__kwargs)
        File “multiping.py”, line 154, in pinger
        tryfind = getdata.findall(s)
        AttributeError: ‘NoneType’ object has no attribute ‘findall’

        然后 ping 没杀掉……这个太严重了

Leave a Reply

Your email address will not be published. Required fields are marked *

QR Code Business Card