How to use _check_status_code method in tavern

Best Python code snippet using tavern

aws_client.py

Source:aws_client.py Github

copy

Full Screen

...26 @aws_api_caller()27 def get_user_identity(self):28 # Call API29 response = self.sts_client().get_caller_identity()30 self._check_status_code(response)31 return response32 @aws_api_caller()33 def get_availability_zones(self):34 # Call API35 response = self.ec2_client().describe_availability_zones()36 self._check_status_code(response)37 try:38 zones = [zone['ZoneName'] for zone in response['AvailabilityZones'] if zone['State'] == 'available']39 except KeyError as e:40 raise ProviderRequestError('Response from AWS has unexpected format: {}.'.format(e.message))41 return zones42 @aws_api_caller()43 def get_subnets(self, availability_zone):44 # Define filters45 filters = [{'Name': 'availability-zone', 'Values': [availability_zone]},46 {'Name': 'default-for-az', 'Values': ['true']}]47 # Call API48 response = self.ec2_client().describe_subnets(Filters=filters)49 self._check_status_code(response)50 try:51 subnets = response['Subnets']52 except KeyError as e:53 raise ProviderRequestError('Response from AWS has unexpected format: {}.'.format(e.message))54 return subnets55 @aws_api_caller()56 def find_spot_instance(self, portal_name, user):57 # Define filters58 filters = [{'Name': 'tag:portal-name', 'Values': [portal_name]},59 {'Name': 'tag:created-by', 'Values': [user]},60 {'Name': 'instance-state-name', 'Values': ['running', 'pending']}]61 # Call API62 response = self.ec2_client().describe_instances(Filters=filters)63 self._check_status_code(response)64 if len(response['Reservations']) == 0 or len(response['Reservations'][0]['Instances']) == 0:65 return None66 return response['Reservations'][0]['Instances'][0]67 @aws_api_caller()68 def get_instance(self, instance_id):69 # Call API70 response = self.ec2_client().describe_instances(InstanceIds=[instance_id])71 self._check_status_code(response)72 if len(response['Reservations']) == 0 or len(response['Reservations'][0]['Instances']) == 0:73 return None74 return response['Reservations'][0]['Instances'][0]75 @aws_api_caller()76 def get_spot_fleet_instances(self, spot_fleet_request_id):77 # Call API78 response = self.ec2_client().describe_spot_fleet_instances(SpotFleetRequestId=spot_fleet_request_id)79 self._check_status_code(response)80 return response['ActiveInstances']81 @aws_api_caller()82 def get_spot_fleet_request(self, spot_fleet_request_id):83 # Call API84 response = self.ec2_client().describe_spot_fleet_requests(SpotFleetRequestIds=[spot_fleet_request_id])85 self._check_status_code(response)86 if len(response['SpotFleetRequestConfigs']) == 0:87 return None88 return response['SpotFleetRequestConfigs'][0]89 @aws_api_caller()90 def get_volumes_by_id(self, volume_ids):91 """92 :param volume_ids: One or several volume Ids93 :type volume_ids: string or list94 :return:95 """96 # Call API97 response = self.ec2_client().describe_volumes(VolumeIds=AwsClient._as_list(volume_ids))98 self._check_status_code(response)99 return response['Volumes']100 @aws_api_caller()101 def get_volumes(self, filters=None):102 if filters is None:103 filters = {}104 # Convert list of filters to the expected format105 aws_filters = [{'Name': k, 'Values': AwsClient._as_list(v)} for k, v in filters.items()]106 # Call API107 response = self.ec2_client().describe_volumes(Filters=aws_filters)108 self._check_status_code(response)109 return response['Volumes']110 @aws_api_caller()111 def create_volume(self, size, availability_zone, tags=None, snapshot_id=None):112 if tags is None:113 tags = {}114 if snapshot_id is None:115 snapshot_id = ''116 # Convert tags to the expected format117 aws_tags = to_aws_tags(tags)118 # Call API119 response = self.ec2_client().create_volume(AvailabilityZone=availability_zone,120 Size=size,121 VolumeType='gp2',122 SnapshotId=snapshot_id,123 TagSpecifications=[{'ResourceType': 'volume', 'Tags': aws_tags}])124 self._check_status_code(response)125 return response['VolumeId']126 @aws_api_caller()127 def update_volume(self, volume_id, size):128 # Call API129 response = self.ec2_client().modify_volume(VolumeId=volume_id,130 Size=size)131 self._check_status_code(response)132 return response133 @aws_api_caller()134 def attach_volume(self, instance_id, volume_id, device):135 # Call API136 response = self.ec2_client().attach_volume(InstanceId=instance_id,137 VolumeId=volume_id,138 Device=device)139 self._check_status_code(response)140 return response141 @aws_api_caller()142 def delete_volume(self, volume_id):143 # Call API144 response = self.ec2_client().delete_volume(VolumeId=volume_id)145 self._check_status_code(response)146 return response147 @aws_api_caller()148 def add_tags(self, resource_ids, tags):149 """150 Add or overwrite tags for an EC2 resource (e.g. an instance or a volume).151 :param resource_ids: One or several resources to be affected152 :param tags: Dictionary of tags153 :type resource_ids: string or list154 :type tags: dict155 :return:156 """157 # Convert tags to the expected format158 aws_tags = to_aws_tags(tags)159 # Call API160 response = self.ec2_client().create_tags(Resources=AwsClient._as_list(resource_ids), Tags=aws_tags)161 self._check_status_code(response)162 return True163 @aws_api_caller()164 def remove_tags(self, resource_ids, keys):165 """166 Remove tags for an EC2 resource (e.g. an instance or a volume).167 :param resource_ids: One or several resources to be affected168 :param keys: One or several tag keys to be removed169 :type resource_ids: string or list170 :type keys: string or list171 :return:172 """173 aws_tags = [{'Key': key} for key in AwsClient._as_list(keys)]174 # Call API175 response = self.ec2_client().delete_tags(Resources=AwsClient._as_list(resource_ids), Tags=aws_tags)176 self._check_status_code(response)177 return True178 @aws_api_caller()179 def request_spot_fleet(self, config):180 # Call API181 response = self.ec2_client().request_spot_fleet(SpotFleetRequestConfig=config)182 self._check_status_code(response)183 return response184 @aws_api_caller()185 def cancel_spot_fleet_request(self, spot_fleet_request_id):186 # Call API187 response = self.ec2_client().cancel_spot_fleet_requests(SpotFleetRequestIds=[spot_fleet_request_id],188 TerminateInstances=True)189 self._check_status_code(response)190 # TODO: check the response to make sure request was canceled191 return True192 def ec2_client(self):193 if self._ec2_client is None:194 self._ec2_client = boto3.client('ec2',195 self._region,196 aws_access_key_id=self._access_key,197 aws_secret_access_key=self._secret_key)198 return self._ec2_client199 def sts_client(self):200 if self._sts_client is None:201 self._sts_client = boto3.client('sts',202 aws_access_key_id=self._access_key,203 aws_secret_access_key=self._secret_key)204 return self._sts_client205 @staticmethod206 def _as_list(x):207 """208 Ensure that argument is a list209 :param x: Individual element or list210 :return: List211 :rtype: list212 """213 return x if type(x) == list else [x]214 @staticmethod215 def _check_status_code(response):216 status_code = response['ResponseMetadata']['HTTPStatusCode']217 if status_code != 200:...

