How to use get_compute_client method in lisa

Best Python code snippet using lisa_python

test_clash.py

Source:test_clash.py Github

copy

Full Screen

...61 self.subscriber.subscription_path.side_effect = (62 lambda project, name: "{}/{}".format(project, name)63 )64 self.logging_client = MagicMock()65 def get_compute_client(self):66 return self.compute67 def get_publisher(self):68 return self.publisher69 def get_subscriber(self):70 return self.subscriber71 def get_logging(self, project=None):72 return self.logging_client73class TestMachineConfig:74 def setup(self):75 self.gcloud = CloudSdkStub()76 self.cloud_init = clash.CloudInitConfig("_", "", TEST_JOB_CONFIG)77 def test_config_contains_vmname(self):78 manifest = clash.MachineConfig(79 self.gcloud.get_compute_client(), "myvm", self.cloud_init, TEST_JOB_CONFIG80 )81 machine_config = manifest.to_dict()82 assert machine_config["name"] == "myvm"83 def test_config_contains_cloud_init_config(self):84 config = clash.MachineConfig(85 self.gcloud.get_compute_client(),86 "_",87 clash.CloudInitConfig("myname", "_", TEST_JOB_CONFIG),88 TEST_JOB_CONFIG,89 )90 machine_config = config.to_dict()91 assert machine_config["metadata"]["items"][0]["key"] == "user-data"92 cloud_init = yaml.safe_load(machine_config["metadata"]["items"][0]["value"])93 assert cloud_init["users"][0]["name"] == "clash"94 def test_config_contains_machine_type(self):95 manifest = clash.MachineConfig(96 self.gcloud.get_compute_client(), "_", self.cloud_init, TEST_JOB_CONFIG97 )98 machine_config = manifest.to_dict()99 assert machine_config["machineType"] == "n1-standard-1"100 def test_automatic_restart_is_always_false(self):101 manifest = clash.MachineConfig(102 self.gcloud.get_compute_client(), "_", self.cloud_init, TEST_JOB_CONFIG103 )104 machine_config = manifest.to_dict()105 assert not machine_config["scheduling"]["automaticRestart"]106 def test_config_contains_preemptible_flag_if_set_true(self):107 job_config = copy.deepcopy(TEST_JOB_CONFIG)108 job_config["preemptible"] = True109 manifest = clash.MachineConfig(110 self.gcloud.get_compute_client(), "_", self.cloud_init, job_config111 )112 machine_config 
= manifest.to_dict()113 assert machine_config["scheduling"]["preemptible"]114 def test_config_contains_preemptible_flag_if_set_false(self):115 job_config = copy.deepcopy(TEST_JOB_CONFIG)116 job_config["preemptible"] = False117 manifest = clash.MachineConfig(118 self.gcloud.get_compute_client(), "_", self.cloud_init, job_config119 )120 machine_config = manifest.to_dict()121 assert not machine_config["scheduling"]["preemptible"]122 def test_config_contains_custom_service_account(self):123 job_config = copy.deepcopy(TEST_JOB_CONFIG)124 job_config["service_account"] = "myaccount@foo.bar"125 manifest = clash.MachineConfig(126 self.gcloud.get_compute_client(), "_", self.cloud_init, job_config127 )128 machine_config = manifest.to_dict()129 assert machine_config["serviceAccounts"][0]["email"] == "myaccount@foo.bar"130 def test_config_contains_default_service_account_by_default(self):131 job_config = copy.deepcopy(TEST_JOB_CONFIG)132 manifest = clash.MachineConfig(133 self.gcloud.get_compute_client(), "_", self.cloud_init, job_config134 )135 machine_config = manifest.to_dict()136 assert machine_config["serviceAccounts"][0]["email"] == "default"137 def test_config_contains_labels(self):138 job_config = copy.deepcopy(TEST_JOB_CONFIG)139 job_config["labels"] = {"customer": "dummy"}140 manifest = clash.MachineConfig(141 self.gcloud.get_compute_client(), "_", self.cloud_init, job_config142 )143 machine_config = manifest.to_dict()144 assert machine_config["labels"] == {"customer": "dummy"}145 def test_config_empty_labels(self):146 job_config = copy.deepcopy(TEST_JOB_CONFIG)147 manifest = clash.MachineConfig(148 self.gcloud.get_compute_client(), "_", self.cloud_init, job_config149 )150 machine_config = manifest.to_dict()151 assert machine_config["labels"] == {}152def test_argument_to_script_with_whitespace():153 res = clash.translate_args_to_script(args=["echo", "hello world"])154 assert res == "echo 'hello world'"155def test_argument_to_script():156 res = 
clash.translate_args_to_script(args=["echo", "world"])157 assert res == "echo world"158class TestJob:159 def setup(self):160 self.gcloud = CloudSdkStub()161 @patch("uuid.uuid1")162 def test_creates_job(self, mock_uuid_call):163 mock_uuid_call.return_value = 1234164 job = clash.Job(TEST_JOB_CONFIG, gcloud=self.gcloud)165 assert "clash-job-1234" == job.name166 @patch("uuid.uuid1")167 def test_creates_job_with_name_prefix(self, mock_uuid_call):168 mock_uuid_call.return_value = 1234169 job = clash.Job(TEST_JOB_CONFIG, gcloud=self.gcloud, name_prefix="foo")170 assert "foo-clash-job-1234" == job.name171 def test_running_a_job_creates_an_instance_template(self):172 job = clash.Job(TEST_JOB_CONFIG, gcloud=self.gcloud)173 job.run(args=[])174 self.gcloud.get_compute_client().instanceTemplates.return_value.insert.return_value.execute.assert_called()175 def test_running_a_job_creates_a_managed_instance_group(self):176 job = clash.Job(TEST_JOB_CONFIG, gcloud=self.gcloud)177 job.run(args=[])178 self.gcloud.get_compute_client().instanceGroupManagers.return_value.insert.return_value.execute.assert_called()179 def test_deletes_instance_template_after_job_is_complete(self):180 self.gcloud.get_compute_client().instanceGroups.return_value.list.return_value.execute.return_value = {181 "items": [{"name": "anothergroup"}]182 }183 with clash.Job(TEST_JOB_CONFIG, gcloud=self.gcloud) as job:184 job.run(args=[])185 self.gcloud.get_compute_client().instanceTemplates.return_value.delete.return_value.execute.assert_called()186 def test_removes_subscription_if_job_creation_failed(self):187 self.gcloud.get_compute_client().instanceGroupManagers.return_value.insert.return_value.execute.side_effect = Exception(188 "Failure!"189 )190 job = clash.Job(TEST_JOB_CONFIG, gcloud=self.gcloud)191 with pytest.raises(Exception) as e_info:192 job.run(args=[])193 self.gcloud.get_subscriber().delete_subscription.assert_called_with(194 f"{TEST_JOB_CONFIG['project_id']}/{job.name}"195 )196 def 
test_removes_topic_if_job_creation_failed(self):197 self.gcloud.get_compute_client().instanceGroupManagers.return_value.insert.return_value.execute.side_effect = Exception(198 "Failure!"199 )200 job = clash.Job(TEST_JOB_CONFIG, gcloud=self.gcloud)201 with pytest.raises(Exception) as e_info:202 job.run(args=[])203 self.gcloud.get_publisher().delete_topic.assert_called_with(204 f"{TEST_JOB_CONFIG['project_id']}/{job.name}"205 )206 def test_running_a_job_creates_a_topic_path(self):207 job = clash.Job(TEST_JOB_CONFIG, gcloud=self.gcloud)208 job.run(args=[])209 self.gcloud.get_publisher().topic_path.assert_called_with(210 TEST_JOB_CONFIG["project_id"], job.name211 )...

Full Screen

Full Screen

compute.py

Source:compute.py Github

copy

Full Screen

...
from oslo_log import log as logging

LOG = logging.getLogger(__name__)
bp = Blueprint('compute', __name__)
CONF = cfg.CONF

# Canned responses used when CONF.testing.use_mock is set; the path is
# resolved relative to the parent of this module's directory.
_MOCK_DATA_FILE = "tools/compute-mock-data.json"


def _load_mock_data(key):
    """Return the ``key`` entry of the JSON mock data file."""
    json_file = os.path.join(
        os.path.dirname(os.path.dirname(__file__)), _MOCK_DATA_FILE)
    with open(json_file) as f:
        return json.load(f)[key]


def get_compute_client(req):
    """Build a nova client authenticated with the request's token.

    :param req: incoming request; its ``X-Auth-Token`` header supplies the
        keystone token.
    :return: a ``novaClient.Client`` using API version 2.25 and the
        ``internalURL`` endpoint.

    Any failure is logged and turned into an HTTP 500 via ``abort``.
    """
    try:
        loader = loading.get_plugin_loader('v3token')
        auth = loader.load_from_options(
            auth_url=CONF.keystone_authtoken.auth_url,
            token=req.headers.get('X-Auth-Token'),
            project_name=CONF.keystone_authtoken.project_name,
            project_domain_name=CONF.keystone_authtoken.project_domain_name
        )
        sess = session.Session(auth=auth,
                               verify=not CONF.keystone_authtoken.insecure)
        compute_client = novaClient.Client(
            # api version for live_migrate with block_migration='auto'
            '2.25',
            endpoint_type="internalURL",
            session=sess
        )
        return compute_client
    except Exception as e:
        LOG.error(e)
        abort(500, 'Failed to get compute novaclient')


def complete_with_errors_response(msg, contents):
    """Return a JSON response with status 500 describing a partial failure.

    :param msg: summary stored under ``error_msg``.
    :param contents: details stored under ``contents``.
    """
    response = jsonify({'error_msg': msg, 'contents': contents})
    response.status_code = 500
    return response


@bp.route("/api/v2/compute/services/<hostname>", methods=['GET'])
@policy.enforce('lifecycle:get_compute')
def compute_services_status(hostname):
    """Get the compute services status for a compute host

    Each service binary maps to ``true`` when its status is ``enabled``
    and to ``false`` otherwise.

    .. :quickref: Compute; Get the compute services status

    **Example Request**:

    .. sourcecode:: http

       GET /api/v2/compute/services/<hostname> HTTP/1.1
       Content-Type: application/json

    **Example Response**:

    .. sourcecode:: http

       HTTP/1.1 200 OK

       {
           "nova-compute": true
       }
    """
    # mock for getting nova service status for a compute host
    if CONF.testing.use_mock:
        return jsonify(_load_mock_data('compute_services_status'))

    compute_client = get_compute_client(request)
    compute_services = compute_client.services.list(host=hostname)
    if not compute_services:
        msg = 'No compute service for %s' % hostname
        LOG.error(msg)
        abort(410, msg)

    services = {}
    for service in compute_services:
        binary = getattr(service, 'binary', None)
        if binary:
            services[binary] = getattr(service, 'status', None) == 'enabled'
    return jsonify(services)


@bp.route("/api/v2/compute/services/<hostname>/disable", methods=['PUT'])
@policy.enforce('lifecycle:update_compute')
def compute_disable_services(hostname):
    """Disable the compute services for a compute host

    Services that are not currently ``enabled`` are reported as disabled
    without calling nova. If any call fails, a 500 response listing both
    the failed and the disabled services is returned.

    .. :quickref: Compute; Disable the compute services

    **Example Request**:

    .. sourcecode:: http

       PUT /api/v2/compute/services/<hostname>/disable HTTP/1.1
       Content-Type: application/json

    **Example Response**:

    .. sourcecode:: http

       HTTP/1.1 200 OK

       [{
           "binary": "nova-compute",
           "id": 1
       }]
    """
    # mock for running nova disable service for a compute host
    if CONF.testing.use_mock:
        return jsonify(_load_mock_data('disable_compute_services'))

    compute_client = get_compute_client(request)
    compute_services = compute_client.services.list(host=hostname)
    if not compute_services:
        msg = 'No compute service for %s' % hostname
        LOG.error(msg)
        abort(410, msg)

    failed = []
    disabled = []
    for service in compute_services:
        binary = getattr(service, 'binary', '')
        service_id = service.id
        status = getattr(service, 'status', '')
        if status != 'enabled':
            # already disabled, will not call
            disabled.append({'id': service_id, 'binary': binary})
            continue
        try:
            compute_client.services.disable(hostname, binary)
            disabled.append({'id': service_id, 'binary': binary})
        except Exception as ex:
            failed.append(
                {'id': service_id, 'binary': binary, 'error': str(ex)})
            # Arguments are passed to the logger rather than formatted with
            # ``'a' + 'b' % args``, which applied ``%`` to the second literal
            # only and raised TypeError inside this handler.
            LOG.error(
                'Failed to disable compute service for %s id = %s '
                'binary = %s', hostname, service_id, binary)
            LOG.error(ex)

    if failed:
        return complete_with_errors_response(
            'Completed disabling compute services with errors',
            {'failed': failed, 'disabled': disabled})
    return jsonify(disabled)


@bp.route("/api/v2/compute/services/<hostname>/enable", methods=['PUT'])
@policy.enforce('lifecycle:update_compute')
def compute_enable_services(hostname):
    """Enable the compute services for a compute host

    Services that are not currently ``disabled`` are reported as enabled
    without calling nova. If any call fails, a 500 response listing both
    the failed and the enabled services is returned.

    .. :quickref: Compute; Enable the compute services

    **Example Request**:

    .. sourcecode:: http

       PUT /api/v2/compute/services/<hostname>/enable HTTP/1.1
       Content-Type: application/json

    **Example Response**:

    .. sourcecode:: http

       HTTP/1.1 200 OK

       [{
           "binary": "nova-compute",
           "id": 1
       }]
    """
    # mock for running nova enable service for a compute host
    if CONF.testing.use_mock:
        return jsonify(_load_mock_data('enable_compute_services'))

    compute_client = get_compute_client(request)
    compute_services = compute_client.services.list(host=hostname)
    if not compute_services:
        msg = 'No compute service for %s' % hostname
        LOG.error(msg)
        abort(410, msg)

    failed = []
    enabled = []
    for service in compute_services:
        binary = getattr(service, 'binary', '')
        service_id = service.id
        status = getattr(service, 'status', '')
        if status != 'disabled':
            # already enabled, will not call
            enabled.append({'id': service_id, 'binary': binary})
            continue
        try:
            compute_client.services.enable(hostname, binary)
            enabled.append({'id': service_id, 'binary': binary})
        except Exception as ex:
            failed.append(
                {'id': service_id, 'binary': binary, 'error': str(ex)})
            # Logger arguments avoid the broken ``'a' + 'b' % args`` form,
            # which raised TypeError inside this handler.
            LOG.error(
                'Failed to enable compute service for %s id = %s '
                'binary = %s', hostname, service_id, binary)
            LOG.error(ex)

    if failed:
        return complete_with_errors_response(
            'Completed enabling compute services with errors',
            {'failed': failed, 'enabled': enabled})
    return jsonify(enabled)


@bp.route("/api/v2/compute/services/<hostname>", methods=['DELETE'])
@policy.enforce('lifecycle:update_compute')
def compute_delete_services(hostname):
    """Delete the compute services for a compute host

    If any deletion fails, a 500 response listing both the failed and the
    deleted services is returned.

    .. :quickref: Compute; Delete the compute services

    **Example Request**:

    .. sourcecode:: http

       DELETE /api/v2/compute/services/<hostname> HTTP/1.1
       Content-Type: application/json

    **Example Response**:

    .. sourcecode:: http

       HTTP/1.1 200 OK

       [{
           "binary": "nova-compute",
           "id": 1
       }]
    """
    # mock for running nova delete service for a compute host
    if CONF.testing.use_mock:
        return jsonify(_load_mock_data('delete_compute_services'))

    compute_client = get_compute_client(request)
    compute_services = compute_client.services.list(host=hostname)
    if not compute_services:
        msg = 'No compute service for %s' % hostname
        LOG.error(msg)
        abort(410, msg)

    failed = []
    deleted = []
    for service in compute_services:
        binary = getattr(service, 'binary', '')
        service_id = service.id
        try:
            compute_client.services.delete(service_id)
            deleted.append({'id': service_id, 'binary': binary})
        except Exception as ex:
            failed.append(
                {'id': service_id, 'binary': binary, 'error': str(ex)})
            LOG.error(
                'Failed to delete compute service for %s id = %s binary = %s',
                hostname, service_id, binary)
            LOG.error(ex)

    if failed:
        return complete_with_errors_response(
            'Completed deleting compute services with errors',
            {'failed': failed, 'deleted': deleted})
    return jsonify(deleted)


@bp.route("/api/v2/compute/aggregates/<hostname>", methods=['DELETE'])
@policy.enforce('lifecycle:update_compute')
def compute_delete_aggregates(hostname):
    """Delete the aggregates for a compute host

    The host is removed from every aggregate whose ``hosts`` list contains
    it. If any removal fails, a 500 response listing both the failed and
    the processed aggregates is returned.

    .. :quickref: Compute; Delete aggregates for a compute host

    **Example Request**:

    .. sourcecode:: http

       DELETE /api/v2/compute/aggregates/<hostname> HTTP/1.1
       Content-Type: application/json

    **Example Response**:

    .. sourcecode:: http

       HTTP/1.1 200 OK

       [{
           "availability_zone": "test-az",
           "id": 3,
           "name": "agg_group3"
       }, {
           "availability_zone": null,
           "id": 1,
           "name": "agg_group1"
       }, {
           "availability_zone": null,
           "id": 2,
           "name": "agg_group2"
       }]
    """
    # mock for running nova delete aggregates for a compute host
    # mock contains partial failure
    if CONF.testing.use_mock:
        return complete_with_errors_response(
            'Completed deleting aggregates with errors',
            _load_mock_data('delete_aggregates'))

    compute_client = get_compute_client(request)
    # list all the aggregates
    compute_aggregates = compute_client.aggregates.list()
    if not compute_aggregates:
        msg = 'No aggregates found for %s ' % hostname
        LOG.info(msg)
        abort(410, msg)

    # get details so we can decide which one we need to
    # remove compute host
    aggregates = []
    for aggr in compute_aggregates:
        details = compute_client.aggregates.get(aggr)
        if hostname in getattr(details, 'hosts', []):
            aggregates.append({
                'id': aggr.id,
                'name': aggr.name,
                'availability_zone': aggr.availability_zone,
            })
    if not aggregates:
        msg = 'No aggregates found for %s ' % hostname
        LOG.info(msg)
        abort(410, msg)

    failed = []
    deleted = []
    for aggr in aggregates:
        aggr_id = aggr['id']
        name = aggr['name']
        az = aggr['availability_zone']
        try:
            compute_client.aggregates.remove_host(aggr_id, hostname)
            deleted.append(
                {'id': aggr_id, 'name': name, 'availability_zone': az})
        except Exception as ex:
            failed.append({
                'id': aggr_id, 'name': name,
                'availability_zone': az, 'error': str(ex)})
            LOG.error(
                'Failed to delete aggregate for %s id = %s name = %s '
                'availability_zone = %s', hostname, aggr_id, name, az)
            LOG.error(ex)

    if failed:
        return complete_with_errors_response(
            'Completed deleting aggregates with errors',
            {'failed': failed, 'deleted': deleted})
    return jsonify(deleted)


