Best Python code snippet using robotframework
__init__.py
Source:__init__.py  
...276    def read_lazy(self):277        """Lazy read from sshout.  Waits a little, but does not block."""278        return self.sshout.read_lazy()279    280    def read_some(self):281        """Always read at least one block, unless the connection is closed.282        May block."""283        if self.isopen:284            return self.sshout.read_some()285        else:286            return self.sshout.read_very_lazy()    287        288    def read_all(self):289        """Reads until end of file hit.  May block."""290        if self.isopen:291            return self.sshout.read_all()292        else:293            return self.sshout.read_very_lazy()294    295    def read_lw_all(self):296        """Reads until prompt returns. May block."""297        if self.isopen:298            timeout = 0299            banner = self.sshout.read_some()300            print "In read_lw_all Expecting:" + self.__prompt301            print "In read_lw_all Received:" + banner302            while banner.find(self.__prompt) == -1 and timeout < MAX_TIMEOUT:303                time.sleep(1)304                timeout += 1305                banner = banner + self.sshout.read_very_lazy()306            if self.debuglevel:307                print "================================================="308                print "Prompt: " + self.__prompt309                print banner310                print "================================================="311            return banner312        else:313            if self.debuglevel:314                print "Ssh::read_lw_all() reading very lazy when should be all"315                time.sleep(10)316            return self.sshout.read_very_lazy()317        time.sleep(5)318319    def login(self, password):320        """Logs in to the ssh host.  Checks for standard prompts, and calls321        the function passed as promptcb to process them.322        Returns the login banner, or 'None' if login process aborted.323        """324        self.connected = False325        logintext='Last login:'326        text = ''327        trycnt = 0328329        self.open()330        banner = self.read_some()331332        if self.debuglevel:333            print ">> 1st banner read is: %s" % banner334            print "Logging in to:" + self.username335            print "Password:" + password336            print "Hostname:" + self.host337338        while banner.find(logintext) == -1 and banner.find("$") == -1 and banner.find("#") == -1 and banner.find(">") == -1:339            if banner.lower().find("permission denied") > -1 or banner.lower().find("try again") > -1 or \340                banner.lower().find("connection refused") > -1 or banner.lower().find("host key verification failed") > -1:341                self.close()342                return banner343344            if trycnt == 6:345                self.close()346                return banner347348            response, abort = _prompt(banner, "password", password)349            trycnt += 1350351            if abort:352                self.close()353                return banner354355            if response != "":356                if self.debuglevel:357                    print "pySsh sending: " + response358                self.write(response + '\n')359360            banner += self.read_some()361362        if banner.find(logintext) != -1 or banner.find("Have") != -1 or \363            banner.find("$") != -1 or banner.find("#") != -1 or banner.find(">") != -1:364            self.password  = password365            self.connected = True366        else:367            self.connected = False368            return banner369370        self.write("pwd\n")371        output = self.read_some()372        result = output.split("\n")373        if len(result) > 2:374            self.__prompt = result[len(result)-1]375        return banner376    377    def ssologin(self, logintext='Last login:'):378        retVal = True379        """Logs in to the ssh host.  Checks for standard prompts, and calls380        the function passed as promptcb to process them.381        Returns the login banner, or 'None' if login process aborted.382        """383        text = ''384        self.open()385        banner = self.read_some()386        if self.debuglevel:387            print ">> 1st banner read is: %s" % banner388            389        promptcount = 0390        #print "=======",banner, "======="391        #print "logintext",logintext392        #if banner.find(logintext) > -1:393        #    return banner394        abort = 0395        while banner.find(logintext) == -1:396            print banner397            if banner.lower().find("password") > -1:398                retVal = False399                abort  = 1   400                print "Found password prompt on", self.host401            elif banner.lower().find("connection refused"):402                retVal = False403                abort  = 1404                print "Connection refused from host", self.host405            elif (banner.lower().find('rsa key fingerprint') >= 0):406                """Found RSA key fingerprint request"""407                response = "yes"408                self.write(response + '\n')409                banner = self.read_some()410            if abort:411                self.connected = False412                return self.close()413        self.connected = True414        return banner415            416    def multipasslogin(self, password1, password2, password3, logintext='Last login:', prompt_callback=_prompt):417        """Logs in to the ssh host.  Checks for standard prompts, and calls418        the function passed as promptcb to process them.419        Returns the login banner, or 'None' if login process aborted.420        """421        text = ''422        self.open()423        banner = self.read_some()424        if self.debuglevel:425            print ">> 1st banner read is: %s" % banner426        427        password = password1428        promptcount = 0429        while banner.find(logintext) == -1:430            print banner431            if banner.find("password") > -1:432                promptcount = promptcount + 1433            if promptcount == 1:434                password = password2435            elif promptcount == 2:436                password = password3437            response, abort = prompt_callback(banner, password=password)438            if abort:439                return self.close()440            print "pySsh sending: " + response441            self.write(response + '\n')442            banner = self.read_some()443        #end while444        if promptcount > 1:445            return "Fail"446        else:447            return banner448        449    def logout(self):450        """Logs out the session."""451        self.close()452        453    def sendcmd(self, cmd, timeout=0, readtype=READ_SOME):454        """Sends the command 'cmd' over the ssh connection, and returns the455        result.  By default it uses read_some, which may block.456        """457        banner = self.__prompt.strip()458        cmd = cmd.strip() #+ ";cd ~"459        try:460            if cmd[-1] != '\n':461                cmd += '\n'462            463            self.write(cmd)464            if readtype == READ_ALL:465                return banner + self.read_all()466            elif readtype == READ_LAZY:467                return self.read_lazy()468            else:469                if timeout > 0:470                    time.sleep(timeout)471                banner = banner + self.read_some()472                return banner473        except mysshError:474            self.close()475   476   477    def sudocmd(self, command):478        #sys.stdout.write(command + "\n")479        #sys.stdout.write("Password: " + self.password + "\n")480        sudooutput = self.interact("sudo " + command + "; sudo -k", "password", self.password)481        return sudooutput482                483    def interact(self, cmd, prompt, answer, readtype=READ_LW_ALL):484        try:485            cmd = cmd.strip() + "; cd ~"486            if cmd[-1] != '\n':487                cmd += '\n'488            self.write(cmd)489            if readtype == READ_ALL:490                if self.debuglevel:491                    print "Ssh::interact() reading all"492                return self.read_all()493            elif readtype == READ_LAZY:494                if self.debuglevel:495                    print "Ssh::interact() reading lazy"496                return self.read_lazy()497            elif readtype == READ_SOME:498                if self.debuglevel:499                    print "Ssh::interact() reading lw_all"500                banner = self.read_some()501                response, abort = _lwiprompt(banner, prompt, answer)502                if response:503                    if self.debuglevel:504                        print "Prompt:  ", prompt505                        print "Response:", response506                    self.write(response + '\n')507                    banner = banner + self.read_some()508                else:509                    if self.debuglevel:510                        print "No response sent"511                        print "Prompt:  ", prompt512                        print "Response:", response513                return banner514            else:515                if self.debuglevel:516                    print "Ssh::interact() reading lw_all"517                banner = self.read_some()518                response, abort = _lwiprompt(banner, prompt, answer)519                if response:520                    if self.debuglevel:521                        print "Prompt:  ", prompt522                        print "Response:", response523                    self.write(response + '\n')524                    banner = banner + self.read_lw_all()525                    #banner = banner + self.read_some()526                else:527                    if self.debuglevel:528                        print "No response sent"529                        print "Prompt:  ", prompt530                        print "Response:", response531                #time.sleep(10)532                return banner533        except mysshError:534            if self.debuglevel:535                print "Ssh::interact encountered mysshError"536            self.close()537        return banner538539540    def runcmd_readall(self, cmd):541        try:542            cmd = cmd.strip() + "; cd ~"543            if cmd[-1] != '\n':544                cmd += '\n'545            self.write(cmd)546            if self.debuglevel:547                print "Ssh::interact() reading lw_all"548            banner = self.read_lw_all()549            return banner550        except mysshError:551            if self.debuglevel:552                print "Ssh::interact encountered mysshError"553            self.close()554        return banner555556    def setpass(self, cmd, prompt, answer):557        try:558            import commands559            readtype=READ_SOME560            if cmd[-1] != '\n':561                cmd += '\n'562            self.write(cmd)563            if readtype == READ_ALL:564                return self.read_all()565            elif readtype == READ_LAZY:566                return self.read_lazy()567            else:568                banner = self.read_some()569                response, abort = _lwiprompt(banner, prompt, answer)570                if response:571                    #print "Banner:  ", banner572                    #print "Prompt:  ", prompt573                    #print "Response:", response574                    self.write(response + '\n')575                    banner = banner + self.read_some()576                    response, abort = _lwiprompt(banner, prompt, answer)577                    if response:578                        #print "Banner:  ", banner579                        #print "Prompt:  ", prompt580                        #print "Response:", response581                        self.write(response + '\n')582                        banner = banner + self.read_some()583                        #print "Wrote response"584                else:585                    print "No response sent"586                    print "Prompt:  ", prompt587                    print "Response:", response588                #self.sendcmd("sleep 0")589                #commands.getoutput("sleep 1")590                return banner591        except mysshError:592            self.close()593594    def changepass(self, oldpass, newpass):595        try:596            self.sendcmd("sleep 0")597            retVal = False598            cmd = "/opt/likewise/bin/passwd"599            readtype=READ_SOME600            if cmd[-1] != '\n':601                cmd += '\n'602            self.write(cmd)603            if readtype == READ_ALL:604                return self.read_all()605            elif readtype == READ_LAZY:606                return self.read_lazy()607            else:608                banner = self.read_some()609                response, abort = _lwiprompt(banner, "current", oldpass)610                if response:611                    self.write(response + '\n')612                    banner = banner + self.read_some()613                    response, abort = _lwiprompt(banner, "new", newpass)614                    if response:615                        self.write(response + '\n')616                        banner = banner + self.read_some()617                    response, abort = _lwiprompt(banner, "password", newpass)618                    if response:619                        self.write(response + '\n')620                        banner = banner + self.read_some()621                        if banner.lower().find("success") > -1:622                            retVal = True623                else:624                    print "No password changed"625                    #print "Prompt:  ", prompt626                    #print "Response:", response627                #self.write("sleep 3\n")628                return banner629        except mysshError:630            self.close()631            632def test():633    """Test routine for myssh.634    
...api.py
Source:api.py  
...49            # # FUNCTION CODE: 0350            # # Register:0-0x0000 ===== on/off (2b)51            # send_mess = b"\x02\x03\x00\x00\x00\x01\x84\x39"52            # tn.write(send_mess)53            # raw_data['on_off'] = (tn.read_some().hex())54            #55            # # Register:26-0x0058 ===== Mode selection (2b)56            # send_mess = b"\x02\x03\x00\x1A\x00\x01\xA5\xFE"57            # tn.write(send_mess)58            # raw_data['mode_select'] = (tn.read_some().hex())59            #60            # # Register:50-0x0032 ===== Maximum DC voltageï¼PV) (2b)61            # send_mess = b"\x02\x03\x00\x32\x00\x01\x25\xF6"62            # tn.write(send_mess)63            # raw_data['max_DC_volt_PV'] = (tn.read_some().hex())64            #65            # # Register:58-0x003A ===== Maximum Output power (2b)66            # send_mess = b"\x02\x03\x00\x3A\x00\x01\xA4\x34"67            # tn.write(send_mess)68            # raw_data['max_output_P'] = (tn.read_some().hex())69            #70            # # Register:74-0x004A ===== Voltage reference (2b)71            # send_mess = b"\x02\x03\x00\x4A\x00\x01\xA5\xEF"72            # tn.write(send_mess)73            # raw_data['volt_ref'] = (tn.read_some().hex())74            # -----------------------------------------------75            # FUNCTION CODE: 0476            # Register:0-0x0000 ===== PV1 Voltage (2b+-)77            # send_mess = b"\x02\x04\x00\x00\x00\x01\x31\xF9"78            # tn.write(send_mess)79            # raw_data['PV1_volt'] = (tn.read_some().hex())80            #81            # # Register:3-0x0003 ===== PV1 DC current (2b+-)82            # send_mess = b"\x02\x04\x00\x03\x00\x01\xC1\xF9"83            # tn.write(send_mess)84            # raw_data['PV1_DC_cur'] = (tn.read_some().hex())85            #86            # # Register:4-0x0004 ===== Output voltage UV (2b+-)87            # send_mess = b"\x02\x04\x00\x04\x00\x01\x70\x38"88            # tn.write(send_mess)89            # raw_data['output_volt_UV'] = (tn.read_some().hex())90            #91            # # Register:5-0x0005 ===== Output voltage VW (2b+-)92            # send_mess = b"\x02\x04\x00\x05\x00\x01\x21\xF8"93            # tn.write(send_mess)94            # raw_data['output_volt_VW'] = (tn.read_some().hex())95            #96            # # Register:6-0x0006 ===== Output voltage WU (2b+-)97            # send_mess = b"\x02\x04\x00\x06\x00\x01\xD1\xF8"98            # tn.write(send_mess)99            # raw_data['output_volt_WU'] = (tn.read_some().hex())100            #101            # # Register:16-0x0010 ===== Output frequency (2b+-)102            # send_mess = b"\x02\x04\x00\x10\x00\x01\x30\x3C"103            # tn.write(send_mess)104            # raw_data['output_freq'] = (tn.read_some().hex())105            # Register:17-0x0011 ===== Battery power (2b+-)106            send_mess = b"\x02\x04\x00\x11\x00\x01\x61\xFC"107            tn.write(send_mess)108            raw_data['batt_P'] = (tn.read_some().hex())109            # Register:21-0x0015 ===== Power grid frequency (2b+-)110            # send_mess = b"\x02\x04\x00\x15\x00\x01\x20\x3D"111            # tn.write(send_mess)112            # raw_data['P_grid_freq'] = (tn.read_some().hex())113            #114            # # Register:22-0x0016 ===== Power factor symbol (screen none) (2b+-)115            # send_mess = b"\x02\x04\x00\x16\x00\x01\xD0\x3D"116            # tn.write(send_mess)117            # raw_data['PF_symbol'] = (tn.read_some().hex())118            #119            # # Register:23-0x0017 ===== Power factor (2b+-)120            # send_mess = b"\x02\x04\x00\x17\x00\x01\x81\xFD"121            # tn.write(send_mess)122            # raw_data['PF'] = (tn.read_some().hex())123            #124            # # Register:47-0x002F ===== Battery percentage (2b+)125            send_mess = b"\x02\x04\x00\x2F\x00\x01\x00\x30"126            tn.write(send_mess)127            raw_data['batt_percen'] = (tn.read_some().hex())128            #129            # # Register:49-0x0031 ===== Load active power (2b+)130            send_mess = b"\x02\x04\x00\x31\x00\x01\x60\x36"131            tn.write(send_mess)132            raw_data['load_act_P'] = (tn.read_some().hex())133            #134            # # Register:51-0x0033 ===== PV1 power (2b+-)135            # send_mess = b"\x02\x04\x00\x33\x00\x01\xC1\xF6"136            # tn.write(send_mess)137            # raw_data['PV1_P'] = (tn.read_some().hex())138            #139            # # Register:53-0x0035 ===== Load current A (2b+-)140            # send_mess = b"\x02\x04\x00\x35\x00\x01\x21\xF7"141            # tn.write(send_mess)142            # raw_data['load_cur_A'] = (tn.read_some().hex())143            #144            # # Register:54-0x0036 ===== Load current B (2b+-)145            # send_mess = b"\x02\x04\x00\x36\x00\x01\xD1\xF7"146            # tn.write(send_mess)147            # raw_data['load_cur_B'] = (tn.read_some().hex())148            #149            # # Register:55-0x0037 ===== Load current C (2b+-)150            # send_mess = b"\x02\x04\x00\x37\x00\x01\x80\x37"151            # tn.write(send_mess)152            # raw_data['load_cur_C'] = (tn.read_some().hex())153            #154            # # Register:62-0x003E ===== PV daily power generation (2b+)155            # send_mess = b"\x02\x04\x00\x3E\x00\x01\x50\x35"156            # tn.write(send_mess)157            # raw_data['PV_daily_P_gen'] = (tn.read_some().hex())158            #159            # # Register:64-0x0040 ===== PV total power generation High (2b+)160            # # Register:65-0x0041 ===== PV total power generation Low (2b+)161            # send_mess = b"\x02\x04\x00\x40\x00\x02\x70\x2C"162            # tn.write(send_mess)163            # raw_data['PV_total_P_gen'] = (tn.read_some().hex())164            #165            # # Register:80-0x0050 ===== Output reactive power (2b+-)166            # send_mess = b"\x02\x04\x00\x50\x00\x01\x31\xE8"167            # tn.write(send_mess)168            # raw_data['output_react_P'] = (tn.read_some().hex())169            #170            # # Register:82-0x0052 ===== Daily load consumption (2b+)171            # send_mess = b"\x02\x04\x00\x52\x00\x01\x90\x28"172            # tn.write(send_mess)173            # raw_data['daily_load_con'] = (tn.read_some().hex())174            #175            # # Register:88-0x0058 ===== Daily power intake from grid (2b+)176            # send_mess = b"\x02\x04\x00\x58\x00\x01\xB0\x2A"177            # tn.write(send_mess)178            # raw_data['daily_P_intake_grid'] = (tn.read_some().hex())179            #180            # # Register:90-0x005A ===== Total power intake from grid High (2b+)181            # # Register:91-0x005B ===== Total power intake from grid Low (2b+)182            # send_mess = b"\x02\x04\x00\x5A\x00\x02\x51\xEB"183            # tn.write(send_mess)184            # raw_data['total_P_intake_grid'] = (tn.read_some().hex())185            #186            # # Register:94-0x005E ===== Daily power fed to grid187            # send_mess = b"\x02\x04\x00\x5E\x00\x01\x50\x2B"188            # tn.write(send_mess)189            # raw_data['daily_P_fed_grid'] = (tn.read_some().hex())190            #191            # # Register:96-0x0060 ===== Total power fed to grid High (2b+)192            # # Register:97-0x0061 ===== Total power fed to grid Low (2b+)193            # send_mess = b"\x02\x04\x00\x60\x00\x02\x71\xE6"194            # tn.write(send_mess)195            # raw_data['total_P_fed_grid'] = (tn.read_some().hex())196            # Register:108-0x006C ===== PV total power (2b+-)197            send_mess = b"\x02\x04\x00\x6C\x00\x01\xF1\xE4"198            tn.write(send_mess)199            raw_data['PV_total_P'] = (tn.read_some().hex())200            # Register:113-0x0071 ===== Output power (2b+-)201            send_mess = b"\x02\x04\x00\x71\x00\x01\x61\xE2"202            tn.write(send_mess)203            raw_data['output_P'] = (tn.read_some().hex())204            #205            # # Register:166-0x00A6 ===== SOH (2b+)206            send_mess = b"\x02\x04\x00\xA6\x00\x01\xD1\xDA"207            tn.write(send_mess)208            raw_data['SOH'] = (tn.read_some().hex())209            #210            # # Register:176-0x00B0 ===== BMS battery status (1b)211            # send_mess = b"\x02\x04\x00\xB0\x00\x01\x30\x1E"212            # tn.write(send_mess)213            # raw_data['BMS_batt_status'] = (tn.read_some().hex())214            for k,v in raw_data.items():215                raw_data[k] = ('0'*((int(v[4:6])*2) - len(v[6:-4]))) + v[6:-4]216            # closed connection217            tn.close()218            self.getDeviceStatusJson(raw_data)219            self.printDeviceStatus()220            if getDeviceStatusResult==True:221                self.set_variable('offline_count', 0)222            else:223                self.set_variable('offline_count', self.get_variable('offline_count')+1)224        except Exception as er:225            print (er)226            # self.set_variable('batt_P', (0))227            # self.set_variable('batt_percen', (0))...install.py
Source:install.py  
...74    smoothie.write(command)75    LOG[command] = "OK"76    if LOG_PRINT:77        print(LOG)78    return command + ' ' +str(smoothie.read_some())79def testes(smoothie, es):80    if es == 'x':81      index = 682    elif es == 'y':83      index = 1484    elif es =='z':85      index = 2286    while True:87      smoothie.write(ES)88      s = smoothie.read_some()89      if s=='':90        s = smoothie.read_some()91      elif s[0]=="o":92        s = smoothie.read_some()93      if s[index] == "1":94        while True:95          smoothie.write(ES)96          s = smoothie.read_some()97          if s=='':98            s = smoothie.read_some()99          elif s[0]=="o":100            s = smoothie.read_some()101          if s[index] == "0":102            return "End Stop "+es +" OK"103          time.sleep(0.1)104# --------------------------------------------------------105# -------------------- route function --------------------106# --------------------------------------------------------107@app.route('/', methods=['POST', 'GET'])108def init():109    if request.method == 'POST':110        LOG['PIC']= request.form['Tech']111        global PIC112        PIC=request.form['Tech']113        SERIAL_NUMBER = request.form['SN']114        changeConfigValue("ROBOT_SN", SERIAL_NUMBER)115        changeConfigValue("UI_LANGUAGE", request.form['language'])116        changeConfigValue("NTRIP_USER", request.form['ntripuser'])117        changeConfigValue("NTRIP_PASSWORD", request.form['ntrippswd'])118        changeConfigValue("NTRIP_CASTER", request.form['ntripcaster'])119        configVPN(SERIAL_NUMBER)120        return redirect(url_for('smoothie_iframe'))121    else:122        return render_template('init.html')123@app.route('/smoothie_iframe', methods=['POST', 'GET'])124def smoothie_iframe():125    if request.method == 'POST':126        return redirect(url_for('cam'))127    else:128        return render_template('smoothie_iframe.html')129@app.route('/cam', methods=['POST', 'GET'])130def cam():131    global PIC132    if PIC is None:133        return redirect("/")134    global camSP135    if request.method == 'GET':136        os.system("sudo systemctl restart nvargus-daemon")137        camSP=startLiveCam()138        return render_template('cam.html')139    else:140        LOG['CAMERA']= 'OK'141        if LOG_PRINT:142            print(LOG)143        os.killpg(os.getpgid(camSP.pid), signal.SIGINT)144        camSP.wait()145        return redirect(url_for('xyd'))146@app.route('/xyd', methods=['POST', 'GET'])147def xyd():148    global PIC149    if PIC is None:150        return redirect("/")151    with connectors.SmoothieV11SerialConnector(utility.get_smoothie_vesc_addresses()["smoothie"], config.SMOOTHIE_BAUDRATE) as smoothie:152      smoothie.write("G91")153      smoothie.read_some()154      smoothie.write("G91")155      LOG['G91'] = 'OK'156      if LOG_PRINT:157        print(LOG)158      if request.method == 'GET':159          return render_template('xyd.html', answer=str(utility.get_smoothie_vesc_addresses()["smoothie"]))160      else:161          if request.form['x'] == 'next':162            return redirect(url_for('vesc_pr'))163@app.route('/vesc_pr', methods=['POST', 'GET'])164def vesc_pr():165    global PIC166    if PIC is None:167        return redirect("/")168    if request.method == 'GET':169        return render_template('vesc_pr.html')170    else:171        if request.form['x'] == 'test_motor':172              with adapters.VescAdapter(config.VESC_RPM_SLOW, config.VESC_MOVING_TIME, config.VESC_ALIVE_FREQ,173                              config.VESC_CHECK_FREQ, config.VESC_PORT, config.VESC_BAUDRATE) as vesc_engine:174                vesc_engine.set_rpm(RPM)175                vesc_engine.set_moving_time(MOVING_TIME)176                vesc_engine.start_moving()177                vesc_engine.wait_for_stop()178              return render_template('vesc_pr.html', answer='test_motor command sent')179        elif request.form['x'] == 'next':180            return redirect(url_for('vesc_z'))181@app.route('/vesc_z', methods=['POST', 'GET'])182def vesc_z():183    global PIC184    if PIC is None:185        return redirect("/")186    with connectors.SmoothieV11SerialConnector(utility.get_smoothie_vesc_addresses()["smoothie"], config.SMOOTHIE_BAUDRATE) as smoothie:187        smoothie.write("G91")188        smoothie.read_some()189        smoothie.write("G91")190        smoothie.read_some()191        if request.method == 'GET':192            return render_template('vesc_z.html')193        else:194            if request.form['x'] == 'test_motor':195                return test_smoothie(smoothie, TEST_Z, VESC_Z_URL)        196                return render_template('vesc_pr.html', answer='test_motor command sent')197            elif request.form['x'] == 'next':198                return redirect(url_for('gps'))199@app.route('/gps', methods=['POST', 'GET'])200def gps():201    global PIC202    if PIC is None:203        return redirect("/")204    if request.method == 'GET':205        return render_template('gps.html')206    else:207        if request.form['x'] == 'test_gps':208          with adapters.GPSUbloxAdapter(config.GPS_PORT, config.GPS_BAUDRATE, config.GPS_POSITIONS_TO_KEEP) as gps:209            return render_template('gps.html', answer = gps.get_fresh_position())210        return redirect(url_for('final'))211@app.route('/final', methods=['POST', 'GET'])212def final():213    global PIC214    if PIC is None:215        return redirect("/")216    if request.method == 'GET':217        if LOG_PRINT:218            print(LOG)219        return render_template('final.html', answer = LOG)220    else:221        return redirect(url_for('final'))222# -----------------------------------------------------------223# -------------------- socketio function --------------------224# -----------------------------------------------------------225@socketio.on('data', namespace='/server')226def on_socket_data(data):227    print(data)228    with connectors.SmoothieV11SerialConnector(utility.get_smoothie_vesc_addresses()["smoothie"], config.SMOOTHIE_BAUDRATE) as smoothie:229        smoothie.write("G91")230        smoothie.read_some()231        smoothie.write("G91")232        LOG['G91'] = 'OK'233        if LOG_PRINT:234            print(LOG)235        if data == "x+":236            to_emit = test_smoothie(smoothie, XP, XYD_URL)237        elif data == "x-": 238            to_emit = test_smoothie(smoothie, XN, XYD_URL),239        elif data == "y+": 240            to_emit = test_smoothie(smoothie, YP, XYD_URL),241        elif data == "y-": 242            to_emit = test_smoothie(smoothie, YN, XYD_URL),243        elif data == "d+": 244            to_emit = test_smoothie(smoothie, DP, XYD_URL),...sequential.py
Source:sequential.py  
...54        55    def testWelcomeMessage(self):56        """On connecting the server sends a 220 response with a welcome message."""57        client = Telnet('localhost', 1025)58        self.assertEqual(client.read_some(),59                         '220 test node.js smtpevent server 0.0.2\r\n'60                         )61        client.close()62        63    def testUnknownCommand(self):64        """Unknown commands are ignored and the client informed."""65        66        client = Telnet('localhost', 1025)67        client.read_some()68        client.write('EHLO')69        self.assertEqual(client.read_some(), 70                         '502 Error: command "EHLO" not implemented\r\n')71        client.close()72        73    def testIllegalHelo(self):74        """HELO takes a single argument."""75        76        client = Telnet('localhost', 1025)77        client.read_some()78        client.write('HELO')79        self.assertEqual(client.read_some(), '501 Syntax: HELO hostname\r\n')80        client.close()81    82    def testLegalHelo(self):83        """The server responds to a valid HELO command."""84        85        client = Telnet('localhost', 1025)86        client.read_some()87        client.write('HELO localhost')88        self.assertEqual(client.read_some(), '250 test Hello 127.0.0.1\r\n')89        client.close()90        91    def testMultipleHelo(self):92        """Only a single HELO command is allowed per connection."""93        94        client = Telnet('localhost', 1025)95        client.read_some()96        client.write('HELO localhost')97        self.assertEqual(client.read_some(), '250 test Hello 127.0.0.1\r\n')98        client.write('HELO localhost')99        self.assertEqual(client.read_some(), '503 Duplicate HELO/EHLO\r\n')100        client.close()101    102    def testIllegalNoop(self):103        """The NOOP command fails if any argument is passed."""104        105        client = Telnet('localhost', 1025)106        client.read_some()107        client.write('NOOP something else here')108        self.assertEqual(client.read_some(), '501 Syntax: NOOP\r\n')109        client.close()110    111    def testLegalNoop(self):112        """The NOOP command takes no arguments."""113        114        client = Telnet('localhost', 1025)115        client.read_some()116        client.write('NOOP')117        self.assertEqual(client.read_some(), '250 Ok\r\n')118        client.close()119        120    def testQuit(self):121        """The QUIT command doesn't care about arguments - the connection is122        closed regardless."""123        124        client = Telnet('localhost', 1025)125        client.read_some()126        client.write('QUIT')127        self.assertEqual(client.read_some(), '221 test closing connection\r\n')128        129        client = Telnet('localhost', 1025)130        client.read_some()131        client.write('QUIT See you later')132        self.assertEqual(client.read_some(), '221 test closing connection\r\n')133        client.close()134        135    def testIllegalRset(self):136        """The RSET command fails if any argument is passed."""137        138        client = Telnet('localhost', 1025)139        client.read_some()140        client.write('RSET now')141        self.assertEqual(client.read_some(), '501 Syntax: RSET\r\n')142        client.close()143        144    def testLegalRset(self):145        """The RSET command takes no arguments."""146        147        client = Telnet('localhost', 1025)148        client.read_some()149        client.write('RSET')150        self.assertEqual(client.read_some(), '250 Ok\r\n')151        client.close()152        153    def testMailNoFrom(self):154        """The MAIL command requires FROM: to follow it."""155        156        client = Telnet('localhost', 1025)157        client.read_some()158        client.write('MAIL')159        self.assertEqual(client.read_some(),160                         '501 Syntax: MAIL FROM:<address>\r\n')161        client.close()162    163    def testMailInvalidFrom(self):164        """The MAIL command requires FROM: to contain an email address."""165        166        client = Telnet('localhost', 1025)167        client.read_some()168        client.write('MAIL FROM:')169        self.assertEqual(client.read_some(),170                         '501 Syntax: MAIL FROM:<address>\r\n')171        client.close()172    173    def testMailFromParse(self):174        """The MAIL command will extract the email address from the FROM:."""175    176        client = Telnet('localhost', 1025)177        client.read_some()178        client.write('MAIL FROM:<person@example.com>')179        self.assertEqual(client.read_some(), '250 Ok\r\n')180        client.close()181        182    def testMailFromParse(self):183        """The MAIL command handles empty addresses"""184    185        client = Telnet('localhost', 1025)186        client.read_some()187        client.write('MAIL FROM:<>')188        self.assertEqual(client.read_some(), '250 Ok\r\n')189        client.close()190    191    def testDuplicateMailCommand(self):192        """Nested MAIL commands are not allowed."""193        194        client = Telnet('localhost', 1025)195        client.read_some()196        client.write('MAIL FROM:<me@example.com>')197        self.assertEqual(client.read_some(), '250 Ok\r\n')198        client.write('MAIL FROM:<me@example.com>')199        self.assertEqual(client.read_some(), '503 Error: nested MAIL command\r\n')200        client.close()201        202    def testRcptWithoutMail(self):203        """The RCPT command must be preceded by the MAIL command."""204        205        client = Telnet('localhost', 1025)206        client.read_some()207        client.write('RCPT TO:<me@example.com>')208        self.assertEqual(client.read_some(), '503 Error: need MAIL command\r\n')209        client.close()210        211    def testRcptWithoutTo(self):212        """The RCPT command must contain TO:<address> as the argument."""213        214        client = Telnet('localhost', 1025)215        client.read_some()216        client.write('MAIL FROM:<you@example.com>')217        self.assertEqual(client.read_some(), '250 Ok\r\n')218        client.write('RCPT')219        self.assertEqual(client.read_some(), '501 Syntax: RCPT TO: <address>\r\n')220        client.close()221    222    def testRcptEmptyTo(self):223        """The RCPT command cannot have an empty TO:."""224        225        client = Telnet('localhost', 1025)226        client.read_some()227        client.write('MAIL FROM:<you@example.com>')228        self.assertEqual(client.read_some(), '250 Ok\r\n')229        client.write('RCPT TO:')230        self.assertEqual(client.read_some(), '501 Syntax: RCPT TO: <address>\r\n')231        client.close()232        233    def testMultipleRcpts(self):234        """Multiple RCPT commands can be issued to add recipients."""235        236        client = Telnet('localhost', 1025)237        client.read_some()238        client.write('MAIL FROM:<you@example.com>')239        self.assertEqual(client.read_some(), '250 Ok\r\n')240        241        for rcpt in self.addrs:242            client.write('RCPT TO:<%s>' % rcpt)243            self.assertEqual(client.read_some(), '250 Ok\r\n')244        client.close()245    246    def testDataWithoutRcpt(self):247        """The DATA command must be preceded by the RCPT TO: command."""248        249        client = Telnet('localhost', 1025)250        client.read_some()251        client.write('DATA')252        self.assertEqual(client.read_some(), '503 Error: need RCPT command\r\n')253        client.close()254    255    def testDataResponse(self):256        """The DATA instructs the client to end the message with <CR><LF>.<CR><LF>."""257        258        client = Telnet('localhost', 1025)259        client.read_some()260        client.write('MAIL FROM:<you@example.com>')261        self.assertEqual(client.read_some(), '250 Ok\r\n')262        client.write('RCPT TO:<me@example.com>')263        self.assertEqual(client.read_some(), '250 Ok\r\n')264        client.write('DATA')265        self.assertEqual(client.read_some(),266                         '354 End data with <CR><LF>.<CR><LF>\r\n')267        client.close()268    269    def testDataArgument(self):270        """The DATA command does not take any arguments."""271        272        client = Telnet('localhost', 1025)273        client.read_some()274        client.write('MAIL FROM:<you@example.com>')275        self.assertEqual(client.read_some(), '250 Ok\r\n')276        client.write('RCPT TO:<me@example.com>')277        self.assertEqual(client.read_some(), '250 Ok\r\n')278        client.write('DATA some data here')279        self.assertEqual(client.read_some(), '501 Syntax: DATA\r\n')280        client.close()281        282if __name__ == "__main__":...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