Full Screen

Full Screen

bunny-stream.py

Source:bunny-stream.py Github

copy

Full Screen

...9 self._api_key = api_key10 self._headers = {"AccessKey": api_key, "Content-Type": "application/json"}11 def _generate_base_url(self, endpoint):12 return self.base_url + self.stream_library_id + endpoint13 def _check_status_code(self, status):14 """15 Helper to check response codes.16 """17 if status == 401:18 raise Exception("Unauthorized; check API key.")19 if status == 404:20 raise Exception("Not found.")21 def get_video(self, video_id):22 """23 Get specific video information.24 """25 url = self._generate_base_url("/videos/" + video_id)26 response = requests.get(url, headers=self._headers)27 self._check_status_code(response.status_code)28 try:29 return json.loads(response.text)30 except:31 raise Exception("Failed to retrieve video")32 def list_videos(self, page = 1, per_page = 10, sort_by = "date", search = None, collection_id = None):33 """34 Lists videos in a library.35 """36 url = self._generate_base_url("/videos") + "?page=" + str(page) + "&per_page=" + str(per_page) + "&sort_by=" + sort_by37 if search is not None:38 url += "&search=" + search39 if collection_id is not None:40 url += "&collection=" + collection_id41 response = requests.get(url, headers=self._headers)42 self._check_status_code(response.status_code)43 try:44 return json.loads(response.text)45 except:46 raise Exception("Failed to retrieve list of videos")47 def update_video(self, video_id, title = None, collection_id = None):48 """49 Updates video title or collection.50 """51 url = self._generate_base_url("/videos/" + video_id)52 payload = {}53 if title is not None:54 payload["title"] = title55 if collection_id is not None:56 payload["collectionId"] = collection_id 57 response = requests.post(url, data=json.dumps(payload), headers=self._headers)58 self._check_status_code(response.status_code)59 try:60 return json.loads(response.text)61 except:62 raise Exception("Failed to update video")63 def delete_video(self, video_id):64 """65 Delete video by ID.66 """67 url = self._generate_base_url("/videos/" + video_id)68 response = requests.delete(url, headers=self._headers)69 self._check_status_code(response.status_code)70 try:71 return json.loads(response.text)72 except:73 raise Exception("Failed to delete video")74 def create_video(self, title, collection_id = None):75 """76 Create video. (must be done before calling upload_video_with_id())77 """78 url = self._generate_base_url("/videos")79 payload = {"title": title}80 if collection_id is not None:81 payload["collectionId"] = collection_id82 response = requests.post(url, data=json.dumps(payload), headers=self._headers)83 self._check_status_code(response.status_code)84 try:85 return json.loads(response.text)86 except:87 raise Exception("Failed to create video")88 def upload_video_with_id(self, video_id, path):89 """90 Uploads video to video_id.91 """92 url = self._generate_base_url("/videos/" + video_id)93 response = requests.put(url, data=open(path, "rb"), headers=self._headers)94 self._check_status_code(response.status_code)95 try:96 return json.loads(response.text)97 except:98 raise Exception("Failed to upload video")99 def upload_video(self, title, path, collection_id = None):100 """101 Creates video object and uploads video.102 """103 resp = self.create_video(title)104 try:105 return self.upload_video_with_id(resp["guid"], path)106 except:107 raise Exception("Failed to upload video")108 def set_video_thumbnail(self, video_id, thumbnail_url):109 """110 Set video thumbnail from URL.111 """112 url = self._generate_base_url("/videos/" + video_id + "/thumbnail")113 payload = {"thumbnailUrl": thumbnail_url}114 response = requests.post(url, data=json.dumps(payload), headers=self._headers)115 self._check_status_code(response.status_code)116 try:117 return json.loads(response.text)118 except:119 raise Exception("Failed to set video thumbnail")120 def fetch_video(self, video_id, source, headers = None):121 """122 Download video from external source; headers = dict{}.123 """124 url = self._generate_base_url("/videos/" + video_id + "/fetch")125 payload = {"thumbnailUrl": thumbnail_url}126 if headers is not None:127 payload["headers"] = headers128 response = requests.post(url, data=json.dumps(payload), headers=self._headers)129 self._check_status_code(response.status_code)130 try:131 return json.loads(response.text)132 except:133 raise Exception("Failed to fetch video")134 def add_video_captions(self, video_id, language, path, label):135 """136 Add captions.137 """138 url = self._generate_base_url("/videos/" + video_id + "/captions/" + language)139 payload = {"captionsFile": base64.b64encode(open(path, "rb").read()).decode("ascii"), "srclang": language, "label": label}140 response = requests.post(url, data=json.dumps(payload), headers=self._headers)141 self._check_status_code(response.status_code)142 try:143 return json.loads(response.text)144 except:145 raise Exception("Failed to add captions")146 def delete_video_captions(self, video_id, language):147 """148 Delete captions.149 """150 url = self._generate_base_url("/videos/" + video_id + "/captions/" + language)151 response = requests.delete(url, headers=self._headers)152 self._check_status_code(response.status_code)153 try:154 return json.loads(response.text)155 except:...

Full Screen

Full Screen

oozie.py

Source:oozie.py Github

copy

Full Screen

...28 session = self._get_http_session(job_execution.extra.get('neutron'))29 resp = session.post(self.jobs_url, data=job_config, headers={30 "Content-Type": "application/xml;charset=UTF-8"31 })32 _check_status_code(resp, 201)33 return get_json(resp)['id']34 def run_job(self, job_execution, job_id):35 session = self._get_http_session(job_execution.extra.get('neutron'))36 resp = session.put(self.job_url % job_id + "?action=start")37 _check_status_code(resp, 200)38 def kill_job(self, job_execution):39 session = self._get_http_session(job_execution.extra.get('neutron'))40 resp = session.put(self.job_url % job_execution.oozie_job_id +41 "?action=kill")42 _check_status_code(resp, 200)43 def get_job_status(self, job_execution, job_id=None):44 if job_id is None:45 job_id = job_execution.oozie_job_id46 session = self._get_http_session(job_execution.extra.get('neutron'))47 resp = session.get(self.job_url % job_id + "?show=info")48 _check_status_code(resp, 200)49 return get_json(resp)50 def get_job_logs(self, job_execution):51 session = self._get_http_session(job_execution.extra.get('neutron'))52 resp = session.get(self.job_url % job_execution.oozie_job_id +53 "?show=log")54 _check_status_code(resp, 200)55 return resp.text56 def get_jobs(self, offset, size, **filter):57 url = self.jobs_url + "?offset=%s&len=%s" % (offset, size)58 if len(filter) > 0:59 f = ";".join([k + "=" + v for k, v in filter.items()])60 url += "&filter=" + urlparse.quote(f)61 session = self._get_http_session()62 resp = session.get(url)63 _check_status_code(resp, 200)64 return get_json(resp)65def _check_status_code(resp, expected_code):66 if resp.status_code != expected_code:67 resp_text = resp.text68 # cleaning tomcat error message69 message = resp_text.split("<HR size=\"1\" noshade=\"noshade\">")[1]70 message = message.replace("</p><p>", "\n")71 message = re.sub('<[^<]+?>', ' ', message)72 raise OozieException(message)73def get_json(response):74 """Provides backward compatibility for old versions of requests library."""75 json_field_or_function = getattr(response, 'json', None)76 if callable(json_field_or_function):77 return response.json()78 else:79 return json.loads(response.content)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run tavern automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful