Best Python code snippet using tempest_python
gui.py
Source:gui.py  
1#!/usr/bin/python32#-*- encoding: Utf-8 -*-3from PyQt5.QtWidgets import QApplication, QListWidgetItem, QDesktopWidget, QFileDialog, QInputDialog, QProgressDialog, QMessageBox, QFileSystemModel, QHeaderView4from PyQt5.QtCore import Qt, QUrl, pyqtSignal, QThread5from PyQt5.QtGui import QDesktopServices, QTextOption6from PyQt5.uic import loadUi7from signal import signal, SIGINT, SIG_DFL8from os.path import dirname, realpath9from collections import defaultdict10from urllib.parse import urlparse11from os import listdir, remove12from functools import partial13from json import load, dump14from binascii import crc3215from pathlib import Path16from sys import argv17from utils.common import extractors, transports, BASE_PATH, assert_installed, extractor_save, insert_endpoint, load_proto_msgs18from views.fuzzer import ProtobufItem, ProtocolDataItem19from utils.transports import *20from extractors import *21"""22    This script runs the main code for the PBTK GUI, and essentially23    links loaded QtDesigner *.uis within themselves using signals, and24    with the other parts of code composing PBTK. The order of methods25    more or less follows the action flow of a graphical user.26"""27class PBTKGUI(QApplication):28    def __init__(self):29        super().__init__(argv)30        signal(SIGINT, SIG_DFL)31        32        views = dirname(realpath(__file__)) + '/views/'33        34        self.welcome = loadUi(views + 'welcome.ui')35        self.choose_extractor = loadUi(views + 'choose_extractor.ui')36        self.choose_proto = loadUi(views + 'choose_proto.ui')37        self.create_endpoint = loadUi(views + 'create_endpoint.ui')38        self.choose_endpoint = loadUi(views + 'choose_endpoint.ui')39        self.fuzzer = loadUi(views + 'fuzzer.ui')40        self.welcome.step1.clicked.connect(self.load_extractors)41        self.choose_extractor.rejected.connect(partial(self.set_view, self.welcome))42        self.choose_extractor.extractors.itemClicked.connect(self.prompt_extractor)43        44        self.welcome.step2.clicked.connect(self.load_protos)45        self.proto_fs = QFileSystemModel()46        self.choose_proto.protos.setModel(self.proto_fs)47        self.proto_fs.directoryLoaded.connect(self.choose_proto.protos.expandAll)48        49        for i in range(1, self.proto_fs.columnCount()):50            self.choose_proto.protos.hideColumn(i)51        self.choose_proto.protos.setRootIndex(self.proto_fs.index(str(BASE_PATH / 'protos')))52        self.choose_proto.rejected.connect(partial(self.set_view, self.welcome))53        self.choose_proto.protos.clicked.connect(self.new_endpoint)54        55        self.create_endpoint.transports.itemClicked.connect(self.pick_transport)56        self.create_endpoint.loadRespPbBtn.clicked.connect(self.load_another_pb)57        self.create_endpoint.rejected.connect(partial(self.set_view, self.choose_proto))58        self.create_endpoint.buttonBox.accepted.connect(self.write_endpoint)59                60        self.welcome.step3.clicked.connect(self.load_endpoints)61        self.choose_endpoint.rejected.connect(partial(self.set_view, self.welcome))62        self.choose_endpoint.endpoints.itemClicked.connect(self.launch_fuzzer)63        64        self.fuzzer.rejected.connect(partial(self.set_view, self.choose_endpoint))65        self.fuzzer.fuzzFields.clicked.connect(self.fuzz_endpoint)66        self.fuzzer.deleteThis.clicked.connect(self.delete_endpoint)67        self.fuzzer.comboBox.activated.connect(self.launch_fuzzer)68        self.fuzzer.getAdd.clicked.connect(self.add_tab_data)69        self.fuzzer.urlField.setWordWrapMode(QTextOption.WrapAnywhere)70        71        for tree in (self.fuzzer.pbTree, self.fuzzer.getTree):72            tree.itemEntered.connect(lambda item, _: item.edit() if hasattr(item, 'edit') else None)73            tree.itemClicked.connect(lambda item, col: item.update_check(col=col))74            tree.itemExpanded.connect(lambda item: item.expanded() if hasattr(item, 'expanded') else None)75            tree.header().setSectionResizeMode(QHeaderView.ResizeToContents)76        77        self.welcome.mydirLabel.setText(self.welcome.mydirLabel.text() % BASE_PATH)78        self.welcome.mydirBtn.clicked.connect(partial(QDesktopServices.openUrl, QUrl.fromLocalFile(str(BASE_PATH))))79        80        self.set_view(self.welcome)81        self.exec_()82    83    """84        Step 1 - Extract .proto structures from apps85    """86    87    def load_extractors(self):88        self.choose_extractor.extractors.clear()89        90        for name, meta in extractors.items():91            item = QListWidgetItem(meta['desc'], self.choose_extractor.extractors)92            item.setData(Qt.UserRole, name)93        94        self.set_view(self.choose_extractor)95    96    def prompt_extractor(self, item):97        extractor = extractors[item.data(Qt.UserRole)]98        inputs = []99        if not assert_installed(self.view, **extractor.get('depends', {})):100            return101        102        if not extractor.get('pick_url', False):103            files, mime = QFileDialog.getOpenFileNames()104            for path in files:105                inputs.append((path, Path(path).stem))106        else:107            text, good = QInputDialog.getText(self.view, ' ', 'Input an URL:')108            if text:109                url = urlparse(text)110                inputs.append((url.geturl(), url.netloc))111        112        if inputs:113            wait = QProgressDialog('Extracting .proto structures...', None, 0, 0)114            wait.setWindowTitle(' ')115            self.set_view(wait)116            117            self.worker = Worker(inputs, extractor)118            self.worker.progress.connect(self.extraction_progress)119            self.worker.finished.connect(self.extraction_done)120            self.worker.start()121    122    def extraction_progress(self, info, progress):123        self.view.setLabelText(info)124        125        if progress is not None:126            self.view.setRange(0, 100)127            self.view.setValue(progress * 100)128        else:129            self.view.setRange(0, 0)130    131    def extraction_done(self, outputs):132        nb_written_all, wrote_endpoints = 0, False133        134        for folder, output in outputs.items():135            nb_written, wrote_endpoints = extractor_save(BASE_PATH, folder, output)136            nb_written_all += nb_written137        138        if wrote_endpoints:139            self.set_view(self.welcome)140            QMessageBox.information(self.view, ' ', '%d endpoints and their <i>.proto</i> structures have been extracted! You can now reuse the <i>.proto</i>s or fuzz the endpoints.' % nb_written_all)141        142        elif nb_written_all:143            self.set_view(self.welcome)144            QMessageBox.information(self.view, ' ', '%d <i>.proto</i> structures have been extracted! You can now reuse the <i>.protos</i> or define endpoints for them to fuzz.' % nb_written_all)145        146        else:147            self.set_view(self.choose_extractor)148            QMessageBox.warning(self.view, ' ', 'This extractor did not find Protobuf structures in the corresponding format for specified files.')149    150    """151        Step 2 - Link .protos to endpoints152    """153    154    # Don't load .protos from the filesystem until asked to, in order155    # not to slow down startup.156    157    def load_protos(self):158        self.proto_fs.setRootPath(str(BASE_PATH / 'protos'))159        self.set_view(self.choose_proto)160    161    def new_endpoint(self, path):162        if not self.proto_fs.isDir(path):163            path = self.proto_fs.filePath(path)164            165            if not getattr(self, 'only_resp_combo', False):166                self.create_endpoint.pbRequestCombo.clear()167            self.create_endpoint.pbRespCombo.clear()168            169            has_msgs = False170            for name, cls in load_proto_msgs(path):171                has_msgs = True172                if not getattr(self, 'only_resp_combo', False):173                    self.create_endpoint.pbRequestCombo.addItem(name, (path, name))174                self.create_endpoint.pbRespCombo.addItem(name, (path, name))175            if not has_msgs:176                QMessageBox.warning(self.view, ' ', 'There is no message defined in this .proto.')177                return178            179            self.create_endpoint.reqDataSubform.hide()180            if not getattr(self, 'only_resp_combo', False):181                self.create_endpoint.endpointUrl.clear()182                self.create_endpoint.transports.clear()183                self.create_endpoint.sampleData.clear()184                self.create_endpoint.pbParamKey.clear()185                self.create_endpoint.parsePbCheckbox.setChecked(False)186                187                for name, meta in transports.items():188                    item = QListWidgetItem(meta['desc'], self.create_endpoint.transports)189                    item.setData(Qt.UserRole, (name, meta.get('ui_data_form')))190            191            elif getattr(self, 'saved_transport_choice'):192                self.create_endpoint.transports.setCurrentItem(self.saved_transport_choice)193                self.pick_transport(self.saved_transport_choice)194                self.saved_transport_choice = None195            196            self.only_resp_combo = False197            self.set_view(self.create_endpoint)198    199    def pick_transport(self, item):200        name, desc = item.data(Qt.UserRole)201        self.has_pb_param = desc and 'regular' in desc202        self.create_endpoint.reqDataSubform.show()203        if self.has_pb_param:204            self.create_endpoint.pbParamSubform.show()205        else:206            self.create_endpoint.pbParamSubform.hide()207        self.create_endpoint.sampleDataLabel.setText('Sample request data, one per line (in the form of %s):' % desc)208    209    def load_another_pb(self):210        self.only_resp_combo = True211        self.saved_transport_choice = self.create_endpoint.transports.currentItem()212        self.set_view(self.choose_proto)213    214    def write_endpoint(self):215        request_pb = self.create_endpoint.pbRequestCombo.itemData(self.create_endpoint.pbRequestCombo.currentIndex())216        url = self.create_endpoint.endpointUrl.text()217        transport = self.create_endpoint.transports.currentItem()218        sample_data = self.create_endpoint.sampleData.toPlainText()219        pb_param = self.create_endpoint.pbParamKey.text()220        has_resp_pb = self.create_endpoint.parsePbCheckbox.isChecked()221        resp_pb = self.create_endpoint.pbRespCombo.itemData(self.create_endpoint.pbRespCombo.currentIndex())222        223        if not (request_pb and urlparse(url).netloc and transport and (not self.has_pb_param or pb_param) and (not has_resp_pb or resp_pb)):224            QMessageBox.warning(self.view, ' ', 'Please fill all relevant information fields.')225        226        else:227            json = {228                'request': {229                    'transport': transport.data(Qt.UserRole)[0],230                    'proto_path': request_pb[0].replace(str(BASE_PATH / 'protos'), '').strip('/\\'),231                    'proto_msg': request_pb[1],232                    'url': url233                }234            }235            if self.has_pb_param:236                json['request']['pb_param'] = pb_param237            238            sample_data = list(filter(None, sample_data.split('\n')))239            if sample_data:240                transport_obj = transports[transport.data(Qt.UserRole)[0]]241                transport_obj = transport_obj['func'](pb_param, url)242                243                for sample_id, sample in enumerate(sample_data):244                    try:245                        sample = transport_obj.serialize_sample(sample)246                    except Exception:247                        return QMessageBox.warning(self.view, ' ', 'Some of your sample data is not in the specified format.')248                    if not sample:249                        return QMessageBox.warning(self.view, ' ', "Some of your sample data didn't contain the Protobuf parameter key you specified.")250                    sample_data[sample_id] = sample251                252                json['request']['samples'] = sample_data253            254            if has_resp_pb:255                json['response'] = {256                    'format': 'raw_pb',257                    'proto_path': resp_pb[0].replace(str(BASE_PATH / 'protos'), '').strip('/\\'),258                    'proto_msg': resp_pb[1]259                }260            insert_endpoint(BASE_PATH / 'endpoints', json)261            262            QMessageBox.information(self.view, ' ', 'Endpoint created successfully.')263            self.set_view(self.welcome)264    265    """266        Step 3: Fuzz and test endpoints live267    """268    269    def load_endpoints(self):270        self.choose_endpoint.endpoints.clear()271        272        for name in listdir(str(BASE_PATH / 'endpoints')):273            if name.endswith('.json'):274                item = QListWidgetItem(name.split('.json')[0], self.choose_endpoint.endpoints)275                item.setFlags(item.flags() & ~Qt.ItemIsEnabled)276                277                pb_msg_to_endpoints = defaultdict(list)278                with open(str(BASE_PATH / 'endpoints' / name)) as fd:279                    for endpoint in load(fd, object_pairs_hook=OrderedDict):280                        pb_msg_to_endpoints[endpoint['request']['proto_msg'].split('.')[-1]].append(endpoint)281                282                for pb_msg, endpoints in pb_msg_to_endpoints.items():283                    item = QListWidgetItem(' ' * 4 + pb_msg, self.choose_endpoint.endpoints)284                    item.setFlags(item.flags() & ~Qt.ItemIsEnabled)285                    286                    for endpoint in endpoints:287                        path_and_qs = '/' + endpoint['request']['url'].split('/', 3).pop()288                        item = QListWidgetItem(' ' * 8 + path_and_qs, self.choose_endpoint.endpoints)289                        item.setData(Qt.UserRole, endpoint)290        291        self.set_view(self.choose_endpoint)292    293    def launch_fuzzer(self, item):294        if type(item) == int:295            data, sample_id = self.fuzzer.comboBox.itemData(item)296        else:297            data, sample_id = item.data(Qt.UserRole), 0298        299        if data:300            self.current_req_proto = BASE_PATH / 'protos' / data['request']['proto_path']301            302            self.pb_request = load_proto_msgs(self.current_req_proto)303            self.pb_request = dict(self.pb_request)[data['request']['proto_msg']]()304            305            if data.get('response') and data['response']['format'] == 'raw_pb':306                self.pb_resp = load_proto_msgs(BASE_PATH / 'protos' / data['response']['proto_path'])307                self.pb_resp = dict(self.pb_resp)[data['response']['proto_msg']]308            self.pb_param = data['request'].get('pb_param')309            self.base_url = data['request']['url']310            self.endpoint = data311            312            self.transport_meta = transports[data['request']['transport']]313            self.transport = self.transport_meta['func'](self.pb_param, self.base_url)314            315            sample = ''316            if data['request'].get('samples'):317                sample = data['request']['samples'][sample_id]318            self.get_params = self.transport.load_sample(sample, self.pb_request)319            320            # Get initial data into the Protobuf tree view321            self.fuzzer.pbTree.clear()322            323            self.ds_items = defaultdict(dict)324            self.ds_full_names = {}325            326            for ds in self.pb_request.DESCRIPTOR.fields:327                ProtobufItem(self.fuzzer.pbTree, ds, self, [ds.full_name])328            self.parse_fields(self.pb_request)329            330            # Do the same for transport-specific data331            self.fuzzer.getTree.clear()332            self.fuzzer.tabs.setTabText(1, self.transport_meta.get('ui_tab', ''))333            if self.get_params:334                for key, val in self.get_params.items():335                    ProtocolDataItem(self.fuzzer.getTree, key, val, self)336            337            # Fill the request samples combo box if we're loading a new338            # endpoint.339            if type(item) != int:340                if len(data['request'].get('samples', [])) > 1:341                    self.fuzzer.comboBox.clear()342                    for sample_id, sample in enumerate(data['request']['samples']):343                        self.fuzzer.comboBox.addItem(sample[self.pb_param] if self.pb_param else str(sample), (data, sample_id))344                    self.fuzzer.comboBoxLabel.show()345                    self.fuzzer.comboBox.show()346                else:347                    self.fuzzer.comboBoxLabel.hide()348                    self.fuzzer.comboBox.hide()349                350                self.set_view(self.fuzzer)351            352            self.fuzzer.frame.setUrl(QUrl("about:blank"))353            self.update_fuzzer()354    """355        Parsing and rendering the Protobuf message to a tree view:356        Every Protobuf field is fed to ProtobufItem (a class inheriting357        QTreeWidgetItem), and the created object is saved in the ds_items358        entry for the corresponding descriptor.359    """360    361    def parse_fields(self, msg, base_path=[]):362        for ds, val in msg.ListFields():363            path = base_path + [ds.full_name]364            365            if ds.label == ds.LABEL_REPEATED:366                for val_index, val_value in enumerate(val):367                    if ds.cpp_type == ds.CPPTYPE_MESSAGE:368                        self.ds_items[id(ds)][tuple(path)].setExpanded(True)369                        self.ds_items[id(ds)][tuple(path)].setDefault(parent=msg, msg=val, index=val_index)370                        self.parse_fields(val_value, path)371                    372                    else:373                        self.ds_items[id(ds)][tuple(path)].setDefault(val_value, parent=msg, msg=val, index=val_index)374                    375                    self.ds_items[id(ds)][tuple(path)].duplicate(True)376            377            else:378                if ds.cpp_type == ds.CPPTYPE_MESSAGE:379                    self.ds_items[id(ds)][tuple(path)].setExpanded(True)380                    self.ds_items[id(ds)][tuple(path)].setDefault(parent=msg, msg=val)381                    self.parse_fields(val, path)382                383                else:384                    self.ds_items[id(ds)][tuple(path)].setDefault(val, parent=msg, msg=val)385    386    def update_fuzzer(self):387        resp = self.transport.perform_request(self.pb_request, self.get_params)388        389        data, text, url, mime = resp.content, resp.text, resp.url, resp.headers['Content-Type'].split(';')[0]390        391        meta = '%s %d %08x\n%s' % (mime, len(data), crc32(data) & 0xffffffff, resp.url)392        self.fuzzer.urlField.setText(meta)393        394        self.fuzzer.frame.update_frame(data, text, url, mime, getattr(self, 'pb_resp', None))395    396    def fuzz_endpoint(self):397        QMessageBox.information(self.view, ' ', 'Automatic fuzzing is not implemented yet.')398    399    def delete_endpoint(self):400        if QMessageBox.question(self.view, ' ', 'Delete this endpoint?') == QMessageBox.Yes:401            path = str(BASE_PATH / 'endpoints' / (urlparse(self.base_url).netloc + '.json'))402            403            with open(path) as fd:404                json = load(fd, object_pairs_hook=OrderedDict)405            json.remove(self.endpoint)406            407            with open(path, 'w') as fd:408                dump(json, fd, ensure_ascii=False, indent=4)409            if not json:410                remove(path)411            412            self.load_endpoints()413    414    def add_tab_data(self):415        text, good = QInputDialog.getText(self.view, ' ', 'Field name:')416        if text:417            ProtocolDataItem(self.fuzzer.getTree, text, '', self).edit()418    419    """420        Utility methods follow421    """422    423    def set_view(self, view):424        if hasattr(self, 'view'):425            self.view.hide()426        view.show()427        self.view = view428        429        resolution = QDesktopWidget().screenGeometry()430        view.move((resolution.width() / 2) - (view.frameSize().width() / 2),431                  (resolution.height() / 2) - (view.frameSize().height() / 2))432"""433    Simple wrapper for running extractors in background.434"""435class Worker(QThread):436    finished = pyqtSignal(object)437    progress = pyqtSignal(object, object)438    def __init__(self, inputs, extractor):439        super().__init__()440        self.inputs = inputs441        self.extractor = extractor442    443    def run(self):444        output = defaultdict(list)445        for input_, folder in self.inputs:446            # Extractor is runned here447            for name, contents in self.extractor['func'](input_):448                if name == '_progress':449                    self.progress.emit(*contents)450                else:451                    output[folder].append((name, contents))452        453        self.finished.emit(output)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
