Best Python code snippet using yandex-tank
utils.py
Source:utils.py  
""" Utility classes for phantom module """
# TODO: use separate answ log per benchmark
import copy
import logging
import multiprocessing
import string

from pkg_resources import resource_string
from yandextank.common.util import AddressWizard

from ...stepper import StepperWrapper
from ...stepper.util import parse_duration

logger = logging.getLogger(__name__)


def _load_template(relative_path):
    """ Return the packaged template at ``relative_path`` as text.

    ``resource_string`` returns ``bytes`` on Python 3 and ``string.Template``
    cannot substitute into bytes, so any non-``str`` payload is decoded as
    UTF-8. A payload that is already ``str`` is returned untouched.
    """
    raw = resource_string(__name__, relative_path)
    if not isinstance(raw, str):
        raw = raw.decode('utf-8')
    return raw


class PhantomConfig:
    """ config file generator

    Holds the options shared by every stream plus the list of
    ``StreamConfig`` objects, and renders them into one phantom config file.
    """
    OPTION_PHOUT = "phout_file"
    SECTION = 'phantom'

    def __init__(self, core):
        """ Remember ``core`` and initialise every option to its default. """
        self.core = core
        self.streams = []
        # common
        # the warning in read_config() treats 120000 as "2 minutes",
        # so this value is in milliseconds
        self.timeout = 11000
        self.answ_log = None
        self.answ_log_level = None
        self.phout_file = None
        self.stat_log = None
        self.phantom_log = None
        self.phantom_start_time = None
        self.phantom_modules_path = None
        self.threads = None
        self.additional_libs = None
        self.enum_ammo = False

    def get_option(self, opt_name, default=None):
        """ get option wrapper: read ``opt_name`` from the phantom section """
        return self.core.get_option(self.SECTION, opt_name, default)

    @staticmethod
    def get_available_options():
        """ Return the option names of this class plus the per-stream ones. """
        opts = [
            "threads",
            "phantom_modules_path",
            "additional_libs",
            "writelog",
            "enum_ammo",
            "timeout",
        ]
        opts += StreamConfig.get_available_options()
        return opts

    def read_config(self):
        """ Read phantom tool specific options

        Besides reading options this allocates the answ/phout/stat/phantom
        log files through ``core.mkstemp`` and registers them as artifacts,
        then creates one ``StreamConfig`` for the main section and one for
        every section reported by ``find_sections(SECTION + '-')``.
        """
        self.threads = self.get_option(
            "threads", str(multiprocessing.cpu_count() // 2 + 1))
        self.phantom_modules_path = self.get_option(
            "phantom_modules_path", "/usr/lib/phantom")
        self.additional_libs = self.get_option("additional_libs", "")

        # "writelog" accepts the numeric aliases '0' and '1'
        self.answ_log_level = self.get_option("writelog", "none")
        if self.answ_log_level == '0':
            self.answ_log_level = 'none'
        elif self.answ_log_level == '1':
            self.answ_log_level = 'all'

        self.timeout = parse_duration(self.get_option("timeout", "11s"))
        if self.timeout > 120000:
            logger.warning(
                "You've set timeout over 2 minutes."
                " Are you a functional tester?")

        self.answ_log = self.core.mkstemp(".log", "answ_")
        self.core.add_artifact_file(self.answ_log)

        self.phout_file = self.core.get_option(
            self.SECTION, self.OPTION_PHOUT, '')
        if not self.phout_file:
            # no user-supplied phout: create one and publish its path
            # back into the config
            self.phout_file = self.core.mkstemp(".log", "phout_")
            self.core.set_option(
                self.SECTION, self.OPTION_PHOUT, self.phout_file)
        self.core.add_artifact_file(self.phout_file)

        self.stat_log = self.core.mkstemp(".log", "phantom_stat_")
        self.core.add_artifact_file(self.stat_log)
        self.phantom_log = self.core.mkstemp(".log", "phantom_")
        self.core.add_artifact_file(self.phantom_log)

        main_stream = StreamConfig(
            self.core,
            len(self.streams), self.phout_file, self.answ_log,
            self.answ_log_level, self.timeout, self.SECTION)
        self.streams.append(main_stream)

        for section in self.core.config.find_sections(self.SECTION + '-'):
            self.streams.append(
                StreamConfig(
                    self.core,
                    len(self.streams), self.phout_file, self.answ_log,
                    self.answ_log_level, self.timeout, section))

        for stream in self.streams:
            stream.read_config()

        if any(stream.ssl for stream in self.streams):
            self.additional_libs += ' ssl io_benchmark_method_stream_transport_ssl'

    def compose_config(self):
        """ Generate phantom tool run config

        Renders every stream's benchmark block into the
        ``config/phantom.conf.tpl`` template, writes the result to a new
        artifact file and returns that file's name.
        """
        streams_config = ''
        stat_benchmarks = ''
        for stream in self.streams:
            streams_config += stream.compose_config()
            if stream.section != self.SECTION:
                stat_benchmarks += " benchmark_io%s" % stream.sequence_no

        kwargs = {
            'threads': self.threads,
            'phantom_log': self.phantom_log,
            'stat_log': self.stat_log,
            'benchmarks_block': streams_config,
            'stat_benchmarks': stat_benchmarks,
            'additional_libs': self.additional_libs,
            'phantom_modules_path': self.phantom_modules_path,
        }

        filename = self.core.mkstemp(".conf", "phantom_")
        self.core.add_artifact_file(filename)
        logger.debug("Generating phantom config: %s", filename)

        tpl = string.Template(_load_template("config/phantom.conf.tpl"))
        config = tpl.substitute(kwargs)

        # context manager closes the handle even if write() raises
        with open(filename, 'w') as handle:
            handle.write(config)
        return filename

    def set_timeout(self, timeout):
        """ pass timeout to all streams """
        for stream in self.streams:
            stream.timeout = timeout

    def get_info(self):
        """ get merged info about phantom conf

        Returns a shallow copy of the first stream whose aggregate fields
        (steps, ammo file/count, duration, instances, ...) are recomputed
        over all streams.

        Raises ValueError when the summed ammo count is zero.
        """
        result = copy.copy(self.streams[0])
        result.stat_log = self.stat_log
        result.steps = []
        result.ammo_file = ''
        result.rps_schedule = None
        result.ammo_count = 0
        result.duration = 0
        result.instances = 0
        result.loadscheme = []
        result.loop_count = 0

        for stream in self.streams:
            sec_no = 0
            logger.debug("Steps: %s", stream.stepper_wrapper.steps)
            # expand each step item[1] times; positions already present
            # (from earlier streams) get item[0] added to them
            for item in stream.stepper_wrapper.steps:
                for _ in range(item[1]):
                    if len(result.steps) > sec_no:
                        result.steps[sec_no][0] += item[0]
                    else:
                        result.steps.append([item[0], 1])
                    sec_no += 1

            # a schedule is kept only while a single stream has supplied one
            if result.rps_schedule:
                result.rps_schedule = []
            else:
                result.rps_schedule = stream.stepper_wrapper.loadscheme

            if result.loadscheme:
                result.loadscheme = ''
            else:
                # FIXME: add formatted load scheme for server:
                # <step_size,step_type,first_rps,last_rps,original_step_params>
                # as a string
                result.loadscheme = ''

            if result.loop_count:
                result.loop_count = u'0'
            else:
                result.loop_count = stream.stepper_wrapper.loop_count

            result.ammo_file += stream.stepper_wrapper.ammo_file + ' '
            result.ammo_count += stream.stepper_wrapper.ammo_count
            result.duration = max(
                result.duration, stream.stepper_wrapper.duration)
            result.instances += stream.instances

        if not result.ammo_count:
            raise ValueError("Total ammo count cannot be zero")
        return result


class StreamConfig:
    """ each test stream's config """
    OPTION_INSTANCES_LIMIT = 'instances'

    def __init__(
            self, core, sequence, phout, answ, answ_level, timeout, section):
        """ Store the shared settings and reset all per-benchmark options. """
        self.core = core
        self.address_wizard = AddressWizard()
        self.sequence_no = sequence
        self.section = section
        self.stepper_wrapper = StepperWrapper(self.core, self.section)
        self.phout_file = phout
        self.answ_log = answ
        self.answ_log_level = answ_level
        self.timeout = timeout
        # per benchmark
        self.instances = None
        self.ipv6 = None
        self.ssl = None
        self.address = None
        self.port = None
        self.tank_type = None
        self.stpd = None
        self.gatling = None
        self.phantom_http_line = None
        self.phantom_http_field_num = None
        self.phantom_http_field = None
        self.phantom_http_entity = None
        self.resolved_ip = None
        self.method_prefix = None
        self.source_log_prefix = None
        self.method_options = None
        self.client_cipher_suites = None
        self.client_certificate = None
        self.client_key = None

    def get_option(self, option_ammofile, default=None):
        """ get option wrapper: read an option from this stream's section """
        return self.core.get_option(self.section, option_ammofile, default)

    @staticmethod
    def get_available_options():
        """ Return stream option names, including the stepper's own. """
        opts = [
            "ssl", "tank_type", 'gatling_ip', "method_prefix",
            "source_log_prefix"
        ]
        opts += [
            "phantom_http_line", "phantom_http_field_num", "phantom_http_field",
            "phantom_http_entity"
        ]
        opts += ['address', "port", StreamConfig.OPTION_INSTANCES_LIMIT]
        opts += StepperWrapper.get_available_options()
        opts += ["connection_test"]
        return opts

    def read_config(self):
        """ reads config """
        # multi-options
        self.ssl = int(self.get_option("ssl", '0'))
        self.tank_type = self.get_option("tank_type", 'http')
        # TODO: refactor. Maybe we should decide how to interact with
        # StepperWrapper here.
        self.instances = int(
            self.get_option(self.OPTION_INSTANCES_LIMIT, '1000'))
        # newline-separated values are flattened into one space-separated line
        self.gatling = ' '.join(self.get_option('gatling_ip', '').split("\n"))
        self.method_prefix = self.get_option("method_prefix", 'method_stream')
        self.method_options = self.get_option("method_options", '')
        self.source_log_prefix = self.get_option("source_log_prefix", '')

        self.phantom_http_line = self.get_option("phantom_http_line", "")
        self.phantom_http_field_num = self.get_option(
            "phantom_http_field_num", "")
        self.phantom_http_field = self.get_option("phantom_http_field", "")
        self.phantom_http_entity = self.get_option("phantom_http_entity", "")

        self.address = self.get_option('address', '127.0.0.1')
        do_test_connect = int(self.get_option("connection_test", "1")) > 0
        explicit_port = self.get_option('port', '')
        # note: self.address is replaced by the value the wizard returns
        self.ipv6, self.resolved_ip, self.port, self.address = self.address_wizard.resolve(
            self.address, do_test_connect, explicit_port)
        logger.info(
            "Resolved %s into %s:%s", self.address, self.resolved_ip, self.port)

        self.client_cipher_suites = self.get_option("client_cipher_suites", "")
        self.client_certificate = self.get_option("client_certificate", "")
        self.client_key = self.get_option("client_key", "")
        self.stepper_wrapper.read_config()

    def compose_config(self):
        """ compose benchmark block

        Prepares the stepper, then fills the main or the additional
        benchmark template (chosen by section name) and returns the
        rendered text.

        Raises RuntimeError when the stepper produced no STPD file.
        """
        # step file
        self.stepper_wrapper.prepare_stepper()
        self.stpd = self.stepper_wrapper.stpd
        if self.stepper_wrapper.instances:
            self.instances = self.stepper_wrapper.instances

        if not self.stpd:
            raise RuntimeError("Cannot proceed with no STPD file")

        kwargs = {}
        kwargs['sequence_no'] = self.sequence_no

        if self.ssl:
            _auth_section = ''
            _ciphers = ''
            ssl_template = (
                "transport_t ssl_transport = transport_ssl_t {\n"
                "                timeout = 1s\n"
                "                %s\n"
                "                %s}\n"
                "                transport = ssl_transport")

            if self.client_certificate or self.client_key:
                _auth_section = 'auth_t def_auth = auth_t { key = "%s" cert = "%s"} auth = def_auth' \
                                % (self.client_key, self.client_certificate)
            if self.client_cipher_suites:
                _ciphers = 'ciphers = "%s"' % self.client_cipher_suites
            kwargs['ssl_transport'] = ssl_template % (_auth_section, _ciphers)
        else:
            kwargs['ssl_transport'] = ""

        suffix = "_ipv6_t" if self.ipv6 else "_ipv4_t"
        kwargs['method_stream'] = self.method_prefix + suffix
        kwargs['phout'] = self.phout_file
        kwargs['answ_log'] = self.answ_log
        kwargs['answ_log_level'] = self.answ_log_level
        # 'none' comments the answ log line out in the template
        kwargs['comment_answ'] = "# " if self.answ_log_level == 'none' else ''
        kwargs['stpd'] = self.stpd
        kwargs['source_log_prefix'] = self.source_log_prefix
        kwargs['method_options'] = self.method_options

        if self.tank_type:
            if self.tank_type == 'http':
                kwargs['proto'] = "proto=http_proto%s" % self.sequence_no
            else:
                kwargs['proto'] = "proto=none_proto"
            kwargs['comment_proto'] = ""
        else:
            kwargs['proto'] = ""
            kwargs['comment_proto'] = "#"

        if self.gatling:
            kwargs['bind'] = 'bind={ ' + self.gatling + ' }'
        else:
            kwargs['bind'] = ''

        kwargs['ip'] = self.resolved_ip
        kwargs['port'] = self.port
        kwargs['timeout'] = self.timeout
        kwargs['instances'] = self.instances

        # only the limits that were actually configured are emitted
        tune = ''
        if self.phantom_http_entity:
            tune += "entity = " + self.phantom_http_entity + "\n"
        if self.phantom_http_field:
            tune += "field = " + self.phantom_http_field + "\n"
        if self.phantom_http_field_num:
            tune += "field_num = " + self.phantom_http_field_num + "\n"
        if self.phantom_http_line:
            tune += "line = " + self.phantom_http_line + "\n"
        if tune:
            kwargs['reply_limits'] = 'reply_limits = {\n' + tune + "}"
        else:
            kwargs['reply_limits'] = ''

        if self.section == PhantomConfig.SECTION:
            fname = 'phantom_benchmark_main.tpl'
        else:
            fname = 'phantom_benchmark_additional.tpl'

        tpl = string.Template(_load_template("config/" + fname))
        config = tpl.substitute(kwargs)
        return config

# ...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
