Best Python code snippet using yandex-tank
client.py
Source:client.py  
...157                    time.sleep(timeout)158                    continue159                except StopIteration:160                    raise e161    def __make_writer_request(162            self,163            params=None,164            json=None,165            http_method="POST",166            trace=False):167        '''168        Send request to writer service.169        '''170        request = requests.Request(171            http_method,172            self.writer_url,173            params=params,174            json=json,175            headers={176                'User-Agent': self.user_agent})177        network_timeouts = self.network_timeouts()178        maintenance_timeouts = self.maintenance_timeouts()179        while True:180            try:181                response = self.__send_single_request(request, trace=trace)182                return response183            except (Timeout, ConnectionError):184                logger.warn(traceback.format_exc())185                try:186                    timeout = next(network_timeouts)187                    logger.warn(188                        "Network error, will retry in %ss..." %189                        timeout)190                    time.sleep(timeout)191                    continue192                except StopIteration:193                    raise self.NetworkError()194            except self.UnderMaintenance as e:195                try:196                    timeout = next(maintenance_timeouts)197                    logger.warn(198                        "Writer is under maintenance, will retry in %ss..." %199                        timeout)200                    time.sleep(timeout)201                    continue202                except StopIteration:203                    raise e204    def __get(self, addr, trace=False, maintenance_timeouts=None, maintenance_msg=None):205        return self.__make_api_request(206            'GET',207            addr,208            trace=trace,209            response_callback=lambda r: json.loads(r.content.decode('utf8')),210            maintenance_timeouts=maintenance_timeouts,211            maintenance_msg=maintenance_msg212        )213    def __post_raw(self, addr, txt_data, trace=False):214        return self.__make_api_request(215            'POST', addr, txt_data, lambda r: r.content, trace=trace)216    def __post(self, addr, data, trace=False):217        return self.__make_api_request(218            'POST',219            addr,220            json=data,221            response_callback=lambda r: r.json(),222            trace=trace)223    def __put(self, addr, data, trace=False):224        return self.__make_api_request(225            'PUT',226            addr,227            json=data,228            response_callback=lambda r: r.text,229            trace=trace)230    def __patch(self, addr, data, trace=False):231        return self.__make_api_request(232            'PATCH',233            addr,234            json=data,235            response_callback=lambda r: r.text,236            trace=trace)237    def get_task_data(self, task, trace=False):238        return self.__get("api/task/" + task + "/summary.json", trace=trace)239    def new_job(240            self,241            task,242            person,243            tank,244            target_host,245            target_port,246            loadscheme=None,247            detailed_time=None,248            notify_list=None,249            trace=False):250        """251        :return: job_nr, upload_token252        :rtype: tuple253        """254        if not notify_list:255            notify_list = []256        data = {257            'task': task,258            'person': person,259            'tank': tank,260            'host': target_host,261            'port': target_port,262            'loadscheme': loadscheme,263            'detailed_time': detailed_time,264            'notify': notify_list265        }266        logger.debug("Job create request: %s", data)267        api_timeouts = self.api_timeouts()268        while True:269            try:270                response = self.__post(271                    "api/job/create.json", data, trace=trace)[0]272                # [{"upload_token": "1864a3b2547d40f19b5012eb038be6f6", "job": 904317}]273                return response['job'], response['upload_token']274            except (self.NotAvailable, self.StoppedFromOnline) as e:275                try:276                    timeout = next(api_timeouts)277                    logger.warn("API error, will retry in %ss..." % timeout)278                    time.sleep(timeout)279                    continue280                except StopIteration:281                    logger.warn('Failed to create job on lunapark')282                    raise self.JobNotCreated(e.message)283            except Exception as e:284                logger.warn('Failed to create job on lunapark')285                logger.warn(e.message)286                raise self.JobNotCreated()287    def get_job_summary(self, jobno):288        result = self.__get('api/job/' + str(jobno) + '/summary.json')289        return result[0]290    def close_job(self, jobno, retcode, trace=False):291        params = {'exitcode': str(retcode)}292        result = self.__get('api/job/' + str(jobno) + '/close.json?' +293                            urllib.urlencode(params), trace=trace)294        return result[0]['success']295    def edit_job_metainfo(296            self,297            jobno,298            job_name,299            job_dsc,300            instances,301            ammo_path,302            loop_count,303            version_tested,304            is_regression,305            component,306            cmdline,307            is_starred,308            tank_type=0,309            trace=False):310        data = {311            'name': job_name,312            'description': job_dsc,313            'instances': str(instances),314            'ammo': ammo_path,315            'loop': loop_count,316            'version': version_tested,317            'regression': str(is_regression),318            'component': component,319            'tank_type': int(tank_type),320            'command_line': cmdline,321            'starred': int(is_starred)322        }323        response = self.__post(324            'api/job/' +325            str(jobno) +326            '/edit.json',327            data,328            trace=trace)329        return response330    def set_imbalance_and_dsc(self, jobno, rps, comment):331        data = {}332        if rps:333            data['imbalance'] = rps334        if comment:335            res = self.get_job_summary(jobno)336            data['description'] = (res['dsc'] + "\n" + comment).strip()337        response = self.__post('api/job/' + str(jobno) + '/edit.json', data)338        return response339    def second_data_to_push_item(self, data, stat, timestamp, overall, case):340        """341        @data: SecondAggregateDataItem342        """343        api_data = {344            'overall': overall,345            'case': case,346            'net_codes': [],347            'http_codes': [],348            'time_intervals': [],349            'trail': {350                'time': str(timestamp),351                'reqps': stat["metrics"]["reqps"],352                'resps': data["interval_real"]["len"],353                'expect': data["interval_real"]["total"] / 1000.0 /354                data["interval_real"]["len"],355                'disper': 0,356                'self_load':357                    0,  # TODO abs(round(100 - float(data.selfload), 2)),358                'input': data["size_in"]["total"],359                'output': data["size_out"]["total"],360                'connect_time': data["connect_time"]["total"] / 1000.0 /361                data["connect_time"]["len"],362                'send_time':363                    data["send_time"]["total"] / \364                1000.0 / data["send_time"]["len"],365                'latency':366                    data["latency"]["total"] / 1000.0 / data["latency"]["len"],367                'receive_time': data["receive_time"]["total"] / 1000.0 /368                data["receive_time"]["len"],369                'threads': stat["metrics"]["instances"],  # TODO370            }371        }372        for q, value in zip(data["interval_real"]["q"]["q"],373                            data["interval_real"]["q"]["value"]):374            api_data['trail']['q' + str(q)] = value / 1000.0375        for code, cnt in data["net_code"]["count"].items():376            api_data['net_codes'].append({'code': int(code),377                                          'count': int(cnt)})378        for code, cnt in data["proto_code"]["count"].items():379            api_data['http_codes'].append({'code': int(code),380                                           'count': int(cnt)})381        api_data['time_intervals'] = self.convert_hist(data["interval_real"][382            "hist"])383        return api_data384    def convert_hist(self, hist):385        data = hist['data']386        bins = hist['bins']387        return [388            {389                "from": 0,  # deprecated390                "to": b / 1000.0,391                "count": count,392            } for b, count in zip(bins, data)393        ]394    def push_test_data(395            self,396            jobno,397            upload_token,398            data_item,399            stat_item,400            trace=False):401        items = []402        uri = 'api/job/{0}/push_data.json?upload_token={1}'.format(403            jobno, upload_token)404        ts = data_item["ts"]405        for case_name, case_data in data_item["tagged"].items():406            if case_name == "":407                case_name = "__NOTAG__"408            if (len(case_name)) > 128:409                raise RuntimeError('tag (case) name is too long: ' + case_name)410            push_item = self.second_data_to_push_item(case_data, stat_item, ts,411                                                      0, case_name)412            items.append(push_item)413        overall = self.second_data_to_push_item(data_item["overall"],414                                                stat_item, ts, 1, '')415        items.append(overall)416        api_timeouts = self.api_timeouts()417        while True:418            try:419                if self.writer_url:420                    res = self.__make_writer_request(421                        params={422                            "jobno": jobno,423                            "upload_token": upload_token,424                        },425                        json={426                            "trail": items,427                        },428                        trace=trace)429                    logger.debug("Writer response: %s", res.text)430                    return res.json()["success"]431                else:432                    res = self.__post(uri, items, trace=trace)433                    logger.debug("API response: %s", res)434                    success = int(res[0]['success'])435                    return success436            except self.NotAvailable as e:437                try:438                    timeout = next(api_timeouts)439                    logger.warn("API error, will retry in %ss...", timeout)440                    time.sleep(timeout)441                    continue442                except StopIteration:443                    raise e444    def push_monitoring_data(445            self,446            jobno,447            upload_token,448            send_data,449            trace=False):450        if send_data:451            addr = "api/monitoring/receiver/push?job_id=%s&upload_token=%s" % (452                jobno, upload_token)453            api_timeouts = self.api_timeouts()454            while True:455                try:456                    if self.writer_url:457                        res = self.__make_writer_request(458                            params={459                                "jobno": jobno,460                                "upload_token": upload_token,461                            },462                            json={463                                "monitoring": send_data,464                            },465                            trace=trace)466                        logger.debug("Writer response: %s", res.text)467                        return res.json()["success"]468                    else:469                        res = self.__post_raw(470                            addr, json.dumps(send_data), trace=trace)471                        logger.debug("API response: %s", res)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
