Best Python code snippet using yandex-tank
main.py
Source:main.py  
...
            self.core.set_option(self.section, self.OPTION_STPD, self.stpd)
            # Reuse the cached stpd only when caching is enabled, re-stepping
            # is not forced, and both the stpd file and its stepper-info JSON
            # are present on disk.
            if self.use_caching and not self.force_stepping and os.path.exists(
                    self.stpd) and os.path.exists(self.__si_filename()):
                self.log.info("Using cached stpd-file: %s", self.stpd)
                stepper_info = self.__read_cached_options()
                if self.instances and self.rps_schedule:
                    self.log.info(
                        "rps_schedule is set. Overriding cached instances param from config: %s",
                        self.instances)
                    stepper_info = stepper_info._replace(
                        instances=self.instances)
                publish_info(stepper_info)
            else:
                # When stepping is forced, remove the existing stepper-info
                # JSON before generating a new stpd file.
                if (
                        self.force_stepping and
                        os.path.exists(self.__si_filename())):
                    os.remove(self.__si_filename())
                self.__make_stpd_file()
                stepper_info = info.status.get_info()
                self.__write_cached_options(stepper_info)
        # NOTE(review): the `if` matching this `else` is above the visible
        # excerpt; judging by the log message this is the branch for a
        # user-specified stpd file -- confirm against the full method.
        else:
            self.log.info("Using specified stpd-file: %s", self.stpd)
            stepper_info = publish_info(self.__read_cached_options())
        # Copy the stepper-info fields onto this object.
        self.ammo_count = stepper_info.ammo_count
        self.duration = stepper_info.duration
        self.loop_count = stepper_info.loop_count
        self.loadscheme = stepper_info.loadscheme
        self.steps = stepper_info.steps
        # Keep the current instances value unless the stepper info carries a
        # truthy one.
        if stepper_info.instances:
            self.instances = stepper_info.instances

    def __si_filename(self):
        '''Return the stepper-info JSON file name: ``self.stpd`` + "_si.json".'''
        return "%s_si.json" % self.stpd

    def __get_stpd_filename(self):
        '''
        Choose the name for the stepped data (stpd) file.

        With caching enabled, the name is
        ``<cache_dir>/<basename of ammo_file>_<md5 hex digest>.stpd``, where
        the digest is taken over a "|"-separated string built from the
        stepper options read below. ``cache_dir`` is created if it does not
        exist. Without caching, the real path of "ammo.stpd" is returned.

        Raises RuntimeError when caching is enabled and neither
        ``self.ammo_file`` nor ``self.uris`` is set.
        '''
        if self.use_caching:
            sep = "|"
            hasher = hashlib.md5()
            # The leading literal is part of the hashed string, so changing it
            # changes every generated file name.
            hashed_str = "cache version 6" + sep + \
                ';'.join(self.instances_schedule) + sep + str(self.loop_limit)
            hashed_str += sep + str(self.ammo_limit) + sep + ';'.join(
                self.rps_schedule) + sep + str(self.autocases)
            hashed_str += sep + ";".join(self.uris) + sep + ";".join(
                self.headers) + sep + self.http_ver + sep + ";".join(
                    self.chosen_cases)
            hashed_str += sep + str(self.enum_ammo) + sep + str(self.ammo_type)
            # instances is only part of the key when an instances schedule is
            # configured.
            if self.instances_schedule:
                hashed_str += sep + str(self.instances)
            if self.ammo_file:
                # presumably opener.hash identifies the ammo resource's
                # content or identity -- TODO confirm in resource.get_opener
                opener = resource.get_opener(self.ammo_file)
                hashed_str += sep + opener.hash
            else:
                if not self.uris:
                    raise RuntimeError("Neither ammofile nor uris specified")
                # NOTE(review): uris and headers were already appended above,
                # so they appear twice in the hash source in this branch.
                hashed_str += sep + \
                    ';'.join(self.uris) + sep + ';'.join(self.headers)
            self.log.debug("stpd-hash source: %s", hashed_str)
            hasher.update(hashed_str.encode('utf8'))
            if not os.path.exists(self.cache_dir):
                os.makedirs(self.cache_dir)
            # NOTE(review): this is reached with a falsy ammo_file in the
            # uris-only case; basename('') is '' but basename(None) raises
            # TypeError -- confirm which value ammo_file takes there.
            stpd = self.cache_dir + '/' + \
                os.path.basename(self.ammo_file) + \
                "_" + hasher.hexdigest() + ".stpd"
        else:
            stpd = os.path.realpath("ammo.stpd")
        self.log.debug("Generated cache file name: %s", stpd)
        return stpd

    def __read_cached_options(self):
        '''
        Read stepper info from the JSON file named by ``__si_filename()``.

        The decoded JSON object is passed as keyword arguments to
        ``info.StepperInfo`` and the resulting object is returned.
        '''
        self.log.debug("Reading cached stepper info: %s", self.__si_filename())
        with open(self.__si_filename(), 'r') as si_file:
            si = info.StepperInfo(**json.load(si_file))
        return si

    def __write_cached_options(self, si):
        '''
        Write stepper info to the JSON file named by ``__si_filename()``.

        ``si`` must provide ``_asdict()``; its result is dumped with a
        4-space indent, overwriting any existing file.
        '''
        self.log.debug("Saving stepper info: %s", self.__si_filename())
        with open(self.__si_filename(), 'w') as si_file:
            json.dump(si._asdict(), si_file, indent=4)
...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
