Best Python code snippet using fMBT_python
fmbtgti.py
Source:fmbtgti.py  
...1158                _fmbtLog('creating directory "%s" for screenshots failed: %s' %1159                         (necessaryDirs, e))1160                raise1161        return filepath1162    def _archiveScreenshot(self, filepath):1163        if self._screenshotArchiveMethod == "remove":1164            try:1165                os.remove(filepath)1166            except IOError:1167                pass1168        elif self._screenshotArchiveMethod.startswith("resize"):1169            if self._screenshotArchiveMethod == "resize":1170                convertArgs = ["-resize",1171                               "%sx" % (int(self.screenSize()[0]) / 4,)]1172            else:1173                widthHeight = self._screenshotArchiveMethod.split()[1]1174                convertArgs = ["-resize", widthHeight]1175            subprocess.call(["convert", filepath] + convertArgs + [filepath])1176    def _archiveScreenshots(self):1177        """1178        Archive screenshot files if screenshotLimit has been exceeded.1179        """1180        freeScreenshots = [filename1181                           for (filename, refCount) in self._screenshotRefCount.iteritems()1182                           if refCount == 0]1183        archiveCount = len(freeScreenshots) - self._screenshotLimit1184        if archiveCount > 0:1185            freeScreenshots.sort(reverse=True) # archive oldest1186            while archiveCount > 0:1187                toBeArchived = freeScreenshots.pop()1188                try:1189                    self._archiveScreenshot(toBeArchived)1190                except IOError:1191                    pass1192                del self._screenshotRefCount[toBeArchived]1193                archiveCount -= 11194    def refreshScreenshot(self, forcedScreenshot=None, rotate=None):1195        """1196        Takes new screenshot and updates the latest screenshot object.1197        Parameters:1198          forcedScreenshot (Screenshot or string, optional):1199                  use 
given screenshot object or image file, do not1200                  take new screenshot.1201          rotate (integer, optional):1202                  rotate screenshot by given number of degrees. This1203                  overrides constructor rotateScreenshot parameter...server.py
Source:server.py  
#! /usr/bin/env python3
# TEACHER - SERVER #
#
# Copyright (C) 2017 Thomas Michael Weissel
#
# This software may be modified and distributed under the terms
# of the GPLv3 license.  See the LICENSE file for details.
#
# sudo -H pip3 install twisted  # we need twisted for python3
import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))  # add application root to python path for imports
import qt5reactor
import ipaddress
import datetime
import time
import sip
import zipfile
import ntpath
import shutil
from twisted.internet import protocol
from twisted.internet.protocol import DatagramProtocol
from twisted.protocols import basic
from twisted.internet.task import LoopingCall
from config.config import *
from applist import *
from config.enums import *
import classes.mutual_functions as mutual_functions
import classes.system_commander as system_commander
from classes.server2client import *
from PyQt5 import uic, QtWidgets, QtCore
from PyQt5.QtGui import *
from PyQt5.QtCore import QRegExp


class ServerUI(QtWidgets.QDialog):
    """Main window of the exam server (teacher side).

    Loads ``server/server.ui``, connects its widgets to the handlers below
    and forwards the requested actions to ``factory.server_to_client``.

    ``factory.rawmode`` is used as a lock for file operations: handlers set
    it before starting a transfer and refuse to start while it is set.
    """

    def __init__(self, factory):
        """Build the UI, connect all signals and show the window.

        :param factory: server factory providing ``server_to_client``,
            ``rawmode``, ``pincode``, ``examid`` and the looping calls
            ``lc`` / ``lcs``
        """
        QtWidgets.QDialog.__init__(self)
        self.factory = factory     # type: MyServerFactory

        uifile = os.path.join(APP_DIRECTORY, 'server/server.ui')
        winicon = os.path.join(APP_DIRECTORY, 'pixmaps/windowicon.png')
        self.ui = uic.loadUi(uifile)        # load UI
        self.ui.setWindowIcon(QIcon(winicon))  # set the icon shown in the taskbar

        # setup slots (the lambdas are only needed to pass an argument)
        self.ui.exit.clicked.connect(self._onAbbrechen)
        self.ui.sendfile.clicked.connect(lambda: self._onSendFile("all"))
        self.ui.showip.clicked.connect(self._onShowIP)
        self.ui.abgabe.clicked.connect(lambda: self._onAbgabe("all"))
        self.ui.screenshots.clicked.connect(lambda: self._onScreenshots("all"))
        self.ui.startexam.clicked.connect(lambda: self._on_start_exam("all"))
        self.ui.openshare.clicked.connect(self._onOpenshare)
        self.ui.starthotspot.clicked.connect(self._onStartHotspot)
        self.ui.testfirewall.clicked.connect(self._onTestFirewall)

        self.ui.autoabgabe.clicked.connect(self._onAutoabgabe)
        self.ui.screenlock.clicked.connect(lambda: self._onScreenlock("all"))
        self.ui.exitexam.clicked.connect(lambda: self._on_exit_exam("all"))
        self.ui.closeEvent = self.closeEvent  # links the window close event to our custom ui
        self.ui.printconf.clicked.connect(self._onPrintconf)
        self.ui.printer.clicked.connect(lambda: self._onSendPrintconf("all"))

        # Resolve the animation relative to APP_DIRECTORY like every other
        # pixmap, instead of relying on the current working directory.
        self.workinganimation = QMovie(os.path.join(APP_DIRECTORY, 'pixmaps/working.gif'),
                                       QtCore.QByteArray(), self)
        self.workinganimation.setCacheMode(QMovie.CacheAll)
        self.workinganimation.setSpeed(100)
        self.ui.working.setMovie(self.workinganimation)
        self.timer = False  # QTimer hiding the working animation, once created
        self.msg = False    # exit confirmation box while it is open

        self.ui.version.setText("<b>Version</b> %s" % VERSION)
        self.ui.currentpin.setText("<b>%s</b>" % self.factory.pincode)
        self.ui.examlabeledit1.setText(self.factory.examid)
        self.ui.currentlabel.setText("<b>%s</b>" % self.factory.examid)
        self.ui.examlabeledit1.textChanged.connect(self._updateExamName)
        self.ui.ssintervall.valueChanged.connect(self._changeAutoscreenshot)

        num_regex = QRegExp("[0-9_]+")
        num_validator = QRegExpValidator(num_regex)
        ip_regex = QRegExp(r"[0-9\._]+")  # raw string: same pattern, no invalid escape
        ip_validator = QRegExpValidator(ip_regex)
        for ip_field, port_field in self.get_firewall_adress_list():
            ip_field.setValidator(ip_validator)
            port_field.setValidator(num_validator)

        findApps(self.ui.applist, self.ui.appview)

        self.ui.keyPressEvent = self.newOnkeyPressEvent

        self.ui.show()

    def _lock_filetransfer(self, need_clients=False):
        """Try to take the ``factory.rawmode`` file-operation lock.

        :param need_clients: also require at least one connected client
        :return: True if the lock was taken; False (after logging the
            reason) if a transfer is ongoing or no client is connected
        """
        if self.factory.rawmode:  # server is already in rawmode (ongoing filetransfer)
            self.log("waiting for ongoing filetransfers to finish ..")
            return False
        if need_clients and not self.factory.server_to_client.clients:
            self.log("no clients connected")
            return False
        self.factory.rawmode = True  # LOCK all other fileoperations
        return True

    def _send_zipped_folder(self, target_folder, basename, who, data_type, *extra_args):
        """Zip ``target_folder`` into SERVERZIP_DIRECTORY and send the archive.

        Expects the rawmode lock to be held by the caller and releases it
        again if the archive cannot be created or is not found afterwards.

        :param target_folder: directory to archive
        :param basename: archive name without the ``.zip`` extension
        :param who: receiver, passed through to ``send_file``
        :param data_type: ``DataType`` value passed through to ``send_file``
        :param extra_args: additional positional arguments for ``send_file``
        """
        output_filename = os.path.join(SERVERZIP_DIRECTORY, basename)
        try:
            shutil.make_archive(output_filename, 'zip', target_folder)
        except Exception:
            self.factory.rawmode = False  # never keep the lock after a failure
            raise
        filename = "%s.zip" % (basename)
        file_path = os.path.join(SERVERZIP_DIRECTORY, filename)  # now with .zip extension

        # regenerate filelist and check for zip file
        self.factory.files = mutual_functions.get_file_list(self.factory.files_path)
        if filename not in self.factory.files:
            self.log('filename not found in directory')
            self.factory.rawmode = False  # nothing is sent: UNLOCK fileoperations
            return
        self.log('Sending Configuration: %s (%d KB)' % (filename, self.factory.files[filename][1] / 1024))
        # send line and file to the clients
        self.factory.server_to_client.send_file(file_path, who, data_type, *extra_args)

    def _changeAutoscreenshot(self):
        """Restart the screenshot looping call with the interval from the UI (0 disables it)."""
        self._workingIndicator(True, 200)
        intervall = self.ui.ssintervall.value()

        if self.factory.lcs.running:
            self.factory.lcs.stop()
        if intervall != 0:
            self.log("<b>Changed Screenshot Intervall to %s seconds </b>" % (str(intervall)))
            self.factory.lcs.start(intervall)
        else:
            self.log("<b>Screenshot Intervall is set to 0 - Screenshotupdate deactivated</b>")

    def _onSendPrintconf(self, who):
        """send the printer configuration to all clients"""
        self._workingIndicator(True, 500)
        if not self._lock_filetransfer(need_clients=True):
            return

        self._workingIndicator(True, 4000)
        self.log('<b>Sending Printer Configuration to All Clients </b>')
        system_commander.dialog_popup('Sending Printer Configuration to All Clients')
        # create zip file of the printer configuration directory and send it
        self._send_zipped_folder(PRINTERCONFIG_DIRECTORY, "PRINTERCONFIG", who, DataType.PRINTER.value)

    def _onPrintconf(self):
        """Open the KDE printer manager in the background."""
        command = "kcmshell5 kcm_printer_manager &"
        os.system(command)

    def _onScreenlock(self, who):
        """locks the client screens"""
        self._workingIndicator(True, 1000)
        unlocked_icon = QIcon(os.path.join(APP_DIRECTORY, 'pixmaps/network-wired-symbolic.png'))

        if self.factory.clientslocked:
            self.log("<b>UNLocking Client Screens </b>")
            self.ui.screenlock.setIcon(unlocked_icon)
            self.factory.clientslocked = False

            # dirty hack - thx to nora - gives us at least one option to open
            # filetransfers again if something wicked happens
            self.factory.rawmode = False

            if not self.factory.server_to_client.unlock_screens(who):
                self.log("no clients connected")
        else:
            self.log("<b>Locking Client Screens </b>")
            self.ui.screenlock.setIcon(QIcon(os.path.join(APP_DIRECTORY, 'pixmaps/unlock.png')))
            self.factory.clientslocked = True
            if not self.factory.server_to_client.lock_screens(who):
                self.log("no clients connected")
                self.factory.clientslocked = False
                self.ui.screenlock.setIcon(unlocked_icon)
        self._onScreenshots("all")   # update screenshots right after un/lock

    def _onOpenshare(self):
        """Open SHARE_DIRECTORY in dolphin as USER."""
        startcommand = "runuser -u %s /usr/bin/dolphin %s &" % (USER, SHARE_DIRECTORY)
        os.system(startcommand)

    def _updateExamName(self):
        """Copy the exam name from the edit field to the factory and the label."""
        self.factory.examid = self.ui.examlabeledit1.text()
        self.ui.currentlabel.setText("<b>%s</b>" % self.factory.examid)

    def _workingIndicator(self, action, duration):
        """Show or hide the working animation.

        :param action: True shows the animation and starts a timer that
            hides it again; anything else hides it immediately
        :param duration: time in milliseconds until the animation is hidden
        """
        # indicator is shown a second time - stop old kill-timer
        if self.timer and self.timer.isActive():
            self.timer.stop()
        if action is True:  # show working animation and start killtimer
            self.workinganimation.start()
            self.ui.working.show()
            self.timer = QtCore.QTimer()
            self.timer.timeout.connect(lambda: self._workingIndicator(False, 0))
            self.timer.start(duration)
        else:
            self.workinganimation.stop()
            self.ui.working.hide()

    def _onSendFile(self, who):
        """send a file to all clients"""
        self._workingIndicator(True, 500)
        if not self._lock_filetransfer(need_clients=True):
            return

        file_path = self._showFilePicker(SHARE_DIRECTORY)
        if not file_path:
            self.factory.rawmode = False  # no file chosen: UNLOCK fileoperations
            return

        # TODO: change working indicator to choose its own time depending on
        # actions requiring all clients or only one client
        self._workingIndicator(True, 2000)
        success, filename, file_size, who = self.factory.server_to_client.send_file(
            file_path, who, DataType.FILE.value)
        if success:
            self.log('<b>Sending file:</b> %s (%d Byte) to <b> %s </b>' % (filename, file_size, who))
        else:
            self.log('<b>Sending file:</b> Something went wrong sending file %s (%d KB) to <b> %s </b>' % (filename, file_size / 1024, who))

    def _showFilePicker(self, directory):
        """Show an open-file dialog starting in ``directory``; return the chosen path ('' if none)."""
        filedialog = QtWidgets.QFileDialog()
        filedialog.setDirectory(directory)  # set default directory
        return filedialog.getOpenFileName()[0]  # (filename, filter) -> filename

    def _onScreenshots(self, who):
        """Request a screenshot update from ``who``."""
        self.log("<b>Requesting Screenshot Update </b>")
        self._workingIndicator(True, 1000)

        if not self._lock_filetransfer():
            return
        if not self.factory.server_to_client.request_screenshots(who):
            self.factory.rawmode = False     # UNLOCK all fileoperations
            self.log("no clients connected")

    def _onShowIP(self):
        """Let system_commander show the IP address."""
        self._workingIndicator(True, 500)
        system_commander.show_ip()

    def _onAbgabe(self, who):
        """get SHARE folder"""
        self.log('<b>Requesting Client Folder SHARE </b>')
        itime = 2000 if who == 'all' else 1000
        self._workingIndicator(True, itime)

        if not self._lock_filetransfer():
            return
        if not self.factory.server_to_client.request_abgabe(who):
            self.factory.rawmode = False     # UNLOCK all fileoperations
            self.log("no clients connected")

    def _on_start_exam(self, who):
        """
        ZIP examconfig folder
        send configuration-zip to clients - unzip there
        invoke startexam.sh file on clients
        """
        self._workingIndicator(True, 500)
        if not self._lock_filetransfer(need_clients=True):
            return

        self._workingIndicator(True, 4000)
        self.log('<b>Initializing Exam Mode On All Clients </b>')
        cleanup_abgabe = self.ui.cleanabgabe.checkState()
        # create zip file of all examconfigs and send it
        self._send_zipped_folder(EXAMCONFIG_DIRECTORY, "EXAMCONFIG", who, DataType.EXAM.value, cleanup_abgabe)

    def _on_exit_exam(self, who):
        """Stop the looping calls, fetch the SHARE folder and send the exam exit signal."""
        self.log("<b>Finishing Exam </b>")
        self._workingIndicator(True, 2000)
        if self.factory.lcs.running:
            self.factory.lcs.stop()  # disable autoscreenshot ??
        if self.factory.lc.running:  # disable autoabgabe ??
            self.ui.autoabgabe.setIcon(QIcon(os.path.join(APP_DIRECTORY, 'pixmaps/chronometer-off.png')))
            self.factory.lc.stop()

        onexit_cleanup_abgabe = self.ui.exitcleanabgabe.checkState()

        # first fetch abgabe
        if not self._lock_filetransfer():
            # FIXME this could lead to some clients not exiting in very very
            # rare cases where a BIG filetransfer is still on -- probably
            # wait a second and try again ??
            return
        if not self.factory.server_to_client.request_abgabe(who):
            self.factory.rawmode = False     # UNLOCK all fileoperations
            self.log("no clients connected")
        # then send the exam exit signal
        if not self.factory.server_to_client.exit_exam(who, onexit_cleanup_abgabe):
            self.log("no clients connected")

    def _onStartHotspot(self):
        """Let system_commander start the hotspot."""
        self._workingIndicator(True, 500)
        system_commander.start_hotspot()

    def get_firewall_adress_list(self):
        """Return the four ``[ip_field, port_field]`` line-edit pairs of the firewall form."""
        return [[self.ui.firewall1, self.ui.port1],
                [self.ui.firewall2, self.ui.port2],
                [self.ui.firewall3, self.ui.port3],
                [self.ui.firewall4, self.ui.port4]]

    def _onTestFirewall(self):
        """Toggle the exam firewall, depending on the current button caption.

        When starting, every valid ``ip:port`` pair is written to
        ``EXAM-A-IPS.DB`` (one per line, no blank lines) before
        ``exam-firewall.sh start`` is launched; non-empty invalid IP fields
        are highlighted.
        """
        self._workingIndicator(True, 1000)
        ipfields = self.get_firewall_adress_list()
        scriptfile = os.path.join(SCRIPTS_DIRECTORY, "exam-firewall.sh")
        caption = self.ui.testfirewall.text()

        if caption == "Stoppe Firewall":    # really don't know why qt sometimes adds these & signs to the ui
            system_commander.dialog_popup('Die Firewall wird gestoppt!')
            startcommand = "exec %s stop &" % (scriptfile)
            os.system(startcommand)
            self.ui.testfirewall.setText("Firewall testen")
            for ip_field, _port_field in ipfields:
                palettedefault = ip_field.palette()
                palettedefault.setColor(QPalette.Active, QPalette.Base, QColor(255, 255, 255))
                ip_field.setPalette(palettedefault)
        elif caption == "Firewall testen":
            ipstore = os.path.join(EXAMCONFIG_DIRECTORY, "EXAM-A-IPS.DB")
            entries = []
            for ip_field, port_field in ipfields:
                ip = ip_field.text()
                port = port_field.text()
                if mutual_functions.checkIP(ip):
                    entries.append("%s:%s" % (ip, port))
                elif ip != "":
                    palettewarn = ip_field.palette()
                    palettewarn.setColor(ip_field.backgroundRole(), QColor(200, 80, 80))
                    ip_field.setPalette(palettewarn)
            # Recreate the file and close it before the script starts, so the
            # script never sees a partially written list. Entries are joined
            # by newlines because blank lines are not allowed in the file.
            with open(ipstore, 'w') as examfile:
                examfile.write("\n".join(entries))
            system_commander.dialog_popup("Die Firewall wird aktiviert!")
            startcommand = "exec %s start &" % (scriptfile)
            os.system(startcommand)
            self.ui.testfirewall.setText("Stoppe Firewall")

    def _onAutoabgabe(self):
        """Toggle the auto-submission looping call using the interval (in minutes) from the UI."""
        self._workingIndicator(True, 500)
        intervall = self.ui.aintervall.value()
        minute_intervall = intervall * 60  # minutes, not seconds
        if self.factory.lc.running:
            self.ui.autoabgabe.setIcon(QIcon(os.path.join(APP_DIRECTORY, 'pixmaps/chronometer-off.png')))
            self.factory.lc.stop()
            self.log("<b>Auto-Submission deactivated </b>")
            return
        if intervall != 0:
            self.ui.autoabgabe.setIcon(QIcon(os.path.join(APP_DIRECTORY, 'pixmaps/chronometer.png')))
            self.log("<b>Activated Auto-Submission every %s minutes </b>" % (str(intervall)))
            self.factory.lc.start(minute_intervall)
        else:
            self.log("Auto-Submission Intervall is set to 0 - Auto-Submission not active")

    def _onRemoveClient(self, client_id):
        """Kick the client and remove its list item, whether or not it is still connected."""
        self._workingIndicator(True, 500)
        client_name = self.factory.server_to_client.kick_client(client_id)
        item = self.get_list_widget_by_client_id(client_id)
        if item:  # the lookup returns False when there is no item to delete
            # delete the whole item with the according widget and its labels
            sip.delete(item)
        self.log('Connection to client <b> %s </b> has been <b>removed</b>.' % client_name)

    def _disableClientScreenshot(self, client):
        """Mark the list item of a disconnected client as disabled."""
        self._workingIndicator(True, 500)
        client_name = client.clientName
        item = self.get_list_widget_by_client_id(client.clientConnectionID)
        if not item:
            # item not found because first connection attempt
            return
        pixmap = QPixmap(os.path.join(APP_DIRECTORY, 'pixmaps/nouserscreenshot.png'))
        try:
            item.picture.setPixmap(pixmap)
            item.info.setText('%s \ndisconnected' % client_name)
            item.disabled = True
        except (AttributeError, RuntimeError):
            # best effort: the item has no labels (or they are gone already)
            return

    def log(self, msg):
        """Append ``msg`` with a ``[HH:MM:SS]`` timestamp to the log widget."""
        timestamp = '[%s]' % datetime.datetime.now().strftime("%H:%M:%S")
        self.ui.logwidget.append(timestamp + " " + str(msg))

    def createOrUpdateListItem(self, client, screenshot_file_path):
        """generates new listitem that displays the clientscreenshot"""
        existing_item = self.get_list_widget_by_client_name(client.clientName)
        if existing_item:  # just update screenshot
            self._updateListItemScreenshot(existing_item, client, screenshot_file_path)
        else:
            self._addNewListItem(client, screenshot_file_path)

    def _addNewListItem(self, client, screenshot_file_path):
        """Create the list item (screenshot thumbnail plus name label) for a new client."""
        item = QtWidgets.QListWidgetItem()
        item.setSizeHint(QtCore.QSize(140, 100))
        item.id = client.clientName  # store clientName as itemID for later use (delete event)
        item.pID = client.clientConnectionID
        item.disabled = False

        pixmap = QPixmap(screenshot_file_path).scaled(QtCore.QSize(120, 67))
        item.picture = QtWidgets.QLabel()
        item.picture.setPixmap(pixmap)
        item.picture.setAlignment(QtCore.Qt.AlignCenter)
        item.info = QtWidgets.QLabel('%s' % (client.clientName))
        item.info.setAlignment(QtCore.Qt.AlignCenter)

        grid = QtWidgets.QGridLayout()
        grid.setSpacing(1)
        grid.addWidget(item.picture, 1, 0)
        grid.addWidget(item.info, 2, 0)

        widget = QtWidgets.QWidget()
        widget.setLayout(grid)
        widget.setContextMenuPolicy(QtCore.Qt.CustomContextMenu)
        # The lambdas read item.pID / item.disabled when triggered, so a
        # later reconnect (which updates the item) is picked up.
        widget.customContextMenuRequested.connect(lambda: self._on_context_menu(item.pID, item.disabled))
        widget.mouseDoubleClickEvent = lambda event: self._onDoubleClick(item.pID, item.id, screenshot_file_path, item.disabled)

        self.ui.listWidget.addItem(item)  # add the listitem to the listwidget
        self.ui.listWidget.setItemWidget(item, widget)  # set the widget as the listitem's widget

    def _updateListItemScreenshot(self, existing_item, client, screenshot_file_path):
        """Refresh thumbnail and connection id of an existing item and an open screenshot window."""
        try:
            # if client reconnected remove from disconnected_list
            self.factory.disconnected_list.remove(client.clientName)
        except ValueError:
            pass  # not in the list - the screenshot still has to be updated

        pixmap = QPixmap(screenshot_file_path).scaled(QtCore.QSize(120, 67))
        existing_item.picture.setPixmap(pixmap)
        existing_item.info.setText('%s' % (client.clientName))
        # in case this is a reconnect - update clientConnectionID in order to
        # address the correct connection
        existing_item.pID = client.clientConnectionID
        existing_item.disabled = False

        try:
            window = self.screenshotwindow
            if window.client_connection_id == existing_item.pID:
                window.oImage = QImage(screenshot_file_path)
                window.sImage = window.oImage.scaled(QtCore.QSize(1200, 675))  # resize Image to widgets size
                # A local palette avoids shadowing QWidget.palette() with an
                # instance attribute.
                palette = QPalette()
                palette.setBrush(QPalette.Window, QBrush(window.sImage))
                window.setPalette(palette)
        except (AttributeError, RuntimeError):
            pass  # no screenshot window has been opened (or it is gone)

    def _onDoubleClick(self, client_connection_id, client_name, screenshot_file_path, client_disabled):
        """Open the screenshot of an enabled client in a ScreenshotWindow."""
        if client_disabled:
            print("item disabled")
            return
        screenshotfilename = "%s.jpg" % client_connection_id
        self.screenshotwindow = ScreenshotWindow(self, screenshotfilename, client_name, screenshot_file_path, client_connection_id)
        self.screenshotwindow.exec_()

    def _on_context_menu(self, client_connection_id, is_disabled):
        """Show the per-client context menu at the cursor position."""
        menu = QtWidgets.QMenu()
        action_1 = QtWidgets.QAction("Abgabe holen", menu, triggered=lambda: self._onAbgabe(client_connection_id))
        action_2 = QtWidgets.QAction("Screenshot updaten", menu, triggered=lambda: self._onScreenshots(client_connection_id))
        action_3 = QtWidgets.QAction("Datei senden", menu, triggered=lambda: self._onSendFile(client_connection_id))
        action_4 = QtWidgets.QAction("Exam starten", menu, triggered=lambda: self._on_start_exam(client_connection_id))
        action_5 = QtWidgets.QAction("Exam beenden", menu, triggered=lambda: self._on_exit_exam(client_connection_id))
        action_6 = QtWidgets.QAction("Verbindung trennen", menu,
                                     triggered=lambda: self._onRemoveClient(client_connection_id))
        menu.addActions([action_1, action_2, action_3, action_4, action_5, action_6])
        if is_disabled:
            # a disconnected client can only be removed from the list
            for action in (action_1, action_2, action_3, action_4, action_5):
                action.setEnabled(False)
            action_6.setText("Widget entfernen")
        cursor = QCursor()
        menu.exec_(cursor.pos())
        return

    def get_list_widget_items(self):
        """
        Creates an iterable list of all widget elements aka student screenshots
        :return: list of widget items
        """
        list_widget = self.ui.listWidget
        return [list_widget.item(index) for index in range(list_widget.count())]

    def get_list_widget_by_client_id(self, client_id):
        """Return the list item whose ``pID`` equals ``client_id``, or False."""
        for widget in self.get_list_widget_items():
            if client_id == widget.pID:
                print("Found existing list widget for client connectionId %s" % client_id)
                return widget
        return False

    def get_list_widget_by_client_name(self, client_name):
        """Return the list item whose ``id`` equals ``client_name``, or False."""
        for widget in self.get_list_widget_items():
            if client_name == widget.id:
                print("Found existing list widget for client name %s" % client_name)
                return widget
        return False

    def get_existing_or_skeleton_list_widget(self, client_name):
        """Placeholder - not implemented."""
        pass

    def newOnkeyPressEvent(self, e):
        """Treat the Escape key like the exit button."""
        if e.key() == QtCore.Qt.Key_Escape:
            print("close event triggered")
            self._onAbbrechen()

    def closeEvent(self, evnt):
        """Ignore the close event and ask for confirmation, unless the dialog is already open."""
        evnt.ignore()
        print("close event triggered")
        if not self.msg:
            self._onAbbrechen()

    def _onAbbrechen(self):  # Exit button
        """Ask for confirmation; on Yes remove the pid file and terminate the process."""
        self.msg = QtWidgets.QMessageBox()
        self.msg.setIcon(QtWidgets.QMessageBox.Information)
        self.msg.setText("Wollen sie das Programm\nLiFE Exam Server \nbeenden?")

        self.msg.setWindowTitle("LiFE Exam")
        self.msg.setStandardButtons(QtWidgets.QMessageBox.Yes | QtWidgets.QMessageBox.No)
        retval = self.msg.exec_()

        if retval == QtWidgets.QMessageBox.Yes:
            try:
                os.remove(SERVER_PIDFILE)
            except OSError:
                pass  # a missing pid file must not prevent the shutdown
            self.ui.close()
            os._exit(0)  # otherwise only the gui is closed and connections are kept alive
        else:
            self.msg = False


class ScreenshotWindow(QtWidgets.QDialog):
    """Fixed-size dialog showing one client screenshot as its background.

    Offers buttons to archive the screenshot, fetch the client's SHARE
    folder, request a new screenshot and close the window.
    """

    def __init__(self, serverui, screenshot, clientname, screenshot_file_path, client_connection_id):
        """
        :param serverui: the ServerUI whose handlers the buttons call
        :param screenshot: screenshot file name (used in the window title)
        :param clientname: client name (used in the window title)
        :param screenshot_file_path: path of the image to display
        :param client_connection_id: connection id the buttons act on
        """
        QtWidgets.QDialog.__init__(self)
        self.setWindowIcon(QIcon(os.path.join(APP_DIRECTORY, 'pixmaps/windowicon.png')))  # set the icon shown in the taskbar
        self.screenshot = screenshot
        self.serverui = serverui
        self.screenshot_file_path = screenshot_file_path
        self.client_connection_id = client_connection_id

        text = "Screenshot - %s - %s" % (screenshot, clientname)
        self.setWindowTitle(text)
        self.setGeometry(100, 100, 1200, 675)
        self.setFixedSize(1200, 675)

        oImage = QImage(screenshot_file_path)
        sImage = oImage.scaled(QtCore.QSize(1200, 675))  # resize Image to widgets size
        palette = QPalette()
        palette.setBrush(QPalette.Window, QBrush(sImage))
        self.setPalette(palette)

        button1 = QtWidgets.QPushButton('Screenshot archivieren', self)
        button1.move(1020, 580)
        button1.resize(180, 40)
        button1.clicked.connect(self._archivescreenshot)

        button2 = QtWidgets.QPushButton('Abgabe holen', self)
        button2.move(1020, 480)
        button2.resize(180, 40)
        button2.clicked.connect(lambda: serverui._onAbgabe(client_connection_id))

        button3 = QtWidgets.QPushButton('Screenshot updaten', self)
        button3.move(1020, 530)
        button3.resize(180, 40)
        button3.clicked.connect(lambda: serverui._onScreenshots(client_connection_id))

        button4 = QtWidgets.QPushButton('Fenster schließen', self)
        button4.move(1020, 430)
        button4.resize(180, 40)
        button4.clicked.connect(self._onClose)

    def _onClose(self):  # Exit button
        """Close the window."""
        self.close()

    def _archivescreenshot(self):
        """Copy the displayed screenshot to a location chosen in a save dialog."""
        filedialog = QtWidgets.QFileDialog()
        filedialog.setDirectory(SHARE_DIRECTORY)  # set default directory
        file_path = filedialog.getSaveFileName()[0]  # (filename, filter) -> filename
        if not file_path:
            return
        # copy instead of move: the file stays available at its source path
        try:
            shutil.copyfile(self.screenshot_file_path, file_path)
        except (OSError, shutil.Error) as error:
            print("archiving screenshot failed: %s" % error)
            return
        print("screensshot archived")


"""---------"""
"""         """ 
""" TWISTED """
"""         """ 
"""---------"""
class MyServerProtocol(basic.LineReceiver):
    """every new connection builds one MyServerProtocol object"""
    def __init__(self, factory):
        self.factory = factory  # type: MyServerFactory
        self.clientName = ""
        self.file_handler = None
        self.line_data_list = ()
        self.refused = False
        self.clientConnectionID = ""
        self.filetransfer_fail_count = 0
    # twisted
    def connectionMade(self):
        self.factory.server_to_client.add_client(self)
        self.file_handler = None
        self.line_data_list = ()
        self.refused = False
        self.clientConnectionID = str(self.transport.client[1])
        self.transport.setTcpKeepAlive(1)
        self.factory.window.log(
            'Connection from: %s (%d clients total)' % (
            self.transport.getPeer().host, len(self.factory.server_to_client.clients)))
    # twisted
    def connectionLost(self, reason):
        print("ConnectionLost")
        print(reason)  # maybe give it another try if connection closed unclean? ping it ? send custom keepalive? or even a reconnect call?
        self.factory.server_to_client.remove_client(self)
        self.file_handler = None
        self.line_data_list = ()
        self.factory.rawmode = False;  # we deactivate the rawmode ft block here in case the disconnect interrupted an ongoing filetransfer which would never send \r\n and therefore never unblock (worst case scenario: an other ft could be started during an ongoing ft - because notblocked - and lead to a corrupted (and therefore removed) ft)
        
        self.factory.window.log(
            'Connection from %s lost (%d clients left)' % (
            self.transport.getPeer().host, len(self.factory.server_to_client.clients)))
        if not self.refused:
            self.factory.window._disableClientScreenshot(self)
            self.factory.disconnected_list.append(self.clientName)  #keep client name in disconnected_list
        else:
            try:
                self.factory.disconnected_list.remove(self.clientName)   # this one is not coming back
            except:
                return
            
    # twisted
    def rawDataReceived(self, data):
        """ handle incoming byte data """
        filename = self.line_data_list[2]
        file_path = os.path.join(self.factory.files_path, filename)
        # self.factory.window.log('Receiving file chunk (%d KB)' % (len(data)/1024))
        if not self.file_handler:
            self.file_handler = open(file_path, 'wb')
        if data.endswith(b'\r\n'):  # Last chunk
            data = data[:-2]
            self.file_handler.write(data)
            self.file_handler.close()
            self.file_handler = None
            self.setLineMode()
            self.factory.rawmode = False;  #filetransfer finished "UNLOCK" fileopertions
          
            if mutual_functions.validate_file_md5_hash(file_path, self.line_data_list[3]):  # everything ok..  file received
                self.factory.window.log('File %s has been successfully transferred' % (filename))
                self.filetransfer_fail_count = 0
                
                if self.line_data_list[1] == DataType.SCREENSHOT.value:  # screenshot is received on initial connection
                    screenshot_file_path = os.path.join(SERVERSCREENSHOT_DIRECTORY, filename)
                    os.rename(file_path, screenshot_file_path)  # move image to screenshot folder
                    mutual_functions.fixFilePermissions(SERVERSCREENSHOT_DIRECTORY)  # fix filepermission of transferred file
                    self.factory.window.createOrUpdateListItem(self, screenshot_file_path)  # make the clientscreenshot visible in the listWidget
                elif self.line_data_list[1] == DataType.ABGABE.value:
                    extract_dir = os.path.join(SHARE_DIRECTORY, self.clientName, filename[
                                                                                  :-4])  # extract to unzipDIR / clientName / foldername without .zip (cut last four letters #shutil.unpack_archive(file_path, extract_dir, 'tar')   #python3 only but twisted RPC is not ported to python3 yet
                    user_dir = os.path.join(SHARE_DIRECTORY, self.clientName)
                    mutual_functions.checkIfFileExists(user_dir)  ## checks if filename is taken and renames this file in order to make room for the userfolder
                    with zipfile.ZipFile(file_path, "r") as zip_ref:
                        zip_ref.extractall(extract_dir) 
                    os.unlink(file_path)  # delete zip file
                    mutual_functions.fixFilePermissions(SHARE_DIRECTORY)  # fix filepermission of transferred file
            else:  # wrong file hash
                os.unlink(file_path)
                self.transport.write(b'File was successfully transferred but not saved, due to invalid MD5 hash\n')
                # (listing ends here; the remainder of rawDataReceived is not shown)
