Best Python code snippet using tavern
aws_client.py
Source:aws_client.py  
...26	@aws_api_caller()27	def get_user_identity(self):28		# Call API29		response = self.sts_client().get_caller_identity()30		self._check_status_code(response)31		return response32	@aws_api_caller()33	def get_availability_zones(self):34		# Call API35		response = self.ec2_client().describe_availability_zones()36		self._check_status_code(response)37		try:38			zones = [zone['ZoneName'] for zone in response['AvailabilityZones'] if zone['State'] == 'available']39		except KeyError as e:40			raise ProviderRequestError('Response from AWS has unexpected format: {}.'.format(e.message))41		return zones42	@aws_api_caller()43	def get_subnets(self, availability_zone):44		# Define filters45		filters = [{'Name': 'availability-zone', 'Values': [availability_zone]},46				   {'Name': 'default-for-az', 'Values': ['true']}]47		# Call API48		response = self.ec2_client().describe_subnets(Filters=filters)49		self._check_status_code(response)50		try:51			subnets = response['Subnets']52		except KeyError as e:53			raise ProviderRequestError('Response from AWS has unexpected format: {}.'.format(e.message))54		return subnets55	@aws_api_caller()56	def find_spot_instance(self, portal_name, user):57		# Define filters58		filters = [{'Name': 'tag:portal-name', 'Values': [portal_name]},59				   {'Name': 'tag:created-by', 'Values': [user]},60				   {'Name': 'instance-state-name', 'Values': ['running', 'pending']}]61		# Call API62		response = self.ec2_client().describe_instances(Filters=filters)63		self._check_status_code(response)64		if len(response['Reservations']) == 0 or len(response['Reservations'][0]['Instances']) == 0:65			return None66		return response['Reservations'][0]['Instances'][0]67	@aws_api_caller()68	def get_instance(self, instance_id):69		# Call API70		response = self.ec2_client().describe_instances(InstanceIds=[instance_id])71		self._check_status_code(response)72		if len(response['Reservations']) == 0 or len(response['Reservations'][0]['Instances']) == 0:73			return None74		return 
response['Reservations'][0]['Instances'][0]75	@aws_api_caller()76	def get_spot_fleet_instances(self, spot_fleet_request_id):77		# Call API78		response = self.ec2_client().describe_spot_fleet_instances(SpotFleetRequestId=spot_fleet_request_id)79		self._check_status_code(response)80		return response['ActiveInstances']81	@aws_api_caller()82	def get_spot_fleet_request(self, spot_fleet_request_id):83		# Call API84		response = self.ec2_client().describe_spot_fleet_requests(SpotFleetRequestIds=[spot_fleet_request_id])85		self._check_status_code(response)86		if len(response['SpotFleetRequestConfigs']) == 0:87			return None88		return response['SpotFleetRequestConfigs'][0]89	@aws_api_caller()90	def get_volumes_by_id(self, volume_ids):91		"""92		:param volume_ids: One or several volume Ids93		:type volume_ids: string or list94		:return:95		"""96		# Call API97		response = self.ec2_client().describe_volumes(VolumeIds=AwsClient._as_list(volume_ids))98		self._check_status_code(response)99		return response['Volumes']100	@aws_api_caller()101	def get_volumes(self, filters=None):102		if filters is None:103			filters = {}104		# Convert list of filters to the expected format105		aws_filters = [{'Name': k, 'Values': AwsClient._as_list(v)} for k, v in filters.items()]106		# Call API107		response = self.ec2_client().describe_volumes(Filters=aws_filters)108		self._check_status_code(response)109		return response['Volumes']110	@aws_api_caller()111	def create_volume(self, size, availability_zone, tags=None, snapshot_id=None):112		if tags is None:113			tags = {}114		if snapshot_id is None:115			snapshot_id = ''116		# Convert tags to the expected format117		aws_tags = to_aws_tags(tags)118		# Call API119		response = self.ec2_client().create_volume(AvailabilityZone=availability_zone,120												   Size=size,121												   VolumeType='gp2',122												   SnapshotId=snapshot_id,123												   TagSpecifications=[{'ResourceType': 'volume', 'Tags': aws_tags}])124		
self._check_status_code(response)125		return response['VolumeId']126	@aws_api_caller()127	def update_volume(self, volume_id, size):128		# Call API129		response = self.ec2_client().modify_volume(VolumeId=volume_id,130												   Size=size)131		self._check_status_code(response)132		return response133	@aws_api_caller()134	def attach_volume(self, instance_id, volume_id, device):135		# Call API136		response = self.ec2_client().attach_volume(InstanceId=instance_id,137												   VolumeId=volume_id,138												   Device=device)139		self._check_status_code(response)140		return response141	@aws_api_caller()142	def delete_volume(self, volume_id):143		# Call API144		response = self.ec2_client().delete_volume(VolumeId=volume_id)145		self._check_status_code(response)146		return response147	@aws_api_caller()148	def add_tags(self, resource_ids, tags):149		"""150		Add or overwrite tags for an EC2 resource (e.g. an instance or a volume).151		:param resource_ids: One or several resources to be affected152		:param tags: Dictionary of tags153		:type resource_ids: string or list154		:type tags: dict155		:return:156		"""157		# Convert tags to the expected format158		aws_tags = to_aws_tags(tags)159		# Call API160		response = self.ec2_client().create_tags(Resources=AwsClient._as_list(resource_ids), Tags=aws_tags)161		self._check_status_code(response)162		return True163	@aws_api_caller()164	def remove_tags(self, resource_ids, keys):165		"""166		Remove tags for an EC2 resource (e.g. 
an instance or a volume).167		:param resource_ids: One or several resources to be affected168		:param keys: One or several tag keys to be removed169		:type resource_ids: string or list170		:type keys: string or list171		:return:172		"""173		aws_tags = [{'Key': key} for key in AwsClient._as_list(keys)]174		# Call API175		response = self.ec2_client().delete_tags(Resources=AwsClient._as_list(resource_ids), Tags=aws_tags)176		self._check_status_code(response)177		return True178	@aws_api_caller()179	def request_spot_fleet(self, config):180		# Call API181		response = self.ec2_client().request_spot_fleet(SpotFleetRequestConfig=config)182		self._check_status_code(response)183		return response184	@aws_api_caller()185	def cancel_spot_fleet_request(self, spot_fleet_request_id):186		# Call API187		response = self.ec2_client().cancel_spot_fleet_requests(SpotFleetRequestIds=[spot_fleet_request_id],188																TerminateInstances=True)189		self._check_status_code(response)190		# TODO: check the response to make sure request was canceled191		return True192	def ec2_client(self):193		if self._ec2_client is None:194			self._ec2_client = boto3.client('ec2',195											self._region,196											aws_access_key_id=self._access_key,197											aws_secret_access_key=self._secret_key)198		return self._ec2_client199	def sts_client(self):200		if self._sts_client is None:201			self._sts_client = boto3.client('sts',202											aws_access_key_id=self._access_key,203											aws_secret_access_key=self._secret_key)204		return self._sts_client205	@staticmethod206	def _as_list(x):207		"""208		Ensure that argument is a list209		:param x: Individual element or list210		:return: List211		:rtype: list212		"""213		return x if type(x) == list else [x]214	@staticmethod215	def _check_status_code(response):216		status_code = response['ResponseMetadata']['HTTPStatusCode']217		if status_code != 200:...bunny-stream.py
Source:bunny-stream.py  
...9		self._api_key = api_key10		self._headers = {"AccessKey": api_key, "Content-Type": "application/json"}11	def _generate_base_url(self, endpoint):12		return self.base_url + self.stream_library_id + endpoint13	def _check_status_code(self, status):14		"""15			Helper to check response codes.16		"""17		if status == 401:18			raise Exception("Unauthorized; check API key.")19		if status == 404:20			raise Exception("Not found.")21	def get_video(self, video_id):22		"""23			Get specific video information.24		"""25		url = self._generate_base_url("/videos/" + video_id)26		response = requests.get(url, headers=self._headers)27		self._check_status_code(response.status_code)28		try:29			return json.loads(response.text)30		except:31			raise Exception("Failed to retrieve video")32	def list_videos(self, page = 1, per_page = 10, sort_by = "date", search = None, collection_id = None):33		"""34			Lists videos in a library.35		"""36		url = self._generate_base_url("/videos") + "?page=" + str(page) + "&per_page=" + str(per_page) + "&sort_by=" + sort_by37		if search is not None:38			url += "&search=" + search39		if collection_id is not None:40			url += "&collection=" + collection_id41		response = requests.get(url, headers=self._headers)42		self._check_status_code(response.status_code)43		try:44			return json.loads(response.text)45		except:46			raise Exception("Failed to retrieve list of videos")47	def update_video(self, video_id, title = None, collection_id = None):48		"""49			Updates video title or collection.50		"""51		url = self._generate_base_url("/videos/" + video_id)52		payload = {}53		if title is not None:54			payload["title"] = title55		if collection_id is not None:56			payload["collectionId"] = collection_id	 57		response = requests.post(url, data=json.dumps(payload), headers=self._headers)58		self._check_status_code(response.status_code)59		try:60			return json.loads(response.text)61		except:62			raise Exception("Failed to update video")63	def delete_video(self, video_id):64		
"""65			Delete video by ID.66		"""67		url = self._generate_base_url("/videos/" + video_id)68		response = requests.delete(url, headers=self._headers)69		self._check_status_code(response.status_code)70		try:71			return json.loads(response.text)72		except:73			raise Exception("Failed to delete video")74	def create_video(self, title, collection_id = None):75		"""76			Create video. (must be done before calling upload_video_with_id())77		"""78		url = self._generate_base_url("/videos")79		payload = {"title": title}80		if collection_id is not None:81			payload["collectionId"] = collection_id82		response = requests.post(url, data=json.dumps(payload), headers=self._headers)83		self._check_status_code(response.status_code)84		try:85			return json.loads(response.text)86		except:87			raise Exception("Failed to create video")88	def upload_video_with_id(self, video_id, path):89		"""90			Uploads video to video_id.91		"""92		url = self._generate_base_url("/videos/" + video_id)93		response = requests.put(url, data=open(path, "rb"), headers=self._headers)94		self._check_status_code(response.status_code)95		try:96			return json.loads(response.text)97		except:98			raise Exception("Failed to upload video")99	def upload_video(self, title, path, collection_id = None):100		"""101			Creates video object and uploads video.102		"""103		resp = self.create_video(title)104		try:105			return self.upload_video_with_id(resp["guid"], path)106		except:107			raise Exception("Failed to upload video")108	def set_video_thumbnail(self, video_id, thumbnail_url):109		"""110			Set video thumbnail from URL.111		"""112		url = self._generate_base_url("/videos/" + video_id + "/thumbnail")113		payload = {"thumbnailUrl": thumbnail_url}114		response = requests.post(url, data=json.dumps(payload), headers=self._headers)115		self._check_status_code(response.status_code)116		try:117			return json.loads(response.text)118		except:119			raise Exception("Failed to set video thumbnail")120	def fetch_video(self, video_id, 
source, headers = None):121		"""122			Download video from external source; headers = dict{}.123		"""124		url = self._generate_base_url("/videos/" + video_id + "/fetch")125		payload = {"thumbnailUrl": thumbnail_url}126		if headers is not None:127			payload["headers"] = headers128		response = requests.post(url, data=json.dumps(payload), headers=self._headers)129		self._check_status_code(response.status_code)130		try:131			return json.loads(response.text)132		except:133			raise Exception("Failed to fetch video")134	def add_video_captions(self, video_id, language, path, label):135		"""136			Add captions.137		"""138		url = self._generate_base_url("/videos/" + video_id + "/captions/" + language)139		payload = {"captionsFile": base64.b64encode(open(path, "rb").read()).decode("ascii"), "srclang": language, "label": label}140		response = requests.post(url, data=json.dumps(payload), headers=self._headers)141		self._check_status_code(response.status_code)142		try:143			return json.loads(response.text)144		except:145			raise Exception("Failed to add captions")146	def delete_video_captions(self, video_id, language):147		"""148			Delete captions.149		"""150		url = self._generate_base_url("/videos/" + video_id + "/captions/" + language)151		response = requests.delete(url, headers=self._headers)152		self._check_status_code(response.status_code)153		try:154			return json.loads(response.text)155		except:...oozie.py
Source:oozie.py  
...28        session = self._get_http_session(job_execution.extra.get('neutron'))29        resp = session.post(self.jobs_url, data=job_config, headers={30            "Content-Type": "application/xml;charset=UTF-8"31        })32        _check_status_code(resp, 201)33        return get_json(resp)['id']34    def run_job(self, job_execution, job_id):35        session = self._get_http_session(job_execution.extra.get('neutron'))36        resp = session.put(self.job_url % job_id + "?action=start")37        _check_status_code(resp, 200)38    def kill_job(self, job_execution):39        session = self._get_http_session(job_execution.extra.get('neutron'))40        resp = session.put(self.job_url % job_execution.oozie_job_id +41                           "?action=kill")42        _check_status_code(resp, 200)43    def get_job_status(self, job_execution, job_id=None):44        if job_id is None:45            job_id = job_execution.oozie_job_id46        session = self._get_http_session(job_execution.extra.get('neutron'))47        resp = session.get(self.job_url % job_id + "?show=info")48        _check_status_code(resp, 200)49        return get_json(resp)50    def get_job_logs(self, job_execution):51        session = self._get_http_session(job_execution.extra.get('neutron'))52        resp = session.get(self.job_url % job_execution.oozie_job_id +53                           "?show=log")54        _check_status_code(resp, 200)55        return resp.text56    def get_jobs(self, offset, size, **filter):57        url = self.jobs_url + "?offset=%s&len=%s" % (offset, size)58        if len(filter) > 0:59            f = ";".join([k + "=" + v for k, v in filter.items()])60            url += "&filter=" + urlparse.quote(f)61        session = self._get_http_session()62        resp = session.get(url)63        _check_status_code(resp, 200)64        return get_json(resp)65def _check_status_code(resp, expected_code):66    if resp.status_code != expected_code:67        resp_text = resp.text68        
# cleaning tomcat error message69        message = resp_text.split("<HR size=\"1\" noshade=\"noshade\">")[1]70        message = message.replace("</p><p>", "\n")71        message = re.sub('<[^<]+?>', ' ', message)72        raise OozieException(message)73def get_json(response):74    """Provides backward compatibility for old versions of requests library."""75    json_field_or_function = getattr(response, 'json', None)76    if callable(json_field_or_function):77        return response.json()78    else:79        return json.loads(response.content)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
