Best Python code snippet using lettuce-tools_python
gdb_wrapper.py
Source:gdb_wrapper.py  
...25            self.connect_to_port(port)26        if file is not None:27            self.gdb_ctrl.write("file " + file)28    @staticmethod29    def _parse_log(log, type: str):30        for el in log:31            if el['type'] == type:32                return el33    @staticmethod34    def _parse_flags(flags_value: int, all_flags: List):35        result = {}36        for i in range(len(all_flags)):37            flag_name = all_flags[i]38            if isinstance(flag_name, list):39                if flag_name[0] in result:40                    result[flag_name[0]] += (flags_value >> i & 1) << flag_name[1]41                else:42                    result[flag_name[0]] = (flags_value >> i & 1) << flag_name[1]43            else:44                result[flag_name] = flags_value >> i & 145        return result46    @no_response()47    def _get_updated_flags_reg(self, flag_name: str, flag_value: int, timeout_sec: int = DEFAULT_GDB_TIMEOUT_SEC):48        log = self.gdb_ctrl.write("p/t ${}".format(self._flags_name), timeout_sec)49        _, _, value_string = gdb_wrapper._parse_log(log, 'console')['payload'].partition(' = ')50        value_string = value_string.rstrip('\\n')51        value_string = '0' * (64 - len(value_string)) + value_string52        if flag_name not in self._flag_to_pos:53            raise KeyError('No such flag')54        value_list = list(value_string)55        for index, pos_in_reg in enumerate(self._flag_to_pos[flag_name]):56            value_list[- pos_in_reg - 1] = chr(48 + ((flag_value >> index) & 1))57        value_string = ''.join(value_list)58        return int(value_string, 2)59    @no_response()60    def connect_to_port(self, port: int, timeout_sec: int = DEFAULT_GDB_TIMEOUT_SEC):61        log = self.gdb_ctrl.write("target extended-remote localhost:" + str(port), timeout_sec)62        return log63    @no_response()64    def step_over(self, timeout_sec: int = DEFAULT_GDB_TIMEOUT_SEC):65        log = self.gdb_ctrl.write("-exec-next", timeout_sec)66        return log67    @no_response()68    def step_in(self, timeout_sec: int = DEFAULT_GDB_TIMEOUT_SEC):69        log = self.gdb_ctrl.write("-exec-step", timeout_sec)70        return log71    # ÐÑли Ñ Ð½Ð°Ñ ÑолÑко внеÑнÑÑ ÑÑнкÑиÑ, Ñо он ниÑего не ÑделаеÑ72    @no_response()73    def step_out(self, timeout_sec: int = DEFAULT_GDB_TIMEOUT_SEC):74        log = self.gdb_ctrl.write("-exec-finish", timeout_sec)75        return log76    @no_response()77    def get_stack(self, timeout_sec: int = DEFAULT_GDB_TIMEOUT_SEC):78        log = self.gdb_ctrl.write("-stack-list-frames", timeout_sec)79        if gdb_wrapper._parse_log(log, 'result')['message'] == 'error':80            return {}81        return gdb_wrapper._parse_log(log, 'result')['payload']82    @no_response()83    def get_registers(self, timeout_sec: int = DEFAULT_GDB_TIMEOUT_SEC):84        # надо пÑовеÑиÑÑ ÑÑо пÑоÑеÑÑ Ð·Ð°Ð¿ÑÑен85        result = []86        log = self.gdb_ctrl.write("-data-list-register-names", timeout_sec)87        indexes_of_registers = [index for index, elem in88                                enumerate(gdb_wrapper._parse_log(log, 'result')['payload']['register-names']) if89                                elem in self._registers]90        result.append([elem for elem in gdb_wrapper._parse_log(log, 'result')['payload']['register-names'] if91                       elem in self._registers])92        index_of_flags = 093        if self._flags_name in self._registers:94            index_of_flags = result[0].index(self._flags_name)95        log = self.gdb_ctrl.write("-data-list-register-values r {}".format(indexes_of_registers), timeout_sec)96        if self._parse_log(log, 'result')['message'] == 'error':97            return []98        # ÐÑо знаÑÐ¸Ñ ÑÑо пÑоÑеÑÑ Ð½Ðµ запÑÑен99        result.append(100            [reg_value['value'] for reg_value in gdb_wrapper._parse_log(log, 'result')['payload']['register-values']])101        if self._flags_name in self._registers:102            result[1][index_of_flags] = self.get_flags()103        return result104    @no_response()105    def set_breakpoint(self, line_number: int, timeout_sec: int = DEFAULT_GDB_TIMEOUT_SEC):106        log = self.gdb_ctrl.write('-break-insert --line {}'.format(line_number), timeout_sec)107        if gdb_wrapper._parse_log(log, 'result')['message'] != 'done':108            return log109        self.breakpoints[line_number] = gdb_wrapper._parse_log(log, 'result')['payload']['bkpt']['number']110        return log111    @no_response()112    def remove_breakpoint(self, line_number: int, timeout_sec: int = DEFAULT_GDB_TIMEOUT_SEC):113        if line_number in self.breakpoints:114            log = self.gdb_ctrl.write('-break-delete {}'.format(self.breakpoints[line_number]), timeout_sec)115            if gdb_wrapper._parse_log(log, 'result')['message'] == 'done':116                del self.breakpoints[line_number]117            return log118        return [{'type': 'result', 'message': 'error',119                 'payload': {'msg': 'No breakpoint at line number {}'.format(line_number)}, 'token': None,120                 'stream': 'stdout'}]121    @no_response()122    def write(self, command: Union[str, List[str]], timeout_sec: int = DEFAULT_GDB_TIMEOUT_SEC):123        if command.strip() == "q":124            self.gdb_ctrl.exit()125            return []126        log = self.gdb_ctrl.write(mi_cmd_to_write=command, timeout_sec=timeout_sec)127        return log128    @no_response()129    def set_register(self, reg_name: str, value: int, timeout_sec: int = DEFAULT_GDB_TIMEOUT_SEC):130        if len(self.__all_regs) == 0:131            log = self.gdb_ctrl.write("-data-list-register-names", timeout_sec)132            self.__all_regs = gdb_wrapper._parse_log(log, 'result')['payload']['register-names']133        if reg_name not in self.__all_regs and reg_name not in self._flag_to_pos:134            raise KeyError('No such register')135        is_flag = reg_name in self._flag_to_pos136        if is_flag:137            flag_name = reg_name138            reg_name = self._flags_name139        if reg_name not in self.__changed_regs:140            log = self.gdb_ctrl.write('-var-create {0} * ${0}'.format(reg_name))141            if self._parse_log(log, 'result')['message'] == 'error':142                raise gdb_error(log, 'Error in creating a variable for a register')143            self.__changed_regs.add(reg_name)144        if is_flag:145            value = self._get_updated_flags_reg(flag_name, int(value), timeout_sec)146        return self.gdb_ctrl.write('-var-assign {} {}'.format(reg_name, value))147    @no_response()148    def quit(self):...server_controller.py
Source:server_controller.py  
...40    return self.vmDiagService.queryVirtualMachinesState()41  def get_logs(self, level = LoggingLevel.INFO):42    if self.controller is None:43      return []44    controller_logs = ServerController._parse_log(self.controller._logger, level)45    loadBalancer_logs = ServerController._parse_log(self.loadBalancer._logger, level)46    vm_logs = flatten([ ServerController._parse_log(vm._logger, level) for vm in self.virtualMachines ])47    return sorted(flatten([controller_logs, loadBalancer_logs, vm_logs]), key=lambda l: l['timestamp'])48  def _parse_log(logger_instance: Logger, level: LoggingLevel):49    logs = logger_instance.read_logs()50    logs_arr = logs.split('\n')51    p = re.compile('\[(?P<level>[^\[\]]*)\] \[(?P<timestamp>[^\[\]]*)\] (?P<content>.*)')52    parsed_arr = []53    who = logger_instance.file_name.split('.')[0]54    for log in logs_arr:55      m = p.match(log)56      if m is not None:57        m_dict = m.groupdict()58        m_dict["from"] = who59        parsed_arr.append(m_dict)60    return parsed_arr...flow_logs.py
Source:flow_logs.py  
...7        super(FlowLogs, self).__init__(facade)8    async def fetch_all(self):9        raw_logs = await self.facade.ec2.get_flow_logs(self.region)10        for raw_log in raw_logs:11            id, log = self._parse_log(raw_log)12            self[id] = log13    def _parse_log(self, raw_log):14        get_name(raw_log, raw_log, 'FlowLogId')15        log_id = raw_log.pop('FlowLogId')...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