@bp.route(
    "/api/v2/compute/instances/<src_hostname>/<target_hostname>/migrate",
    methods=['PUT'])
@policy.enforce('lifecycle:update_compute')
def compute_migrate_instances(src_hostname, target_hostname):
    """Migrate instances of a compute host to another compute host

    A live migration is started for every instance found on the source
    host. If any request fails, a 500 response listing both the failed
    and the migrating instances is returned.

    .. :quickref: Compute; Live migrate instances of a compute host

    **Example Request**:

    .. sourcecode:: http

       PUT
       /api/v2/compute/instances/<src_hostname>/<target_hostname>/migrate
       HTTP/1.1
       Content-Type: application/json

    **Example Response**:

    .. sourcecode:: http

       HTTP/1.1 200 OK

       [{
           "id": "8279e65d-6e87-4a50-b789-96edd753fbb2",
           "name": "test3"
       }, {
           "id": "1d51f18f-27fd-4c34-a0aa-c07a5e9462e7",
           "name": "test2"
       }]
    """
    # mock for running nova instance live migrating for a compute host
    if CONF.testing.use_mock:
        return jsonify(_load_mock_data('migrate_instances'))

    compute_client = get_compute_client(request)
    search_opts = {
        'all_tenants': 1,  # all tenants
        'host': src_hostname
    }
    instances = compute_client.servers.list(search_opts=search_opts)
    if not instances:
        msg = 'No instances found for %s' % src_hostname
        LOG.info(msg)
        abort(410, msg)

    migrating = []  # list of migrating instance ids and names
    failed = []  # list of failed instance ids, names and errors
    for inst in instances:
        inst_id = inst.id
        name = inst.name
        try:
            compute_client.servers.live_migrate(
                inst_id, target_hostname, block_migration='auto')
            migrating.append({'id': inst_id, 'name': name})
        except Exception as ex:
            failed.append({'id': inst_id, 'name': name, 'error': str(ex)})
            LOG.error(
                'Failed to start migrating instance of %s id = %s name = %s',
                src_hostname, inst_id, name)
            LOG.error(ex)

    if failed:
        return complete_with_errors_response(
            'Completed migrating instances with errors',
            {'failed': failed, 'migrating': migrating})
    return jsonify(migrating)


# NOTE(review): the excerpt ends inside the ``except`` clause of the function
# below, so it is reproduced unchanged up to the elision marker.
@bp.route("/api/v2/compute/instances/<hostname>", methods=['GET'])
@policy.enforce('lifecycle:get_compute')
def compute_get_instances(hostname):
    """Return instances of a compute host

    .. :quickref: Compute; Get instances of a compute host

    **Example Request**:

    .. sourcecode:: http

       GET /api/v2/compute/instances/<hostname> HTTP/1.1
       Content-Type: application/json

    **Example Response**:

    .. sourcecode:: http

       HTTP/1.1 200 OK

       [{
           "id": "ab3622db-648b-435d-b9af-f279e57bd8c9",
           "name": "test4",
           "status": "ACTIVE"
       }]
    """
    # mock for running nova instance list indicating whether all instances for
    # a compute host are migrated
    if cfg.CONF.testing.use_mock:
        mock_json = "tools/compute-mock-data.json"
        json_file = os.path.join(
            os.path.dirname(os.path.dirname(__file__)), mock_json)
        with open(json_file) as f:
            return jsonify(json.load(f)['get_instances'])
    compute_client = get_compute_client(request)
    try:
        search_opts = {
            'all_tenants': 1,  # all tenants
            'host': hostname
        }
        instances = compute_client.servers.list(search_opts=search_opts)
        ret_instances = [
            {'id': getattr(inst, 'id'), 'name': getattr(inst, 'name'),
             'status': getattr(inst, 'status')} for inst in instances
        ]
        return jsonify(ret_instances)
    except Exception as e:
        msg = \
            'Failed to get instances for compute host %s' % hostname
...

Full Screen

Full Screen

main.py

Source:main.py Github

copy

Full Screen

...
from google.oauth2 import service_account
from googleapiclient import errors


class GCPSDKObject():
    """Helpers around the Google Compute Engine ``v1`` discovery client.

    Covers backend buckets, URL maps, global addresses, target HTTP(S)
    proxies, SSL certificates and global forwarding rules.
    """

    # Path of the service-account key file used to build credentials.
    SERVICE_ACCOUNT_FILE = '/path/to/service-account-file.json'

    def get_compute_client(self):
        """Build a Compute Engine ``v1`` client from the service-account file."""
        credentials = service_account.Credentials.from_service_account_file(
            self.SERVICE_ACCOUNT_FILE)
        service = discovery.build('compute', 'v1', credentials=credentials)
        return service

    @staticmethod
    def _list_all(collection, project_name, error_hint=None):
        """Return the items of every page of ``collection.list(project=...)``.

        :param collection: a discovery resource, e.g. ``service.urlMaps()``.
        :param project_name: project passed to ``list``.
        :param error_hint: optional line printed before the HTTP error reason.
        :raises errors.HttpError: re-raised after the reason is printed.
        """
        result = []
        request = collection.list(project=project_name)
        while request is not None:
            try:
                response = request.execute()
            except errors.HttpError as err:
                if error_hint:
                    print(error_hint)
                print(err._get_reason())
                raise
            # A page without results has no 'items' key.
            for item in response.get('items', []):
                pprint(item)
                result.append(item)
            # Page through the SAME collection the request came from.
            request = collection.list_next(
                previous_request=request, previous_response=response)
        return result

    @staticmethod
    def _execute(request, echo=True, error_hint=None):
        """Execute ``request`` and return its response.

        :param request: a discovery request object.
        :param echo: pretty-print the response when true.
        :param error_hint: optional line printed before the HTTP error reason.
        :raises errors.HttpError: re-raised after the reason is printed.
        """
        try:
            response = request.execute()
        except errors.HttpError as err:
            if error_hint:
                print(error_hint)
            print(err._get_reason())
            raise
        if echo:
            pprint(response)
        return response

    def backend_buckets_list(self, project_name: str):
        """
        List the GCS backend buckets of a project.
        """
        service = self.get_compute_client()
        return self._list_all(service.backendBuckets(), project_name)

    def backend_buckets_create(self, project_name: str, backend_bucket_name: str, gcs_bucket_name: str):
        """
        Create a GCS backend bucket.
        :param project_name: project id or project name
        :param backend_bucket_name: name of the backend to create
        :param gcs_bucket_name: name of the Cloud Storage bucket on the GCP
            side; must match the actual bucket name
        :return: the API response of the insert call
        """
        # api: https://cloud.google.com/compute/docs/reference/rest/v1/backendBuckets/insert
        service = self.get_compute_client()
        body = {
            "name": backend_bucket_name,
            "bucketName": gcs_bucket_name,
            "enableCdn": True
        }
        request = service.backendBuckets().insert(project=project_name, body=body)
        # Something went wrong, print out some information.
        return self._execute(
            request,
            error_hint='There was an error creating the backend bucket. Check the details:')

    def backend_gcs_url_maps_list(self, project_name: str):
        """List the URL maps of a project."""
        service = self.get_compute_client()
        # Something went wrong, print out some information.
        return self._list_all(
            service.urlMaps(), project_name,
            error_hint='There was an error listing the URL maps. Check the details:')

    def backend_gcs_url_maps_create(self, project_name: str, url_map_name: str, backend_or_service_name: str,
                                    is_bucket=True):
        """
        Create the URL map for a GCS backend.
        :param project_name: project that owns the URL map
        :param url_map_name: name of the URL map to create
        :param backend_or_service_name: name of the backend bucket or
            backend service used as the default service
        :param is_bucket: default=True; when false the name refers to a
            backend service instead of a backend bucket
        :return: the API response of the insert call
        """
        # api: https://cloud.google.com/compute/docs/reference/rest/v1/urlMaps/insert
        service = self.get_compute_client()
        if is_bucket:
            service_url = f"compute/v1/projects/{project_name}/global/backendBuckets/{backend_or_service_name}"
        else:
            service_url = f"compute/v1/projects/{project_name}/global/backendServices/{backend_or_service_name}"
        body = {
            "name": url_map_name,
            "defaultService": service_url,
        }
        request = service.urlMaps().insert(project=project_name, body=body)
        return self._execute(request)

    def compute_global_addresses_list(self, project_name: str):
        """List the global addresses of a project."""
        service = self.get_compute_client()
        return self._list_all(service.globalAddresses(), project_name)

    def compute_global_addresses_create(self, project_name: str, address_name: str, ip_type: str = "IPV4"):
        """
        Create a global static external IP address.
        :param project_name: project that owns the address
        :param address_name: name of the address to create
        :param ip_type: value sent as ``ipVersion``
        :return: the API response of the insert call
        """
        service = self.get_compute_client()
        body = {
            "name": address_name,
            "ipVersion": ip_type,
            "addressType": "EXTERNAL"
        }
        request = service.globalAddresses().insert(project=project_name, body=body)
        return self._execute(request)

    def target_http_proxies_create(self, project_name: str, proxy_name: str, url_map_name: str):
        """
        Create a proxy.
        Creates a target HTTP proxy that routes requests to your URL map.
        """
        service = self.get_compute_client()
        body = {
            "name": proxy_name,
            "urlMap": url_map_name
        }
        request = service.targetHttpProxies().insert(project=project_name, body=body)
        return self._execute(request)

    def target_https_proxies_create(self, project_name: str, proxy_name: str, url_map_name: str, cert_name: str):
        """
        Create a proxy that uses an SSL certificate.
        """
        service = self.get_compute_client()
        body = {
            "name": proxy_name,
            "urlMap": url_map_name,
            "sslCertificates": [cert_name]
        }
        request = service.targetHttpsProxies().insert(project=project_name, body=body)
        return self._execute(request)

    def ssl_certificates_create(self, cert_name: str, certificate: str, private_key: str, description: str = "",
                                project_name: str = "spoton-project"):
        """
        Create an SSL certificate.
        gcloud compute ssl-certificates list
        """
        body = {"name": cert_name, "certificate": certificate, "privateKey": private_key, "description": description}
        compute = self.get_compute_client()
        request = compute.sslCertificates().insert(project=project_name, body=body)
        # The response is not echoed here (it was not in the original either).
        return self._execute(request, echo=False)

    # NOTE(review): the excerpt ends inside the ``except`` clause of the
    # method below, so its logic is reproduced unchanged up to the elision.
    def global_forwarding_rules_create(self, project_name: str, forwarding_rule_name: str, address_name: str, port: int,
                                       proxy_name: str):
        """
        Frontend configuration.
        """
        service = self.get_compute_client()
        body = {
            "name": forwarding_rule_name,
            "IPAddress": address_name,
            "target": proxy_name,
            "ports": [
                str(port)
            ]
        }
        request = service.globalForwardingRules().insert(project=project_name, body=body)
        try:
            response = request.execute()
            return response
        except errors.HttpError as err:
            print(err._get_reason())
...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run lisa automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful