How to use _get_call_creds method in yandex-tank

Best Python code snippet using yandex-tank

client.py

Source: client.py (GitHub)

copy

Full Screen

...656 cert = ssl.get_server_certificate(tuple(url.split(':')))657 creds = grpc.ssl_channel_credentials(cert.encode('utf-8'))658 return creds659 @staticmethod660 def _get_call_creds(token):661 return grpc.access_token_call_credentials(token)662 # TODO retry663 def _set_connection(self, token, compute_instance_id):664 try:665 stub_register = agent_registration_service_pb2_grpc.AgentRegistrationServiceStub(self.channel)666 response = stub_register.Register(667 agent_registration_service_pb2.RegisterRequest(compute_instance_id=compute_instance_id),668 timeout=self.connection_timeout,669 metadata=[('authorization', f'Bearer {token}')]670 # credentials=self._get_call_creds(token)671 )672 if response.agent_instance_id:673 self.agent_instance_id = response.agent_instance_id674 self.trail_stub = trail_service_pb2_grpc.TrailServiceStub(self.channel)675 self.test_stub = test_service_pb2_grpc.TestServiceStub(self.channel)676 logger.info('Init connection to cloud load testing is succeeded')677 else:678 raise self.AgentIdNotFound("Couldn't get agent cloud id")679 except Exception as e:680 raise self.NotAvailable(f"Couldn't connect to cloud load testing: {e}")681 @property682 def token(self):683 if self._token is None:684 self._token = get_iam_token()685 return self._token686 @property687 def compute_instance_id(self):688 if self._compute_instance_id is None:689 self._compute_instance_id = get_current_instance_id()690 return self._compute_instance_id691 def api_timeouts(self):692 for attempt in range(self.api_attempts - 1):693 multiplier = random.uniform(1, 1.5)694 yield min(2**attempt * multiplier, self.max_api_timeout)695 @staticmethod696 def build_codes(codes_data):697 codes = []698 for code in codes_data:699 codes.append(trail_service_pb2.Trail.Codes(code=int(code['code']), count=int(code['count'])))700 return codes701 @staticmethod702 def build_intervals(intervals_data):703 intervals = []704 for interval in intervals_data:705 
intervals.append(trail_service_pb2.Trail.Intervals(to=interval['to'], count=int(interval['count'])))706 return intervals707 def convert_to_proto_message(self, items):708 trails = []709 for item in items:710 trail_data = item["trail"]711 trail = trail_service_pb2.Trail(712 overall=int(item["overall"]),713 case_id=item["case"],714 time=trail_data["time"],715 reqps=int(trail_data["reqps"]),716 resps=int(trail_data["resps"]),717 expect=trail_data["expect"],718 input=int(trail_data["input"]),719 output=int(trail_data["output"]),720 connect_time=trail_data["connect_time"],721 send_time=trail_data["send_time"],722 latency=trail_data["latency"],723 receive_time=trail_data["receive_time"],724 threads=int(trail_data["threads"]),725 q50=trail_data.get('q50'),726 q75=trail_data.get('q75'),727 q80=trail_data.get('q80'),728 q85=trail_data.get('q85'),729 q90=trail_data.get('q90'),730 q95=trail_data.get('q95'),731 q98=trail_data.get('q98'),732 q99=trail_data.get('99'),733 q100=trail_data.get('q100'),734 http_codes=self.build_codes(item['http_codes']),735 net_codes=self.build_codes(item['net_codes']),736 time_intervals=self.build_intervals(item['time_intervals']),737 )738 trails.append(trail)739 return trails740 def send_trails(self, instance_id, cloud_job_id, trails):741 try:742 request = trail_service_pb2.CreateTrailRequest(743 compute_instance_id=str(instance_id),744 job_id=str(cloud_job_id),745 data=trails746 )747 result = self.trail_stub.Create(748 request,749 timeout=self.connection_timeout,750 metadata=[('authorization', f'Bearer {self.token}')]751 # credentials=self._get_call_creds(self.token)752 )753 logger.debug(f'Send trails: {trails}')754 return result.code755 except grpc.RpcError as err:756 if err.code() in (grpc.StatusCode.UNAVAILABLE, grpc.StatusCode.DEADLINE_EXCEEDED):757 raise self.NotAvailable('Connection is closed. 
Try to set it again.')758 raise err759 except Exception as err:760 raise err761 def push_test_data(762 self,763 cloud_job_id,764 data_item,765 stat_item,...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run yandex-tank automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful