How to use the is_resource_deleted method in tempest

Best Python code snippet using tempest_python

clients.py

Source:clients.py Github

copy

Full Screen

# ... (excerpt: lines 1-21 of clients.py are not shown; names such as
# ``fixtures``, ``time``, ``sahara_client``, ``saharaclient_base``,
# ``nova_client``, ``neutron_client`` and ``uuidutils`` are presumably
# imported there -- confirm against the full file)
from swiftclient import client as swift_client
from swiftclient import exceptions as swift_exc
from tempest_lib import exceptions as exc


class Client(object):
    """Base class providing a poll-until-deleted helper for service clients."""

    def is_resource_deleted(self, method, *args, **kwargs):
        """Report whether the resource is gone; subclasses must override.

        :param method: callable forwarded by :meth:`delete_resource`
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError

    def delete_resource(self, method, *args, **kwargs):
        """Poll :meth:`is_resource_deleted` until it returns a true value.

        The check is repeated every 5 seconds inside a 300-second
        ``fixtures.Timeout``.  Note that the subclasses in this file
        implement the check by *calling* ``method`` again, so ``method``
        (a delete call) is invoked once per polling iteration.

        :param method: callable passed through to ``is_resource_deleted``
        :param args: positional arguments passed through with ``method``
        :param kwargs: keyword arguments passed through with ``method``
        """
        # TODO(sreshetniak): make timeout configurable
        with fixtures.Timeout(300, gentle=True):
            while True:
                if self.is_resource_deleted(method, *args, **kwargs):
                    break
                time.sleep(5)


class SaharaClient(Client):
    """Wrapper around the Sahara client (API version '1.1')."""

    def __init__(self, *args, **kwargs):
        """Build the underlying Sahara client from the given arguments."""
        self.sahara_client = sahara_client.Client('1.1', *args, **kwargs)

    def create_node_group_template(self, *args, **kwargs):
        """Create a node group template and return its id."""
        data = self.sahara_client.node_group_templates.create(*args, **kwargs)
        return data.id

    def delete_node_group_template(self, node_group_template_id):
        """Delete a node group template and wait until it is gone."""
        return self.delete_resource(
            self.sahara_client.node_group_templates.delete,
            node_group_template_id)

    def create_cluster_template(self, *args, **kwargs):
        """Create a cluster template and return its id."""
        data = self.sahara_client.cluster_templates.create(*args, **kwargs)
        return data.id

    def delete_cluster_template(self, cluster_template_id):
        """Delete a cluster template and wait until it is gone."""
        return self.delete_resource(
            self.sahara_client.cluster_templates.delete,
            cluster_template_id)

    def create_cluster(self, *args, **kwargs):
        """Create a cluster and return its id."""
        data = self.sahara_client.clusters.create(*args, **kwargs)
        return data.id

    def delete_cluster(self, cluster_id):
        """Delete a cluster and wait until it is gone."""
        return self.delete_resource(
            self.sahara_client.clusters.delete,
            cluster_id)

    def scale_cluster(self, cluster_id, body):
        """Forward a scale request for ``cluster_id`` to the Sahara client."""
        return self.sahara_client.clusters.scale(cluster_id, body)

    def create_datasource(self, *args, **kwargs):
        """Create a data source and return its id."""
        data = self.sahara_client.data_sources.create(*args, **kwargs)
        return data.id

    def delete_datasource(self, datasource_id):
        """Delete a data source and wait until it is gone."""
        return self.delete_resource(
            self.sahara_client.data_sources.delete,
            datasource_id)

    def create_job_binary_internal(self, *args, **kwargs):
        """Create an internal job binary and return its id."""
        data = self.sahara_client.job_binary_internals.create(*args, **kwargs)
        return data.id

    def delete_job_binary_internal(self, job_binary_internal_id):
        """Delete an internal job binary and wait until it is gone."""
        return self.delete_resource(
            self.sahara_client.job_binary_internals.delete,
            job_binary_internal_id)

    def create_job_binary(self, *args, **kwargs):
        """Create a job binary and return its id."""
        data = self.sahara_client.job_binaries.create(*args, **kwargs)
        return data.id

    def delete_job_binary(self, job_binary_id):
        """Delete a job binary and wait until it is gone."""
        return self.delete_resource(
            self.sahara_client.job_binaries.delete,
            job_binary_id)

    def create_job(self, *args, **kwargs):
        """Create a job and return its id."""
        data = self.sahara_client.jobs.create(*args, **kwargs)
        return data.id

    def delete_job(self, job_id):
        """Delete a job and wait until it is gone."""
        return self.delete_resource(
            self.sahara_client.jobs.delete,
            job_id)

    def run_job(self, *args, **kwargs):
        """Create a job execution and return its id."""
        data = self.sahara_client.job_executions.create(*args, **kwargs)
        return data.id

    def delete_job_execution(self, job_execution_id):
        """Delete a job execution and wait until it is gone."""
        return self.delete_resource(
            self.sahara_client.job_executions.delete,
            job_execution_id)

    def get_cluster_status(self, cluster_id):
        """Return the status of ``cluster_id`` as a string."""
        data = self.sahara_client.clusters.get(cluster_id)
        return str(data.status)

    def get_job_status(self, exec_id):
        """Return ``info['status']`` of job execution ``exec_id`` as a string."""
        data = self.sahara_client.job_executions.get(exec_id)
        return str(data.info['status'])

    def is_resource_deleted(self, method, *args, **kwargs):
        """Call ``method`` and report whether it failed with a 404.

        :returns: True if ``method`` raised an ``APIException`` whose
            ``error_code`` is 404; False if the call succeeded or the
            ``APIException`` carried any other code (that exception is
            swallowed, so polling continues).
        """
        try:
            method(*args, **kwargs)
        except saharaclient_base.APIException as ex:
            return ex.error_code == 404
        return False


class NovaClient(Client):
    """Wrapper around the Nova client (API version '1.1')."""

    def __init__(self, *args, **kwargs):
        """Build the underlying Nova client from the given arguments."""
        self.nova_client = nova_client.Client('1.1', *args, **kwargs)

    def get_image_id(self, image_name):
        """Resolve an image name to its id.

        :param image_name: image name, or a UUID-like string which is
            returned unchanged
        :returns: the id of the first listed image whose name matches
        :raises exc.NotFound: if no listed image has that name
        """
        if uuidutils.is_uuid_like(image_name):
            return image_name
        for image in self.nova_client.images.list():
            if image.name == image_name:
                return image.id
        raise exc.NotFound(image_name)


class NeutronClient(Client):
    """Wrapper around the Neutron client (API version '2.0')."""

    def __init__(self, *args, **kwargs):
        """Build the underlying Neutron client from the given arguments."""
        self.neutron_client = neutron_client.Client('2.0', *args, **kwargs)

    def get_network_id(self, network_name):
        """Resolve a network name to its id.

        :param network_name: network name, or a UUID-like string which is
            returned unchanged
        :returns: the ``'id'`` of the first network returned for that name
        :raises exc.NotFound: if the lookup returns no networks
        """
        if uuidutils.is_uuid_like(network_name):
            return network_name
        networks = self.neutron_client.list_networks(name=network_name)
        networks = networks['networks']
        if not networks:
            raise exc.NotFound(network_name)
        return networks[0]['id']


class SwiftClient(Client):
    """Wrapper around a Swift connection (auth version '2.0')."""

    def __init__(self, *args, **kwargs):
        """Open the underlying Swift connection from the given arguments."""
        self.swift_client = swift_client.Connection(auth_version='2.0',
                                                    *args, **kwargs)

    def create_container(self, container_name):
        """Create ``container_name`` via ``put_container``."""
        return self.swift_client.put_container(container_name)

    def delete_container(self, container_name):
        """Delete a container and wait until it is gone."""
        return self.delete_resource(
            self.swift_client.delete_container,
            container_name)

    def upload_data(self, container_name, object_name, data):
        """Store ``data`` as ``object_name`` in ``container_name``."""
        return self.swift_client.put_object(container_name, object_name, data)

    def delete_object(self, container_name, object_name):
        """Delete an object and wait until it is gone."""
        return self.delete_resource(
            self.swift_client.delete_object,
            container_name,
            object_name)

    def is_resource_deleted(self, method, *args, **kwargs):
        """Call ``method`` and report whether it failed with HTTP 404.

        :returns: True if ``method`` raised a ``ClientException`` whose
            ``http_status`` is 404, False for any other ``http_status``.
        """
        try:
            method(*args, **kwargs)
        except swift_exc.ClientException as ex:
            return ex.http_status == 404
        # ... (excerpt truncated here; the remainder of clients.py,
        # including whatever follows this ``except`` clause, is not shown)

Full Screen

Full Screen

sfc_client.py

Source:sfc_client.py Github

copy

Full Screen

...30 return self.delete_resource(uri)31 def list_port_chains(self, **filters):32 uri = '/sfc/port_chains'33 return self.list_resources(uri, **filters)34 def is_resource_deleted(self, id):35 try:36 self.show_port_chain(id)37 except lib_exc.NotFound:38 return True39 return False40 @property41 def resource_type(self):42 """Returns the primary type of resource this client works with."""43 return 'sfc'44class PortPairGroupClient(base.BaseNetworkClient):45 def create_port_pair_group(self, **kwargs):46 uri = '/sfc/port_pair_groups'47 post_data = {'port_pair_group': kwargs}48 return self.create_resource(uri, post_data)49 def update_port_pair_group(self, pg_id, **kwargs):50 uri = '/sfc/port_pair_groups/%s' % pg_id51 post_data = {'port_pair_group': kwargs}52 return self.update_resource(uri, post_data)53 def show_port_pair_group(self, pg_id, **fields):54 uri = '/sfc/port_pair_groups/%s' % pg_id55 return self.show_resource(uri, **fields)56 def delete_port_pair_group(self, pg_id):57 uri = '/sfc/port_pair_groups/%s' % pg_id58 return self.delete_resource(uri)59 def list_port_pair_groups(self, **filters):60 uri = '/sfc/port_pair_groups'61 return self.list_resources(uri, **filters)62 def is_resource_deleted(self, id):63 try:64 self.show_port_pair_group(id)65 except lib_exc.NotFound:66 return True67 return False68 @property69 def resource_type(self):70 """Returns the primary type of resource this client works with."""71 return 'sfc'72class PortPairClient(base.BaseNetworkClient):73 def create_port_pair(self, **kwargs):74 uri = '/sfc/port_pairs'75 post_data = {'port_pair': kwargs}76 return self.create_resource(uri, post_data)77 def update_port_pair(self, pp_id, **kwargs):78 uri = '/sfc/port_pairs/%s' % pp_id79 post_data = {'port_pair': kwargs}80 return self.update_resource(uri, post_data)81 def show_port_pair(self, pp_id, **fields):82 uri = '/sfc/port_pairs/%s' % pp_id83 return self.show_resource(uri, **fields)84 def delete_port_pair(self, pp_id):85 uri = '/sfc/port_pairs/%s' % 
pp_id86 return self.delete_resource(uri)87 def list_port_pairs(self, **filters):88 uri = '/sfc/port_pairs'89 return self.list_resources(uri, **filters)90 def is_resource_deleted(self, id):91 try:92 self.show_port_pair(id)93 except lib_exc.NotFound:94 return True95 return False96 @property97 def resource_type(self):98 """Returns the primary type of resource this client works with."""...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run tempest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful