Best Python code snippet using autotest_python
parsers.py
Source:parsers.py  
...27        self.status = {'timestamp': None,28                       'tijdReferentie': None}29        for key in logged_types:30            self.status[key] = {}31    def _parse_status(self, message, data_size):32        """33        Parse the status part of a message.34        Parameters35        ----------36        message : str37            V-log message.38        data_size : float39            Size of one data item, in hex.40        Returns41        ----------42        num_sensors : int43            Number of sensors in status.44        """45        assert int(message[:2], 16) % 2 == 1, "Not status message"46        self.status['deltaTijd'] = int(message[2:5], 16)/10 # Log in seconds47        self._update_time()48        num_sensors = int(hex_string_to_bits(message[5:8])[2:], 2)49        assert len(message[8:]) >= data_size * num_sensors, "Num sensors exceeds message length"50        return num_sensors51    def _parse_update(self, message, data_size):52        """53        Parse the update part of a message.54        Parameters55        ----------56        message : str57            V-log message.58        data_size : float59            Size of one data item, in hex.60        Returns61        ----------62        num_sensors : int63            Number of sensors in status.64        """65        assert int(message[:2], 16) % 2 == 0, "Not update message"66        self.status['deltaTijd'] = int(message[2:5], 16)/10 # Log in seconds67        self._update_time()68        num_sensors = int(message[5], 16)69        assert len(message[6:]) >= data_size * num_sensors, "Num sensors exceeds message length"70        return num_sensors71    def parse_message(self, message):72        """73        Parse a v-log message and update the status.74        If the status is complete log_status is called.75        Parameters76        ----------77        message : str78            V-log message.79        """80        message_type = int(message[:2], 16)81        # Check if message is 
to be logged82        if message_type not in self.logged_types:83            return84        if message_type == 1:85            # Time reference86            # Sometimes the time is given as 24:00 not 00:00 so add the time to the date to deal with this87            self.status['tijdReferentie'] = (88                    datetime(89                        int(message[2:6]),90                        int(message[6:8]),91                        int(message[8:10])92                    )93                    + timedelta(94                        hours=int(message[10:12]),95                        minutes=int(message[12:14]),96                        seconds=int(message[14:16]),97                        milliseconds=int(message[16]) * 10098                    )99            ).timestamp()100            self.status['deltaTijd'] = 0101        elif message_type == 4:102            # V-Log information103            self.status['vlogInformatie']['V-Log versie'] = "{}.{}.{}".format(int(message[2:4], 16),104                                                                              int(message[4:6], 16),105                                                                              int(message[6:8], 16))106            vri_id = ''107            i = 8108            while i < len(message):109                vri_id += chr(int(message[i:i + 2], 16))110                i += 2111            self.status['vlogInformatie']['VRI id'] = vri_id.strip()  # Remove whitespace112        elif message_type == 5:113            # Detection status114            num_sensors = self._parse_status(message, data_size=1)115            for i in range(0, num_sensors):116                self.status['detectie'][i] = parse_detection_data(message[8 + i])117        elif message_type == 6:118            # Detection update119            num_sensors = self._parse_update(message, data_size=4)120            for i in range(0, num_sensors):121                index = int(message[6 + i * 4:8 + i * 4], 16)122               
 if index in self.status['detectie'].keys():123                    self.status['detectie'][index] = parse_detection_data(message[9 + i * 4])124        elif message_type == 7:125            # Other input status126            num_sensors = self._parse_status(message, data_size=0.25)127            status_bits = hex_string_to_bits(message[8:])128            for i in range(0, num_sensors):129                self.status['overigeIngangen'][i] = int(status_bits[i], 2)130        elif message_type == 8:131            # Other input update132            num_sensors = self._parse_update(message, data_size=2)133            for i in range(0, num_sensors):134                status_bits = hex_string_to_bits(message[6 + i * 2:8 + i * 2])135                index = int(status_bits[:-1], 2)136                if index in self.status['overigeIngangen'].keys():137                    self.status['overigeIngangen'][index] = int(status_bits[-1], 2)138        elif message_type == 9:139            # Internal phase status140            num_sensors = self._parse_status(message, data_size=3)141            for i in range(0, num_sensors):142                self.status['interneFaseCyclus'][i] = parse_internal_data(message[8 + i * 3:11 + i * 3])143        elif message_type == 10:144            # Internal phase update145            num_sensors = self._parse_update(message, data_size=6)146            for i in range(0, num_sensors):147                index = int(message[6 + i * 6:8 + i * 6], 16)148                if index in self.status['interneFaseCyclus'].keys():149                    self.status['interneFaseCyclus'][index] = parse_internal_data(message[9 + i * 6:12 + i * 6])150        elif message_type == 11:151            # Other output status (GUS)152            num_sensors = self._parse_status(message, data_size=0.25)153            status_bits = hex_string_to_bits(message[8:])154            for i in range(0, num_sensors):155                self.status['overigeUitgangenGUS'][i] = 
int(status_bits[i], 2)156        elif message_type == 12:157            # Other output update (GUS)158            num_sensors = self._parse_update(message, data_size=2)159            for i in range(0, num_sensors):160                status_bits = hex_string_to_bits(message[6 + i * 2:8 + i * 2])161                index = int(status_bits[:-1], 2)162                if index in self.status['overigeUitgangenGUS'].keys():163                    self.status['overigeUitgangenGUS'][index] = int(status_bits[-1], 2)164        elif message_type == 13:165            # External phase status166            num_sensors = self._parse_status(message, data_size=1)167            for i in range(0, num_sensors):168                self.status['externeSignaalgroep'][i] = int(message[8 + i], 16)169        elif message_type == 14:170            # External phase update171            num_sensors = self._parse_update(message, data_size=4)172            for i in range(0, num_sensors):173                index = int(message[6 + i * 4:8 + i * 4], 16)174                if index in self.status['externeSignaalgroep'].keys():175                    self.status['externeSignaalgroep'][index] = int(message[8 + i * 4:10 + i * 4], 16)176        elif message_type == 15:177            # Other output status (WUS)178            num_sensors = self._parse_status(message, data_size=0.25)179            status_bits = hex_string_to_bits(message[8:])180            for i in range(0, num_sensors):181                self.status['overigeUitgangenWUS'][i] = int(status_bits[i], 2)182        elif message_type == 16:183            # Other output update (WUS)184            num_sensors = self._parse_update(message, data_size=2)185            for i in range(0, num_sensors):186                status_bits = hex_string_to_bits(message[6 + i * 2:8 + i * 2])187                index = int(status_bits[:-1], 2)188                if index in self.status['overigeUitgangenWUS'].keys():189                    
self.status['overigeUitgangenWUS'][index] = int(status_bits[-1], 2)190        elif message_type == 17:191            # Desired program status192            num_sensors = self._parse_status(message, data_size=1)193            for i in range(0, num_sensors):194                self.status['gewensteProgrammaStatus'][i] = int(message[8 + i], 16)195        elif message_type == 18:196            # Desired program update197            num_sensors = self._parse_update(message, data_size=2)198            for i in range(0, num_sensors):199                index = int(message[6 + i * 2], 16)200                if index in self.status['gewensteProgrammaStatus'].keys():201                    self.status['gewensteProgrammaStatus'][index] = int(message[7 + i * 2], 16)202        elif message_type == 19:203            # Actual program status204            num_sensors = self._parse_status(message, data_size=1)205            for i in range(0, num_sensors):206                self.status['werkelijkeProgrammaStatus'][i] = int(message[8 + i], 16)207        elif message_type == 20:208            # Actual program update209            num_sensors = self._parse_update(message, data_size=2)210            for i in range(0, num_sensors):211                index = int(message[6 + i * 2], 16)212                if index in self.status['werkelijkeProgrammaStatus'].keys():213                    self.status['werkelijkeProgrammaStatus'][index] = int(message[7 + i * 2], 16)214        elif message_type == 23:215            # Thermometer status216            num_sensors = self._parse_status(message, data_size=1)217            for i in range(0, num_sensors):218                status_bits = hex_string_to_bits(message[8 + i])219                self.status['thermometer'][i] = {'MVG': int(status_bits[-1], 2),220                                                 'RNA': int(status_bits[-2], 2)}221        elif message_type == 24:222            # Thermometer update223            num_sensors = 
self._parse_update(message, data_size=2)224            for i in range(0, num_sensors):225                status_bits = hex_string_to_bits(message[7 + i * 2])226                index = int(message[6 + i * 2], 16)227                if index in self.status['thermometer'].keys():228                    self.status['thermometer'][index] = {'MVG': int(status_bits[-1], 2),229                                                         'RNA': int(status_bits[-2], 2)}230        elif message_type == 32:...test_parsing.py
Source:test_parsing.py  
...14                "\n"15                "# userid name uniqueid connected ping loss state rate adr\n"16                "#end\n")17        server_dict = {}18        self.exporter._parse_status(data, server_dict)19        self.assertEqual(server_dict["hostname"], "LinuxGSM")20        self.assertEqual(server_dict["bots"], "0")21        self.assertEqual(server_dict["players"], "0")22        self.assertEqual(server_dict["max_players"], "16")23    def test_csgo_stats(self):24        data = ("  CPU   NetIn   NetOut    Uptime  Maps   FPS   Players  Svms    +-ms   ~tick\n"25                "  10.0      0.0      0.0   46732     1   64.07       0    5.31    0.08    0.03\n")26        server_dict = dict()27        self.exporter._parse_stats(data, server_dict)28        for key, value in {"CPU": 10, "NetIn": 0, "NetOut": 0, "Uptime": 46732,29                           "Maps": 1, "FPS": 64.07, "Players": 0, "svarms": 5.31,30                           "varms": 0.08, "vartick": 0.03}.items():31            self.assertEqual(server_dict[key], value)32    def test_fof_status(self):33        data = ("hostname: LinuxGSM\n"34                "version : 468/24 0 secure\n"35                "udp/ip  : 0.0.0.0:27015  (public ip: 141.70.124.33)\n"36                "steamid : [A:1:1510767623:12537] (90125840063167495)\n"37                "map     : fof_depot at: 0 x, 0 y, 0 z\n"38                "tags    : \n"39                "players : 0 humans, 0 bots (16 max)\n"40                "edicts  : 606 used of 2048 max\n"41                "# userid name                uniqueid            connected ping loss state  adr\n")42        server_dict = {}43        self.exporter._parse_status(data, server_dict)44        self.assertEqual(server_dict["hostname"], "LinuxGSM")45        self.assertEqual(server_dict["bots"], "0")46        self.assertEqual(server_dict["players"], "0")47        self.assertEqual(server_dict["max_players"], "16")48    def test_fof_stats(self):49        data = ("CPU    In_(KB/s)  
Out_(KB/s)  Uptime  Map_changes  FPS      Players  Connects\n"50                "0.00   0.00       0.00        8       0            66.64    0        0\n")51        server_dict = dict()52        self.exporter._parse_stats(data, server_dict)53        for key, value in {"CPU": 0, "NetIn": 0, "NetOut": 0, "Uptime": 8,54                           "Maps": 0, "FPS": 66.64, "Players": 0, "Connects": 0}.items():55            self.assertEqual(server_dict[key], value)56    def test_hl2dm_status(self):57        data = ("hostname: LinuxGSM\n"58                "version : 4630212/24 4630212 insecure (secure mode enabled, disconnected from Steam3)\n"59                "udp/ip  : 0.0.0.0:27015\n"60                "steamid : not logged in\n"61                "map     : dm_lockdown at: 0 x, 0 y, 0 z\n"62                "tags    : alltalk,increased_maxplayers\n"63                "players : 0 humans, 0 bots (16 max)\n"64                "edicts  : 495 used of 2048 max\n"65                "# userid name                uniqueid            connected ping loss state  adr\n")66        server_dict = {}67        self.exporter._parse_status(data, server_dict)68        self.assertEqual(server_dict["hostname"], "LinuxGSM")69        self.assertEqual(server_dict["bots"], "0")70        self.assertEqual(server_dict["players"], "0")71        self.assertEqual(server_dict["max_players"], "16")72    def test_hl2dm_stats(self):73        data = ("CPU    In_(KB/s)  Out_(KB/s)  Uptime  Map_changes  FPS      Players  Connects\n"74                "0.00   0.00       0.00        0       0            66.70    0        0\n")75        server_dict = dict()76        self.exporter._parse_stats(data, server_dict)77        for key, value in {"CPU": 0, "NetIn": 0, "NetOut": 0, "Uptime": 0,78                           "Maps": 0, "FPS": 66.70, "Players": 0, "Connects": 0}.items():79            self.assertEqual(server_dict[key], value)80    def test_tf2_status(self):81        data = ("hostname: LinuxGSM\n"82                
"version : 5063830/24 5063830 insecure (secure mode enabled, disconnected from Steam3)\n"83                "udp/ip  : 0.0.0.0:27015\n"84                "steamid : not logged in\n"85                "account : not logged in  (No account specified)\n"86                "map     : cp_badlands at: 0 x, 0 y, 0 z\n"87                "tags    : cp\n"88                "players : 0 humans, 0 bots (16 max)\n"89                "edicts  : 550 used of 2048 max\n"90                "# userid name                uniqueid            connected ping loss state  adr\n")91        server_dict = {}92        self.exporter._parse_status(data, server_dict)93        self.assertEqual(server_dict["hostname"], "LinuxGSM")94        self.assertEqual(server_dict["bots"], "0")95        self.assertEqual(server_dict["players"], "0")96        self.assertEqual(server_dict["max_players"], "16")97    def test_tf2_stats(self):98        data = ("CPU    In_(KB/s)  Out_(KB/s)  Uptime  Map_changes  FPS      Players  Connects\n"99                "0.00   0.00       0.00        0       0            66.67    0        0\n")100        server_dict = dict()101        self.exporter._parse_stats(data, server_dict)102        for key, value in {"CPU": 0, "NetIn": 0, "NetOut": 0, "Uptime": 0,103                           "Maps": 0, "FPS": 66.67, "Players": 0, "Connects": 0}.items():104            self.assertEqual(server_dict[key], value)105    def test_gmod_status(self):106        data = ("hostname: LinuxGSM\n"107                "version : 19.02.18/24 7472 insecure (secure mode enabled, disconnected from Steam3)\n"108                "udp/ip  : 192.0.2.1:27015\n"109                "map     : gm_construct at: 0 x, 0 y, 0 z\n"110                "players : 0 (16 max)\n"111                "\n"112                "# userid name                uniqueid            connected ping loss state  adr\n")113        server_dict = {}114        self.exporter._parse_status(data, server_dict)115        self.assertEqual(server_dict["hostname"], 
"LinuxGSM")116        self.assertEqual(server_dict["players"], "0")117        self.assertEqual(server_dict["max_players"], "16")118    def test_gmod_stats(self):119        data = ("CPU    In_(KB/s)  Out_(KB/s)  Uptime  Map_changes  FPS      Players  Connects\n"120                "33.00  0.00       0.00        1       0            66.07    0        0\n")121        server_dict = dict()122        self.exporter._parse_stats(data, server_dict)123        for key, value in {"CPU": 33, "NetIn": 0, "NetOut": 0, "Uptime": 1,124                           "Maps": 0, "FPS": 66.07, "Players": 0, "Connects": 0}.items():125            self.assertEqual(server_dict[key], value)126    def test_css_status(self):127        data = ("hostname: LinuxGSM\n"128                "version : 4630212/24 4630212 insecure (secure mode enabled, disconnected from Steam3)\n"129                "udp/ip  : 0.0.0.0:27015\n"130                "steamid : not logged in\n"131                "map     : de_dust2 at: 0 x, 0 y, 0 z\n"132                "tags    :\n"133                "players : 0 humans, 0 bots (16 max)\n"134                "edicts  : 212 used of 2048 max\n"135                "# userid name                uniqueid            connected ping loss state  adr\n")136        server_dict = {}137        self.exporter._parse_status(data, server_dict)138        self.assertEqual(server_dict["hostname"], "LinuxGSM")139        self.assertEqual(server_dict["players"], "0")140        self.assertEqual(server_dict["bots"], "0")141        self.assertEqual(server_dict["max_players"], "16")142    def test_css_stats(self):143        data = ("CPU    In_(KB/s)  Out_(KB/s)  Uptime  Map_changes  FPS      Players  Connects\n"144                "0.00   0.00       0.00        2       0            66.55    0        0\n")145        server_dict = dict()146        self.exporter._parse_stats(data, server_dict)147        for key, value in {"CPU": 0, "NetIn": 0, "NetOut": 0, "Uptime": 2,148                           "Maps": 0, 
"FPS": 66.55, "Players": 0, "Connects": 0}.items():...HHC-N8I8OP.py
Source:HHC-N8I8OP.py  
...36        self.ip = ip37        self.port = port38        self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)39        self.s.connect((self.ip, self.port))40        status = self._parse_status(self.execute('read'))41        self._switch = {index: Switch(self, index, state) for index, state in status}42    def execute(self, c):43        time.sleep(0.1)44        c = str(c).encode()45        self.s.send(c)46        data = self.s.recv(8192)47        return data.decode()48    def __getitem__(self, item):49        return self._switch[item]50    @staticmethod51    def _parse_status(data):52        return [[index + 1, bool(int(state))] for index, state in53                enumerate(reversed(data.split('relay')[1]))]54    def _update_status(self, status):55        for i, s in status:56            self._switch[i].state = s57    def status(self):58        data = self.execute('read')59        status = self._parse_status(data)60        self._update_status(status)61        return status62    def state(self, state: bool, switch=None):63        if not switch:64            command = str(int(state)) * len(self._switch)65            data = self.execute(f'all{command}')66            status = self._parse_status(data)67            self._update_status(status)68        else:69            self._switch[switch].power = state70    def __repr__(self):71        return str([s for s in self._switch.values()])72    def __del__(self):...Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