self.transport.write(Command.ENDMSG.tobytes() + b'\r\n')633                self.factory.window.log('File %s has been successfully transferred, but deleted due to invalid MD5 hash' % (filename))634                # request flie again if filerequest was ABGABE (we don't care about a missed screenshotupdate)635                if self.line_data_list[1] == DataType.ABGABE.value and self.filetransfer_fail_count <= 1:636                    self.filetransfer_fail_count += 1637                    self.factory.window.log('Failed transfers: %s' %self.filetransfer_fail_count)638                    self.factory.window._onAbgabe(self.clientConnectionID)639                else:640                    self.filetransfer_fail_count = 0641        else:642            self.file_handler.write(data)643    def sendEncodedLine(self,line):644        # twisted645        self.sendLine(line.encode() )646    # twisted647    def lineReceived(self, line):648        """whenever the CLIENT sent something """649        line = line.decode()  # we get bytes but need strings650        self.line_data_list = mutual_functions.clean_and_split_input(line)651        print("\nDEBUG: line received and decoded:\n%s\n" % self.line_data_list)652        self.line_dispatcher()    #pass "self" as "client"653       654       655    def line_dispatcher(self):656        if len(self.line_data_list) == 0 or self.line_data_list == '':657            return658        """659        FILETRANSFER    (ist immer getfile, der client kann derzeit noch keine files anfordern)660        command = self.line_data_list[0]    661        filetype = self.line_data_list[1] 662        filename = self.line_data_list[2] 663        filehash = self.line_data_list[3] 664        (( clientName = self.line_data_list[4] ))665        666        AUTH667        command = self.line_data_list[0]    668        id = self.line_data_list[1] 669        pincode = self.line_data_list[2] 670        671        """672        command = {673            
Command.AUTH.value: self._checkclientAuth, 674            Command.FILETRANSFER.value: self._get_file_request,675        }676        line_handler = command.get(self.line_data_list[0], None)677        line_handler()678    def _get_file_request(self):679        """680        Puts server into raw mode to receive files681        """682        self.factory.window.log('Incoming File Transfer from Client <b>%s </b>' % (self.clientName))683        self.setRawMode()  # this is a file - set to raw mode684        685    def _checkclientAuth(self):686        """687        searches for the newID in factory.clients and rejects the connection if found or wrong pincode688        :param newID: string689        :param pincode: int690        :return:691        """692        newID = self.line_data_list[1] 693        pincode = self.line_data_list[2]694        695        if newID in self.factory.server_to_client.clients.keys():696            print("this user already exists and is connected")   #TEST keys contains numbers - newID is a name .. how does this work?697            self.refused = True698            self.sendEncodedLine(Command.REFUSED.value)699            self.transport.loseConnection()700            self.factory.window.log('Client Connection from %s has been refused. User already exists' % (newID))701            return702        elif int(pincode) != self.factory.pincode:703            print("wrong pincode")704            self.refused = True705            self.sendEncodedLine(Command.REFUSED.value)706            self.transport.loseConnection()707            self.factory.window.log('Client Connection from %s has been refused. 
Wrong pincode given' % (newID ))708            return709        else:  # otherwise ad this unique id to the client protocol instance and request a screenshot710            print("pincode ok")711            self.clientName = newID712            self.factory.window.log('New Connection from <b>%s </b>' % (newID) )713            #transfer, send, screenshot, filename, hash, cleanabgabe714            line = "%s %s %s %s.jpg none none" % (Command.FILETRANSFER.value, Command.SEND.value, DataType.SCREENSHOT.value, self.transport.client[1])715            self.sendEncodedLine(line)716            return717class MyServerFactory(protocol.ServerFactory):718    def __init__(self, files_path, reactor):719        self.files_path = files_path720        self.reactor = reactor721        self.server_to_client = ServerToClient() # type: ServerToClient722        self.disconnected_list = []723        self.files = None724        self.clientslocked = False725        self.rawmode = False;  #this is set to True the moment the server sends examconfig, sends file, sends printconf, requests abgabe, requests screenshot726        self.pincode = mutual_functions.generatePin(4)727        self.examid = "Exam-%s" % mutual_functions.generatePin(3)728        self.window = ServerUI(self)                            # type: ServerUI729        self.lc = LoopingCall(lambda: self.window._onAbgabe("all"))730        self.lcs = LoopingCall(lambda: self.window._onScreenshots("all"))731        732        intervall = self.window.ui.ssintervall.value()733        if intervall != 0:734            self.window.log("<b>Changed Screenshot Intervall to %s seconds </b>" % (str(intervall)))735            self.lcs.start(intervall)736        else:737            self.window.log("<b>Screenshot Intervall is set to 0 - Screenshotupdate deactivated</b>")738        739        740        741        # _onAbgabe kann durch lc.start(intevall) im intervall ausgeführt werden742        
#mutual_functions.checkFirewall(self.window.get_firewall_adress_list())  # deactivates all iptable rules if any743        #starting multicast server here in order to provide "factory" information via broadcast744        self.reactor.listenMulticast(8005, MultcastLifeServer(self), listenMultiple=True)745    """746    http://twistedmatrix.com/documents/12.1.0/api/twisted.internet.protocol.Factory.html#buildProtocol747    """748    def buildProtocol(self, addr):749        return MyServerProtocol(self)750        """751        wird bei einer eingehenden client connection aufgerufen - erstellt ein object der klasse MyServerProtocol für jede connection und übergibt self (die factory)752        """753class MultcastLifeServer(DatagramProtocol):754    def __init__(self, factory):755         self.factory = factory756    def startProtocol(self):757        """Called after protocol has started listening. """758        self.transport.setTTL(5)     # Set the TTL>1 so multicast will cross router hops:759        self.transport.joinGroup("228.0.0.5")   # Join a specific multicast group:760    def datagramReceived(self, datagram, address):761       762        datagram = datagram.decode()763        764        if "CLIENT" in datagram:765            # Rather than replying to the group multicast address, we send the766            # reply directly (unicast) to the originating port:767            print("Datagram %s received from %s" % (repr(datagram), repr(address)) )768            serverinfo = self.factory.examid + " " + " ".join(self.factory.disconnected_list)769            message = "SERVER %s" % serverinfo770            self.transport.write(message.encode(), ("228.0.0.5", 8005))771            #self.transport.write("SERVER: Assimilate", address)  #this is NOT WORKINC772if __name__ == '__main__':773    mutual_functions.prepareDirectories()  # cleans everything and copies some scripts774    killscript = os.path.join(SCRIPTS_DIRECTORY, "terminate-exam-process.sh")775    os.system("%s %s" % 
(killscript, 'server'))  # make sure only one client instance is running per client776    # time.sleep(1)777    mutual_functions.writePidFile()778    app = QtWidgets.QApplication(sys.argv)779    qt5reactor.install()  # imported from file and needed for Qt to function properly in combination with twisted reactor780    from twisted.internet import reactor781    reactor.listenTCP(SERVER_PORT, MyServerFactory(SERVERFILES_DIRECTORY, reactor ))  # start the server on SERVER_PORT782    # moved multicastserver starting sequence to factory783    #reactor.listenMulticast(8005, MultcastLifeServer(), listenMultiple=True)784    print ('Listening on port %d' % (SERVER_PORT))...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
